Skip to main content

pi/
session_index.rs

1//! SQLite session index (derived from JSONL sessions).
2
3use crate::config::Config;
4use crate::error::{Error, Result};
5use crate::session::{Session, SessionEntry, SessionHeader};
6use fs4::fs_std::FileExt;
7use serde::Deserialize;
8use sqlmodel_core::Value;
9use sqlmodel_sqlite::{OpenFlags, SqliteConfig, SqliteConnection};
10use std::borrow::Borrow;
11use std::fs::{self, File};
12use std::io::{BufRead, BufReader, Read};
13use std::path::{Path, PathBuf};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15
/// Largest JSONL line, in bytes (100 MiB), accepted by [`read_capped_utf8_line`]
/// when scanning session files.
const MAX_JSONL_LINE_BYTES: usize = 100 * (1024 * 1024);
17
/// One row of the session index: summary metadata for a single session file.
#[derive(Clone, Debug)]
pub struct SessionMeta {
    /// Session file path rendered as a string; primary key of the index.
    pub path: String,
    /// Session identifier copied from the session header.
    pub id: String,
    /// Working directory copied from the session header.
    pub cwd: String,
    /// Timestamp string copied from the session header.
    pub timestamp: String,
    /// Number of message entries counted in the session.
    pub message_count: u64,
    /// File modification time in milliseconds since the Unix epoch.
    pub last_modified_ms: i64,
    /// Size of the session file in bytes.
    pub size_bytes: u64,
    /// Session name taken from the last named `session_info` entry, if any.
    pub name: Option<String>,
}
29
/// Handle to the SQLite session index stored beside the session files.
///
/// The handle only stores paths; a database connection is opened per operation.
#[derive(Clone, Debug)]
pub struct SessionIndex {
    /// Path of the SQLite database file.
    db_path: PathBuf,
    /// Path of the lock file used to serialize access to the database.
    lock_path: PathBuf,
}
35
36impl SessionIndex {
37    pub fn new() -> Self {
38        let root = Config::sessions_dir();
39        Self::for_sessions_root(&root)
40    }
41
42    pub fn for_sessions_root(root: &Path) -> Self {
43        Self {
44            db_path: root.join("session-index.sqlite"),
45            lock_path: root.join("session-index.lock"),
46        }
47    }
48
49    pub fn index_session(&self, session: &Session) -> Result<()> {
50        let Some(path) = session.path.as_ref() else {
51            return Ok(());
52        };
53
54        let meta = build_meta(path, &session.header, &session.entries)?;
55        self.upsert_meta(meta)
56    }
57
58    /// Update index metadata for an already-persisted session snapshot.
59    ///
60    /// This avoids requiring a full `Session` clone when callers already have
61    /// header + aggregate entry stats.
62    pub fn index_session_snapshot(
63        &self,
64        path: &Path,
65        header: &SessionHeader,
66        message_count: u64,
67        name: Option<String>,
68    ) -> Result<()> {
69        let (last_modified_ms, size_bytes) = session_file_stats(path)?;
70        let meta = SessionMeta {
71            path: path.display().to_string(),
72            id: header.id.clone(),
73            cwd: header.cwd.clone(),
74            timestamp: header.timestamp.clone(),
75            message_count,
76            last_modified_ms,
77            size_bytes,
78            name,
79        };
80        self.upsert_meta(meta)
81    }
82
83    pub(crate) fn upsert_session_meta(&self, meta: SessionMeta) -> Result<()> {
84        self.upsert_meta(meta)
85    }
86
87    fn upsert_meta(&self, meta: SessionMeta) -> Result<()> {
88        self.with_lock(|conn| {
89            init_schema(conn)?;
90
91            conn.execute_raw("BEGIN IMMEDIATE")
92                .map_err(|e| Error::session(format!("BEGIN failed: {e}")))?;
93
94            let result = (|| -> Result<()> {
95                let message_count = sqlite_i64_from_u64("message_count", meta.message_count)?;
96                let size_bytes = sqlite_i64_from_u64("size_bytes", meta.size_bytes)?;
97                conn.execute_sync(
98                    "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
99                     VALUES (?1,?2,?3,?4,?5,?6,?7,?8)
100                     ON CONFLICT(path) DO UPDATE SET
101                       id=excluded.id,
102                       cwd=excluded.cwd,
103                       timestamp=excluded.timestamp,
104                       message_count=excluded.message_count,
105                       last_modified_ms=excluded.last_modified_ms,
106                       size_bytes=excluded.size_bytes,
107                       name=excluded.name",
108                    &[
109                        Value::Text(meta.path),
110                        Value::Text(meta.id),
111                        Value::Text(meta.cwd),
112                        Value::Text(meta.timestamp),
113                        Value::BigInt(message_count),
114                        Value::BigInt(meta.last_modified_ms),
115                        Value::BigInt(size_bytes),
116                        meta.name.map_or(Value::Null, Value::Text),
117                    ],
118                ).map_err(|e| Error::session(format!("Insert failed: {e}")))?;
119
120                conn.execute_sync(
121                    "INSERT INTO meta (key,value) VALUES ('last_sync_epoch_ms', ?1)
122                     ON CONFLICT(key) DO UPDATE SET value=excluded.value",
123                    &[Value::Text(current_epoch_ms())],
124                ).map_err(|e| Error::session(format!("Meta update failed: {e}")))?;
125                Ok(())
126            })();
127
128            match result {
129                Ok(()) => {
130                    conn.execute_raw("COMMIT")
131                        .map_err(|e| Error::session(format!("COMMIT failed: {e}")))?;
132                    Ok(())
133                }
134                Err(e) => {
135                    let _ = conn.execute_raw("ROLLBACK");
136                    Err(e)
137                }
138            }
139        })
140    }
141
142    pub fn list_sessions(&self, cwd: Option<&str>) -> Result<Vec<SessionMeta>> {
143        self.with_lock(|conn| {
144            init_schema(conn)?;
145
146            let (sql, params): (&str, Vec<Value>) = cwd.map_or_else(
147                || {
148                    (
149                        "SELECT path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name
150                         FROM sessions ORDER BY last_modified_ms DESC",
151                        vec![],
152                    )
153                },
154                |cwd| {
155                    (
156                        "SELECT path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name
157                         FROM sessions WHERE cwd=?1 ORDER BY last_modified_ms DESC",
158                        vec![Value::Text(cwd.to_string())],
159                    )
160                },
161            );
162
163            let rows = conn
164                .query_sync(sql, &params)
165                .map_err(|e| Error::session(format!("Query failed: {e}")))?;
166
167            let mut result = Vec::new();
168            for row in rows {
169                result.push(row_to_meta(&row)?);
170            }
171            Ok(result)
172        })
173    }
174
175    pub fn delete_session_path(&self, path: &Path) -> Result<()> {
176        let path = path.to_string_lossy().to_string();
177        self.with_lock(|conn| {
178            init_schema(conn)?;
179
180            conn.execute_raw("BEGIN IMMEDIATE")
181                .map_err(|e| Error::session(format!("BEGIN failed: {e}")))?;
182
183            let result = (|| -> Result<()> {
184                conn.execute_sync("DELETE FROM sessions WHERE path=?1", &[Value::Text(path)])
185                    .map_err(|e| Error::session(format!("Delete failed: {e}")))?;
186
187                conn.execute_sync(
188                    "INSERT INTO meta (key,value) VALUES ('last_sync_epoch_ms', ?1)
189                     ON CONFLICT(key) DO UPDATE SET value=excluded.value",
190                    &[Value::Text(current_epoch_ms())],
191                )
192                .map_err(|e| Error::session(format!("Meta update failed: {e}")))?;
193                Ok(())
194            })();
195
196            match result {
197                Ok(()) => {
198                    conn.execute_raw("COMMIT")
199                        .map_err(|e| Error::session(format!("COMMIT failed: {e}")))?;
200                    Ok(())
201                }
202                Err(e) => {
203                    let _ = conn.execute_raw("ROLLBACK");
204                    Err(e)
205                }
206            }
207        })
208    }
209
210    pub fn reindex_all(&self) -> Result<()> {
211        let sessions_root = self.sessions_root();
212        if !sessions_root.exists() {
213            return Ok(());
214        }
215
216        let mut metas = Vec::new();
217        for entry in walk_sessions(sessions_root) {
218            let Ok(path) = entry else { continue };
219            if let Ok(meta) = build_meta_from_file(&path) {
220                metas.push(meta);
221            }
222        }
223
224        self.with_lock(|conn| {
225            init_schema(conn)?;
226
227            conn.execute_raw("BEGIN IMMEDIATE")
228                .map_err(|e| Error::session(format!("BEGIN failed: {e}")))?;
229
230            let result = (|| -> Result<()> {
231                conn.execute_sync("DELETE FROM sessions", &[])
232                    .map_err(|e| Error::session(format!("Delete failed: {e}")))?;
233
234                for meta in metas {
235                    let message_count = sqlite_i64_from_u64("message_count", meta.message_count)?;
236                    let size_bytes = sqlite_i64_from_u64("size_bytes", meta.size_bytes)?;
237                    conn.execute_sync(
238                        "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
239                         VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
240                        &[
241                            Value::Text(meta.path),
242                            Value::Text(meta.id),
243                            Value::Text(meta.cwd),
244                            Value::Text(meta.timestamp),
245                            Value::BigInt(message_count),
246                            Value::BigInt(meta.last_modified_ms),
247                            Value::BigInt(size_bytes),
248                            meta.name.map_or(Value::Null, Value::Text),
249                        ],
250                    ).map_err(|e| Error::session(format!("Insert failed: {e}")))?;
251                }
252
253                conn.execute_sync(
254                    "INSERT INTO meta (key,value) VALUES ('last_sync_epoch_ms', ?1)
255                     ON CONFLICT(key) DO UPDATE SET value=excluded.value",
256                    &[Value::Text(current_epoch_ms())],
257                ).map_err(|e| Error::session(format!("Meta update failed: {e}")))?;
258
259                Ok(())
260            })();
261
262            match result {
263                Ok(()) => {
264                    conn.execute_raw("COMMIT")
265                        .map_err(|e| Error::session(format!("COMMIT failed: {e}")))?;
266                    Ok(())
267                }
268                Err(e) => {
269                    let _ = conn.execute_raw("ROLLBACK");
270                    Err(e)
271                }
272            }
273        })
274    }
275
276    /// Check whether the on-disk index is stale enough to reindex.
277    pub fn should_reindex(&self, max_age: Duration) -> bool {
278        if !self.db_path.exists() {
279            return true;
280        }
281        // Prefer the persisted sync epoch over the main SQLite file mtime.
282        // In WAL mode, recent writes can live in the sidecar files while the
283        // base database timestamp stays old enough to look stale.
284        if let Ok(Some(last_sync_epoch_ms)) = self.with_lock(load_last_sync_epoch_ms) {
285            return epoch_ms_is_stale(last_sync_epoch_ms, max_age);
286        }
287        let Ok(meta) = fs::metadata(&self.db_path) else {
288            return true;
289        };
290        let Ok(modified) = meta.modified() else {
291            return true;
292        };
293        let age = SystemTime::now()
294            .duration_since(modified)
295            .unwrap_or_default();
296        age > max_age
297    }
298
299    /// Reindex the session database if the index is stale.
300    pub fn reindex_if_stale(&self, max_age: Duration) -> Result<bool> {
301        if !self.should_reindex(max_age) {
302            return Ok(false);
303        }
304        self.reindex_all()?;
305        Ok(true)
306    }
307
308    fn with_lock<T>(&self, f: impl FnOnce(&SqliteConnection) -> Result<T>) -> Result<T> {
309        if let Some(parent) = self.db_path.parent() {
310            fs::create_dir_all(parent)?;
311        }
312        let lock_file = File::options()
313            .read(true)
314            .write(true)
315            .create(true)
316            .truncate(false)
317            .open(&self.lock_path)?;
318        let _lock = lock_file_guard(&lock_file, Duration::from_secs(5))?;
319
320        let config = SqliteConfig::file(self.db_path.to_string_lossy())
321            .flags(OpenFlags::create_read_write())
322            .busy_timeout(5000);
323
324        let conn = SqliteConnection::open(&config)
325            .map_err(|e| Error::session(format!("SQLite open: {e}")))?;
326
327        // Set pragmas for performance
328        conn.execute_raw("PRAGMA journal_mode = WAL")
329            .map_err(|e| Error::session(format!("PRAGMA journal_mode: {e}")))?;
330        conn.execute_raw("PRAGMA synchronous = NORMAL")
331            .map_err(|e| Error::session(format!("PRAGMA synchronous: {e}")))?;
332        conn.execute_raw("PRAGMA wal_autocheckpoint = 1000")
333            .map_err(|e| Error::session(format!("PRAGMA wal_autocheckpoint: {e}")))?;
334        conn.execute_raw("PRAGMA foreign_keys = ON")
335            .map_err(|e| Error::session(format!("PRAGMA foreign_keys: {e}")))?;
336
337        f(&conn)
338    }
339
340    fn sessions_root(&self) -> &Path {
341        self.db_path.parent().unwrap_or_else(|| Path::new("."))
342    }
343}
344
345impl Default for SessionIndex {
346    fn default() -> Self {
347        Self::new()
348    }
349}
350
351/// Queue (currently immediate) index update for a persisted session snapshot.
352///
353/// Callers use this helper from save paths where index freshness is
354/// best-effort and must not fail the underlying session write.
355pub(crate) fn enqueue_session_index_snapshot_update(
356    sessions_root: &Path,
357    path: &Path,
358    header: &SessionHeader,
359    message_count: u64,
360    name: Option<String>,
361) {
362    let sessions_root = sessions_root.to_path_buf();
363    let path = path.to_path_buf();
364    let header = header.clone();
365
366    if let Err(err) = SessionIndex::for_sessions_root(&sessions_root).index_session_snapshot(
367        &path,
368        &header,
369        message_count,
370        name,
371    ) {
372        tracing::warn!(
373            sessions_root = %sessions_root.display(),
374            path = %path.display(),
375            error = %err,
376            "Failed to update session index snapshot"
377        );
378    }
379}
380
381fn init_schema(conn: &SqliteConnection) -> Result<()> {
382    conn.execute_raw(
383        "CREATE TABLE IF NOT EXISTS sessions (
384            path TEXT PRIMARY KEY,
385            id TEXT NOT NULL,
386            cwd TEXT NOT NULL,
387            timestamp TEXT NOT NULL,
388            message_count INTEGER NOT NULL,
389            last_modified_ms INTEGER NOT NULL,
390            size_bytes INTEGER NOT NULL,
391            name TEXT
392        )",
393    )
394    .map_err(|e| Error::session(format!("Create sessions table: {e}")))?;
395
396    conn.execute_raw(
397        "CREATE TABLE IF NOT EXISTS meta (
398            key TEXT PRIMARY KEY,
399            value TEXT NOT NULL
400        )",
401    )
402    .map_err(|e| Error::session(format!("Create meta table: {e}")))?;
403
404    Ok(())
405}
406
407fn sqlite_i64_from_u64(field: &str, value: u64) -> Result<i64> {
408    i64::try_from(value)
409        .map_err(|_| Error::session(format!("{field} exceeds SQLite INTEGER range: {value}")))
410}
411
412fn sqlite_u64_from_i64(field: &str, value: i64) -> Result<u64> {
413    u64::try_from(value).map_err(|_| {
414        Error::session(format!(
415            "{field} must be non-negative in session index: {value}"
416        ))
417    })
418}
419
420fn row_to_meta(row: &sqlmodel_core::Row) -> Result<SessionMeta> {
421    let message_count = row
422        .get_named::<i64>("message_count")
423        .map_err(|e| Error::session(format!("get message_count: {e}")))?;
424    let size_bytes = row
425        .get_named::<i64>("size_bytes")
426        .map_err(|e| Error::session(format!("get size_bytes: {e}")))?;
427
428    Ok(SessionMeta {
429        path: row
430            .get_named("path")
431            .map_err(|e| Error::session(format!("get path: {e}")))?,
432        id: row
433            .get_named("id")
434            .map_err(|e| Error::session(format!("get id: {e}")))?,
435        cwd: row
436            .get_named("cwd")
437            .map_err(|e| Error::session(format!("get cwd: {e}")))?,
438        timestamp: row
439            .get_named("timestamp")
440            .map_err(|e| Error::session(format!("get timestamp: {e}")))?,
441        message_count: sqlite_u64_from_i64("message_count", message_count)?,
442        last_modified_ms: row
443            .get_named("last_modified_ms")
444            .map_err(|e| Error::session(format!("get last_modified_ms: {e}")))?,
445        size_bytes: sqlite_u64_from_i64("size_bytes", size_bytes)?,
446        name: row
447            .get_named::<Option<String>>("name")
448            .map_err(|e| Error::session(format!("get name: {e}")))?,
449    })
450}
451
452fn build_meta(
453    path: &Path,
454    header: &SessionHeader,
455    entries: &[SessionEntry],
456) -> Result<SessionMeta> {
457    header
458        .validate()
459        .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
460    let (message_count, name) = session_stats(entries);
461    let (last_modified_ms, size_bytes) = session_file_stats(path)?;
462    Ok(SessionMeta {
463        path: path.display().to_string(),
464        id: header.id.clone(),
465        cwd: header.cwd.clone(),
466        timestamp: header.timestamp.clone(),
467        message_count,
468        last_modified_ms,
469        size_bytes,
470        name,
471    })
472}
473
/// Read one `\n`-terminated line, refusing lines longer than `max_bytes`.
///
/// Returns `Ok(None)` at end of input. On success the returned string still
/// carries its trailing newline (when one was present). The length check
/// excludes the trailing `\n` but counts every other byte, including a `\r`.
///
/// # Errors
///
/// * `InvalidData` when the line exceeds `max_bytes`. The rest of that line
///   is consumed first, so the next call starts at the following line.
/// * `InvalidData` when the line is not valid UTF-8.
/// * Any I/O error reported by `reader`.
fn read_capped_utf8_line_with_limit<R: BufRead>(
    reader: &mut R,
    max_bytes: usize,
) -> std::io::Result<Option<String>> {
    // Allow two bytes beyond the cap so an over-long line is detectable
    // without ever buffering more than `max_bytes + 2` bytes of it.
    let limit = u64::try_from(max_bytes)
        .unwrap_or(u64::MAX.saturating_sub(2))
        .saturating_add(2);
    let mut bytes = Vec::new();
    let bytes_read = reader.take(limit).read_until(b'\n', &mut bytes)?;
    if bytes_read == 0 {
        return Ok(None);
    }

    let content_len = bytes.strip_suffix(b"\n").map_or(bytes.len(), <[u8]>::len);
    if content_len > max_bytes {
        if !bytes.ends_with(b"\n") {
            // The capped read stopped mid-line: drop the remainder without
            // buffering it, otherwise the cap would not bound memory use.
            skip_rest_of_line(reader)?;
        }
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("JSONL line exceeds {max_bytes} bytes"),
        ));
    }

    String::from_utf8(bytes)
        .map(Some)
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))
}

/// Consume input up to and including the next `\n` (or to end of input)
/// using only the reader's internal buffer.
fn skip_rest_of_line<R: BufRead>(reader: &mut R) -> std::io::Result<()> {
    loop {
        let (consumed, line_ended) = {
            let available = match reader.fill_buf() {
                Ok(buf) => buf,
                // Retry interrupted reads, as `read_until` does.
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            };
            if available.is_empty() {
                return Ok(());
            }
            match available.iter().position(|&byte| byte == b'\n') {
                Some(newline_at) => (newline_at + 1, true),
                None => (available.len(), false),
            }
        };
        reader.consume(consumed);
        if line_ended {
            return Ok(());
        }
    }
}
509
510fn read_capped_utf8_line<R: BufRead>(reader: &mut R) -> std::io::Result<Option<String>> {
511    read_capped_utf8_line_with_limit(reader, MAX_JSONL_LINE_BYTES)
512}
513
514pub(crate) fn build_meta_from_file(path: &Path) -> Result<SessionMeta> {
515    match path.extension().and_then(|ext| ext.to_str()) {
516        Some("jsonl") => build_meta_from_jsonl(path),
517        #[cfg(feature = "sqlite-sessions")]
518        Some("sqlite") => build_meta_from_sqlite(path),
519        _ => build_meta_from_jsonl(path),
520    }
521}
522
523#[derive(Deserialize)]
524struct PartialEntry {
525    #[serde(default)]
526    r#type: String,
527    #[serde(default)]
528    name: Option<String>,
529}
530
531fn build_meta_from_jsonl(path: &Path) -> Result<SessionMeta> {
532    let file = File::open(path)
533        .map_err(|err| Error::session(format!("Read session file {}: {err}", path.display())))?;
534    let mut reader = BufReader::new(file);
535    let Some(header_line) = read_capped_utf8_line(&mut reader)
536        .map_err(|err| Error::session(format!("Read session header {}: {err}", path.display())))?
537    else {
538        return Err(Error::session(format!(
539            "Empty session file {}",
540            path.display()
541        )));
542    };
543
544    let header: SessionHeader = serde_json::from_str(&header_line)
545        .map_err(|err| Error::session(format!("Parse session header {}: {err}", path.display())))?;
546    header.validate().map_err(|reason| {
547        Error::session(format!(
548            "Invalid session header {}: {reason}",
549            path.display()
550        ))
551    })?;
552
553    let mut message_count = 0u64;
554    let mut name = None;
555    loop {
556        let Some(line_buf) = read_capped_utf8_line(&mut reader).map_err(|err| {
557            Error::session(format!("Read session entry line {}: {err}", path.display()))
558        })?
559        else {
560            break;
561        };
562
563        if let Ok(entry) = serde_json::from_str::<PartialEntry>(&line_buf) {
564            match entry.r#type.as_str() {
565                "message" => message_count += 1,
566                "session_info" if entry.name.is_some() => {
567                    name = entry.name;
568                }
569                _ => {}
570            }
571        }
572    }
573
574    let meta = fs::metadata(path)?;
575    let size_bytes = meta.len();
576    let modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
577    let millis = modified
578        .duration_since(UNIX_EPOCH)
579        .unwrap_or_default()
580        .as_millis();
581    let last_modified_ms = i64::try_from(millis).unwrap_or(i64::MAX);
582
583    Ok(SessionMeta {
584        path: path.display().to_string(),
585        id: header.id,
586        cwd: header.cwd,
587        timestamp: header.timestamp,
588        message_count,
589        last_modified_ms,
590        size_bytes,
591        name,
592    })
593}
594
/// Summarize a SQLite-backed session file.
///
/// Blocks the current thread on `session_sqlite::load_session_meta`, validates
/// the loaded header and combines it with the on-disk file stats.
///
/// # Errors
///
/// Fails when loading the metadata fails, the header is invalid, or the file
/// cannot be stat'ed.
#[cfg(feature = "sqlite-sessions")]
fn build_meta_from_sqlite(path: &Path) -> Result<SessionMeta> {
    let loaded = futures::executor::block_on(crate::session_sqlite::load_session_meta(path))?;
    if let Err(reason) = loaded.header.validate() {
        return Err(Error::session(format!(
            "Invalid session header {}: {reason}",
            path.display()
        )));
    }
    let (last_modified_ms, size_bytes) = session_file_stats(path)?;

    Ok(SessionMeta {
        path: path.display().to_string(),
        id: loaded.header.id,
        cwd: loaded.header.cwd,
        timestamp: loaded.header.timestamp,
        message_count: loaded.message_count,
        last_modified_ms,
        size_bytes,
        name: loaded.name,
    })
}
618
619fn session_stats<T>(entries: &[T]) -> (u64, Option<String>)
620where
621    T: Borrow<SessionEntry>,
622{
623    let mut message_count = 0u64;
624    let mut name = None;
625    for entry in entries {
626        match entry.borrow() {
627            SessionEntry::Message(_) => message_count += 1,
628            SessionEntry::SessionInfo(info) if info.name.is_some() => {
629                name.clone_from(&info.name);
630            }
631            _ => {}
632        }
633    }
634    (message_count, name)
635}
636
/// Paths obtained by appending `-wal` and `-shm` to `path`, in that order.
#[cfg(feature = "sqlite-sessions")]
fn sqlite_auxiliary_paths(path: &Path) -> [PathBuf; 2] {
    // Append to the raw OS string so the suffix follows the full file name,
    // extension included.
    let with_suffix = |suffix: &str| {
        let mut raw = path.as_os_str().to_os_string();
        raw.push(suffix);
        PathBuf::from(raw)
    };

    [with_suffix("-wal"), with_suffix("-shm")]
}
645
646pub(crate) fn session_file_stats(path: &Path) -> Result<(i64, u64)> {
647    let meta = fs::metadata(path)?;
648    #[cfg(feature = "sqlite-sessions")]
649    let (size, modified) = {
650        let mut size = meta.len();
651        let mut modified = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
652
653        if path.extension().and_then(|ext| ext.to_str()) == Some("sqlite") {
654            for auxiliary_path in sqlite_auxiliary_paths(path) {
655                let Ok(aux_meta) = fs::metadata(&auxiliary_path) else {
656                    continue;
657                };
658                size = size.saturating_add(aux_meta.len());
659                let aux_modified = aux_meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
660                if aux_modified > modified {
661                    modified = aux_modified;
662                }
663            }
664        }
665
666        (size, modified)
667    };
668
669    #[cfg(not(feature = "sqlite-sessions"))]
670    let (size, modified) = (
671        meta.len(),
672        meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
673    );
674
675    let millis = modified
676        .duration_since(UNIX_EPOCH)
677        .unwrap_or_default()
678        .as_millis();
679    let ms = i64::try_from(millis).unwrap_or(i64::MAX);
680    Ok((ms, size))
681}
682
/// Decide from the file name alone whether `path` looks like a session file.
///
/// Files named `session-index.*` (the index's own artifacts) are never
/// session files. Otherwise the extension must be `jsonl`, or `sqlite` when
/// the `sqlite-sessions` feature is enabled.
pub(crate) fn is_session_file_path(path: &Path) -> bool {
    let is_index_artifact = path
        .file_name()
        .and_then(|name| name.to_str())
        .is_some_and(|name| name.starts_with("session-index."));
    if is_index_artifact {
        return false;
    }

    #[cfg(feature = "sqlite-sessions")]
    let accepted: &[&str] = &["jsonl", "sqlite"];
    #[cfg(not(feature = "sqlite-sessions"))]
    let accepted: &[&str] = &["jsonl"];

    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| accepted.contains(&ext))
}
696
697pub(crate) fn walk_sessions(root: &Path) -> Vec<std::io::Result<PathBuf>> {
698    let mut out = Vec::new();
699    let mut stack = vec![root.to_path_buf()];
700
701    while let Some(dir) = stack.pop() {
702        if let Ok(entries) = fs::read_dir(&dir) {
703            for entry in entries.flatten() {
704                let path = entry.path();
705                let Ok(file_type) = entry.file_type() else {
706                    continue;
707                };
708
709                if file_type.is_dir() {
710                    stack.push(path);
711                } else if file_type.is_symlink() {
712                    // Allow symlinks to files, but skip symlinked directories to avoid cycles
713                    if let Ok(meta) = fs::metadata(&path) {
714                        if meta.is_file() && is_session_file_path(&path) {
715                            out.push(Ok(path));
716                        }
717                    }
718                } else if is_session_file_path(&path) {
719                    out.push(Ok(path));
720                }
721            }
722        }
723    }
724    out
725}
726
/// Current wall-clock time as a decimal string of milliseconds since the
/// Unix epoch.
///
/// Derived from [`current_epoch_ms_i64`] so that values written with this
/// helper and values compared against `current_epoch_ms_i64` come from the
/// same clock source and clamping rules.
fn current_epoch_ms() -> String {
    current_epoch_ms_i64().to_string()
}

/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value too large for `i64`
/// saturates at `i64::MAX`.
fn current_epoch_ms_i64() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
738
739fn epoch_ms_is_stale(epoch_ms: i64, max_age: Duration) -> bool {
740    let age_ms = current_epoch_ms_i64().saturating_sub(epoch_ms);
741    u128::try_from(age_ms).unwrap_or(u128::MAX) > max_age.as_millis()
742}
743
744fn load_last_sync_epoch_ms(conn: &SqliteConnection) -> Result<Option<i64>> {
745    let rows = conn
746        .query_sync(
747            "SELECT value FROM meta WHERE key='last_sync_epoch_ms' LIMIT 1",
748            &[],
749        )
750        .map_err(|err| Error::session(format!("Query meta failed: {err}")))?;
751    let Some(row) = rows.into_iter().next() else {
752        return Ok(None);
753    };
754    let value = row
755        .get_named::<String>("value")
756        .map_err(|err| Error::session(format!("get meta value: {err}")))?;
757    Ok(value.parse::<i64>().ok())
758}
759
760fn lock_file_guard(file: &File, timeout: Duration) -> Result<LockGuard<'_>> {
761    let start = Instant::now();
762    loop {
763        match FileExt::try_lock_exclusive(file) {
764            Ok(true) => return Ok(LockGuard { file }),
765            Ok(false) => {}
766            Err(err) => {
767                return Err(Error::session(format!(
768                    "Failed to acquire session index lock: {err}"
769                )));
770            }
771        }
772
773        if start.elapsed() >= timeout {
774            return Err(Error::session(
775                "Timed out waiting for session index lock".to_string(),
776            ));
777        }
778
779        std::thread::sleep(Duration::from_millis(50));
780    }
781}
782
/// Guard for a held file lock; the lock is released when the guard is dropped.
#[derive(Debug)]
struct LockGuard<'a> {
    /// The locked file, borrowed for as long as the guard lives.
    file: &'a File,
}
787
788impl Drop for LockGuard<'_> {
789    fn drop(&mut self) {
790        let _ = FileExt::unlock(self.file);
791    }
792}
793
794#[cfg(test)]
795#[path = "../tests/common/mod.rs"]
796mod test_common;
797
798#[cfg(test)]
799mod tests {
800    use super::*;
801
802    use super::test_common::TestHarness;
803    use crate::model::UserContent;
804    use crate::session::{EntryBase, MessageEntry, SessionInfoEntry, SessionMessage};
805    use pretty_assertions::assert_eq;
806    use proptest::prelude::*;
807    use proptest::string::string_regex;
808    use std::collections::HashMap;
809    use std::fs;
810    #[cfg(unix)]
811    use std::process::Command;
812    use std::time::Duration;
813
814    fn write_session_jsonl(path: &Path, header: &SessionHeader, entries: &[SessionEntry]) {
815        let mut jsonl = String::new();
816        jsonl.push_str(&serde_json::to_string(header).expect("serialize session header"));
817        jsonl.push('\n');
818        for entry in entries {
819            jsonl.push_str(&serde_json::to_string(entry).expect("serialize session entry"));
820            jsonl.push('\n');
821        }
822        fs::write(path, jsonl).expect("write session jsonl");
823    }
824
825    fn make_header(id: &str, cwd: &str) -> SessionHeader {
826        let mut header = SessionHeader::new();
827        header.id = id.to_string();
828        header.cwd = cwd.to_string();
829        header
830    }
831
832    fn make_user_entry(parent_id: Option<String>, id: &str, text: &str) -> SessionEntry {
833        SessionEntry::Message(MessageEntry {
834            base: EntryBase::new(parent_id, id.to_string()),
835            message: SessionMessage::User {
836                content: UserContent::Text(text.to_string()),
837                timestamp: Some(chrono::Utc::now().timestamp_millis()),
838            },
839        })
840    }
841
842    fn make_session_info_entry(
843        parent_id: Option<String>,
844        id: &str,
845        name: Option<&str>,
846    ) -> SessionEntry {
847        SessionEntry::SessionInfo(SessionInfoEntry {
848            base: EntryBase::new(parent_id, id.to_string()),
849            name: name.map(ToString::to_string),
850        })
851    }
852
853    fn read_meta_last_sync_epoch_ms(index: &SessionIndex) -> String {
854        index
855            .with_lock(|conn| {
856                init_schema(conn)?;
857                let rows = conn
858                    .query_sync(
859                        "SELECT value FROM meta WHERE key='last_sync_epoch_ms' LIMIT 1",
860                        &[],
861                    )
862                    .map_err(|err| Error::session(format!("Query meta failed: {err}")))?;
863                let row = rows
864                    .into_iter()
865                    .next()
866                    .ok_or_else(|| Error::session("Missing meta row".to_string()))?;
867                row.get_named::<String>("value")
868                    .map_err(|err| Error::session(format!("get meta value: {err}")))
869            })
870            .expect("read meta.last_sync_epoch_ms")
871    }
872
    /// Generated session-row values for property tests.
    ///
    /// The numeric fields are `i64` and are filled from `any::<i64>()` by
    /// `arbitrary_meta_row_strategy`, so they can hold negative values.
    #[derive(Debug, Clone)]
    struct ArbitraryMetaRow {
        /// Session identifier (generated by `ident_strategy`).
        id: String,
        /// Working-directory string (generated by `cwd_strategy`).
        cwd: String,
        /// Timestamp-like text (generated by `timestamp_strategy`); not
        /// guaranteed to be a well-formed timestamp.
        timestamp: String,
        /// Message count; any `i64`, including negatives.
        message_count: i64,
        /// Last-modified time; any `i64` (presumably epoch milliseconds — TODO confirm).
        last_modified_ms: i64,
        /// File size; any `i64`, including negatives.
        size_bytes: i64,
        /// Optional session name (generated by `optional_name_strategy`).
        name: Option<String>,
    }
883
884    fn ident_strategy() -> impl Strategy<Value = String> {
885        string_regex("[a-z0-9_-]{1,16}").expect("valid identifier regex")
886    }
887
888    fn cwd_strategy() -> impl Strategy<Value = String> {
889        prop_oneof![
890            Just("cwd-a".to_string()),
891            Just("cwd-b".to_string()),
892            string_regex("[a-z0-9_./-]{1,20}").expect("valid cwd regex"),
893        ]
894    }
895
896    fn timestamp_strategy() -> impl Strategy<Value = String> {
897        string_regex("[0-9TZ:.-]{10,32}").expect("valid timestamp regex")
898    }
899
900    fn optional_name_strategy() -> impl Strategy<Value = Option<String>> {
901        prop::option::of(string_regex("[A-Za-z0-9 _.:-]{0,32}").expect("valid name regex"))
902    }
903
904    fn arbitrary_meta_row_strategy() -> impl Strategy<Value = ArbitraryMetaRow> {
905        (
906            ident_strategy(),
907            cwd_strategy(),
908            timestamp_strategy(),
909            any::<i64>(),
910            any::<i64>(),
911            any::<i64>(),
912            optional_name_strategy(),
913        )
914            .prop_map(
915                |(id, cwd, timestamp, message_count, last_modified_ms, size_bytes, name)| {
916                    ArbitraryMetaRow {
917                        id,
918                        cwd,
919                        timestamp,
920                        message_count,
921                        last_modified_ms,
922                        size_bytes,
923                        name,
924                    }
925                },
926            )
927    }
928
929    #[test]
930    fn index_session_on_in_memory_session_is_noop() {
931        let harness = TestHarness::new("index_session_on_in_memory_session_is_noop");
932        let root = harness.temp_path("sessions");
933        fs::create_dir_all(&root).expect("create root dir");
934        let index = SessionIndex::for_sessions_root(&root);
935        let session = Session::in_memory();
936
937        index
938            .index_session(&session)
939            .expect("index in-memory session");
940
941        harness
942            .log()
943            .info_ctx("verify", "No index files created", |ctx| {
944                ctx.push(("db_path".into(), index.db_path.display().to_string()));
945                ctx.push(("lock_path".into(), index.lock_path.display().to_string()));
946            });
947        assert!(!index.db_path.exists());
948        assert!(!index.lock_path.exists());
949    }
950
951    #[test]
952    fn index_session_inserts_row_and_updates_meta() {
953        let harness = TestHarness::new("index_session_inserts_row_and_updates_meta");
954        let root = harness.temp_path("sessions");
955        fs::create_dir_all(&root).expect("create root dir");
956        let index = SessionIndex::for_sessions_root(&root);
957
958        let session_path = harness.temp_path("sessions/project/a.jsonl");
959        fs::create_dir_all(session_path.parent().expect("parent")).expect("create session dir");
960        fs::write(&session_path, "hello").expect("write session file");
961
962        let mut session = Session::in_memory();
963        session.header = make_header("id-a", "cwd-a");
964        session.path = Some(session_path.clone());
965        session.entries.push(make_user_entry(None, "m1", "hi"));
966
967        index.index_session(&session).expect("index session");
968
969        let sessions = index.list_sessions(Some("cwd-a")).expect("list sessions");
970        assert_eq!(sessions.len(), 1);
971        assert_eq!(sessions[0].id, "id-a");
972        assert_eq!(sessions[0].cwd, "cwd-a");
973        assert_eq!(sessions[0].message_count, 1);
974        assert_eq!(sessions[0].path, session_path.display().to_string());
975
976        let meta_value = read_meta_last_sync_epoch_ms(&index);
977        harness
978            .log()
979            .info_ctx("verify", "meta.last_sync_epoch_ms present", |ctx| {
980                ctx.push(("value".into(), meta_value.clone()));
981            });
982        assert!(
983            meta_value.parse::<i64>().is_ok(),
984            "Expected meta value to be an integer epoch ms"
985        );
986    }
987
988    #[test]
989    fn index_session_updates_existing_row() {
990        let harness = TestHarness::new("index_session_updates_existing_row");
991        let root = harness.temp_path("sessions");
992        fs::create_dir_all(&root).expect("create root dir");
993        let index = SessionIndex::for_sessions_root(&root);
994
995        let session_path = harness.temp_path("sessions/project/update.jsonl");
996        fs::create_dir_all(session_path.parent().expect("parent")).expect("create session dir");
997        fs::write(&session_path, "first").expect("write session file");
998
999        let mut session = Session::in_memory();
1000        session.header = make_header("id-update", "cwd-update");
1001        session.path = Some(session_path.clone());
1002        session.entries.push(make_user_entry(None, "m1", "hi"));
1003
1004        index
1005            .index_session(&session)
1006            .expect("index session first time");
1007        let first_meta = index
1008            .list_sessions(Some("cwd-update"))
1009            .expect("list sessions")[0]
1010            .clone();
1011        let first_sync = read_meta_last_sync_epoch_ms(&index);
1012
1013        std::thread::sleep(Duration::from_millis(10));
1014        fs::write(&session_path, "second-longer").expect("rewrite session file");
1015        session
1016            .entries
1017            .push(make_user_entry(Some("m1".to_string()), "m2", "again"));
1018
1019        index
1020            .index_session(&session)
1021            .expect("index session second time");
1022        let second_meta = index
1023            .list_sessions(Some("cwd-update"))
1024            .expect("list sessions")[0]
1025            .clone();
1026        let second_sync = read_meta_last_sync_epoch_ms(&index);
1027
1028        harness.log().info_ctx("verify", "row updated", |ctx| {
1029            ctx.push((
1030                "first_message_count".into(),
1031                first_meta.message_count.to_string(),
1032            ));
1033            ctx.push((
1034                "second_message_count".into(),
1035                second_meta.message_count.to_string(),
1036            ));
1037            ctx.push(("first_size".into(), first_meta.size_bytes.to_string()));
1038            ctx.push(("second_size".into(), second_meta.size_bytes.to_string()));
1039            ctx.push(("first_sync".into(), first_sync.clone()));
1040            ctx.push(("second_sync".into(), second_sync.clone()));
1041        });
1042
1043        assert_eq!(second_meta.message_count, 2);
1044        assert!(second_meta.size_bytes >= first_meta.size_bytes);
1045        assert!(second_meta.last_modified_ms >= first_meta.last_modified_ms);
1046        assert!(second_sync.parse::<i64>().unwrap_or(0) >= first_sync.parse::<i64>().unwrap_or(0));
1047    }
1048
1049    #[test]
1050    fn list_sessions_orders_by_last_modified_desc() {
1051        let harness = TestHarness::new("list_sessions_orders_by_last_modified_desc");
1052        let root = harness.temp_path("sessions");
1053        fs::create_dir_all(&root).expect("create root dir");
1054        let index = SessionIndex::for_sessions_root(&root);
1055
1056        let path_a = harness.temp_path("sessions/project/a.jsonl");
1057        fs::create_dir_all(path_a.parent().expect("parent")).expect("create dirs");
1058        fs::write(&path_a, "a").expect("write file a");
1059
1060        let mut session_a = Session::in_memory();
1061        session_a.header = make_header("id-a", "cwd-a");
1062        session_a.path = Some(path_a);
1063        session_a.entries.push(make_user_entry(None, "m1", "a"));
1064        index.index_session(&session_a).expect("index a");
1065
1066        std::thread::sleep(Duration::from_millis(10));
1067
1068        let path_b = harness.temp_path("sessions/project/b.jsonl");
1069        fs::create_dir_all(path_b.parent().expect("parent")).expect("create dirs");
1070        fs::write(&path_b, "bbbbb").expect("write file b");
1071
1072        let mut session_b = Session::in_memory();
1073        session_b.header = make_header("id-b", "cwd-b");
1074        session_b.path = Some(path_b);
1075        session_b.entries.push(make_user_entry(None, "m1", "b"));
1076        index.index_session(&session_b).expect("index b");
1077
1078        let sessions = index.list_sessions(None).expect("list sessions");
1079        harness
1080            .log()
1081            .info("verify", format!("listed {} sessions", sessions.len()));
1082        assert!(sessions.len() >= 2);
1083        assert_eq!(sessions[0].id, "id-b");
1084        assert_eq!(sessions[1].id, "id-a");
1085        assert!(sessions[0].last_modified_ms >= sessions[1].last_modified_ms);
1086    }
1087
1088    #[test]
1089    fn list_sessions_filters_by_cwd() {
1090        let harness = TestHarness::new("list_sessions_filters_by_cwd");
1091        let root = harness.temp_path("sessions");
1092        fs::create_dir_all(&root).expect("create root dir");
1093        let index = SessionIndex::for_sessions_root(&root);
1094
1095        for (id, cwd) in [("id-a", "cwd-a"), ("id-b", "cwd-b")] {
1096            let path = harness.temp_path(format!("sessions/project/{id}.jsonl"));
1097            fs::create_dir_all(path.parent().expect("parent")).expect("create dirs");
1098            fs::write(&path, id).expect("write session file");
1099
1100            let mut session = Session::in_memory();
1101            session.header = make_header(id, cwd);
1102            session.path = Some(path);
1103            session.entries.push(make_user_entry(None, "m1", id));
1104            index.index_session(&session).expect("index session");
1105        }
1106
1107        let only_a = index
1108            .list_sessions(Some("cwd-a"))
1109            .expect("list sessions cwd-a");
1110        assert_eq!(only_a.len(), 1);
1111        assert_eq!(only_a[0].id, "id-a");
1112    }
1113
1114    #[test]
1115    fn reindex_all_is_noop_when_sessions_root_missing() {
1116        let harness = TestHarness::new("reindex_all_is_noop_when_sessions_root_missing");
1117        let missing_root = harness.temp_path("does-not-exist");
1118        let index = SessionIndex::for_sessions_root(&missing_root);
1119
1120        index.reindex_all().expect("reindex_all");
1121        assert!(!index.db_path.exists());
1122        assert!(!index.lock_path.exists());
1123    }
1124
1125    #[test]
1126    fn reindex_all_rebuilds_index_from_disk() {
1127        let harness = TestHarness::new("reindex_all_rebuilds_index_from_disk");
1128        let root = harness.temp_path("sessions");
1129        fs::create_dir_all(&root).expect("create root dir");
1130        let index = SessionIndex::for_sessions_root(&root);
1131
1132        let path = harness.temp_path("sessions/project/reindex.jsonl");
1133        fs::create_dir_all(path.parent().expect("parent")).expect("create dirs");
1134
1135        let header = make_header("id-reindex", "cwd-reindex");
1136        let entries = vec![
1137            make_user_entry(None, "m1", "hello"),
1138            make_session_info_entry(Some("m1".to_string()), "info1", Some("My Session")),
1139            make_user_entry(Some("info1".to_string()), "m2", "world"),
1140        ];
1141        write_session_jsonl(&path, &header, &entries);
1142
1143        index.reindex_all().expect("reindex_all");
1144
1145        let sessions = index
1146            .list_sessions(Some("cwd-reindex"))
1147            .expect("list sessions");
1148        assert_eq!(sessions.len(), 1);
1149        assert_eq!(sessions[0].id, "id-reindex");
1150        assert_eq!(sessions[0].message_count, 2);
1151        assert_eq!(sessions[0].name.as_deref(), Some("My Session"));
1152
1153        let meta_value = read_meta_last_sync_epoch_ms(&index);
1154        harness.log().info_ctx("verify", "meta updated", |ctx| {
1155            ctx.push(("value".into(), meta_value.clone()));
1156        });
1157        assert!(meta_value.parse::<i64>().unwrap_or(0) > 0);
1158    }
1159
1160    #[test]
1161    fn reindex_all_skips_invalid_jsonl_files() {
1162        let harness = TestHarness::new("reindex_all_skips_invalid_jsonl_files");
1163        let root = harness.temp_path("sessions");
1164        fs::create_dir_all(&root).expect("create root dir");
1165        let index = SessionIndex::for_sessions_root(&root);
1166
1167        let good = harness.temp_path("sessions/project/good.jsonl");
1168        fs::create_dir_all(good.parent().expect("parent")).expect("create dirs");
1169        let header = make_header("id-good", "cwd-good");
1170        let entries = vec![make_user_entry(None, "m1", "ok")];
1171        write_session_jsonl(&good, &header, &entries);
1172
1173        let bad = harness.temp_path("sessions/project/bad.jsonl");
1174        fs::write(&bad, "not-json\n{").expect("write bad jsonl");
1175
1176        index.reindex_all().expect("reindex_all should succeed");
1177        let sessions = index.list_sessions(None).expect("list sessions");
1178        assert_eq!(sessions.len(), 1);
1179        assert_eq!(sessions[0].id, "id-good");
1180    }
1181
1182    #[test]
1183    fn build_meta_from_file_returns_session_error_on_invalid_header() {
1184        let harness =
1185            TestHarness::new("build_meta_from_file_returns_session_error_on_invalid_header");
1186        let path = harness.temp_path("bad_header.jsonl");
1187        fs::write(&path, "not json\n").expect("write bad header");
1188
1189        let err = build_meta_from_file(&path).expect_err("expected error");
1190        harness.log().info("verify", format!("error: {err}"));
1191
1192        assert!(
1193            matches!(err, Error::Session(ref msg) if msg.contains("Parse session header")),
1194            "Expected Error::Session containing Parse session header, got {err:?}",
1195        );
1196    }
1197
1198    #[test]
1199    fn build_meta_from_file_rejects_semantically_invalid_header() {
1200        let harness = TestHarness::new("build_meta_from_file_rejects_semantically_invalid_header");
1201        let path = harness.temp_path("bad_semantic_header.jsonl");
1202        let header = SessionHeader {
1203            r#type: "note".to_string(),
1204            id: "bad-id".to_string(),
1205            cwd: "/tmp".to_string(),
1206            timestamp: "2026-01-01T00:00:00.000Z".to_string(),
1207            ..SessionHeader::default()
1208        };
1209        write_session_jsonl(&path, &header, &[]);
1210
1211        let err = build_meta_from_file(&path).expect_err("expected invalid header error");
1212        harness.log().info("verify", format!("error: {err}"));
1213
1214        assert!(
1215            matches!(err, Error::Session(ref msg) if msg.contains("Invalid session header")),
1216            "Expected Error::Session containing Invalid session header, got {err:?}",
1217        );
1218    }
1219
1220    #[test]
1221    fn build_meta_from_file_returns_session_error_on_empty_file() {
1222        let harness = TestHarness::new("build_meta_from_file_returns_session_error_on_empty_file");
1223        let path = harness.temp_path("empty.jsonl");
1224        fs::write(&path, "").expect("write empty");
1225
1226        let err = build_meta_from_file(&path).expect_err("expected error");
1227        if let Error::Session(msg) = &err {
1228            harness.log().info("verify", msg.clone());
1229        }
1230        assert!(
1231            matches!(err, Error::Session(ref msg) if msg.contains("Empty session file")),
1232            "Expected Error::Session containing Empty session file, got {err:?}",
1233        );
1234    }
1235
1236    #[test]
1237    fn list_sessions_returns_session_error_when_db_path_is_directory() {
1238        let harness =
1239            TestHarness::new("list_sessions_returns_session_error_when_db_path_is_directory");
1240        let root = harness.temp_path("sessions");
1241        fs::create_dir_all(&root).expect("create root dir");
1242
1243        let db_dir = root.join("session-index.sqlite");
1244        fs::create_dir_all(&db_dir).expect("create db dir to force sqlite open failure");
1245
1246        let index = SessionIndex::for_sessions_root(&root);
1247        let err = index.list_sessions(None).expect_err("expected error");
1248        if let Error::Session(msg) = &err {
1249            harness.log().info("verify", msg.clone());
1250        }
1251        assert!(
1252            matches!(err, Error::Session(ref msg) if msg.contains("SQLite open")),
1253            "Expected Error::Session containing SQLite open, got {err:?}",
1254        );
1255    }
1256
1257    #[test]
1258    fn lock_file_guard_prevents_concurrent_access() {
1259        let harness = TestHarness::new("lock_file_guard_prevents_concurrent_access");
1260        let path = harness.temp_path("lockfile.lock");
1261        fs::write(&path, "").expect("create lock file");
1262
1263        let file1 = File::options()
1264            .read(true)
1265            .write(true)
1266            .open(&path)
1267            .expect("open file1");
1268        let file2 = File::options()
1269            .read(true)
1270            .write(true)
1271            .open(&path)
1272            .expect("open file2");
1273
1274        let guard1 = lock_file_guard(&file1, Duration::from_millis(50)).expect("acquire lock");
1275        let err =
1276            lock_file_guard(&file2, Duration::from_millis(50)).expect_err("expected lock timeout");
1277        drop(guard1);
1278
1279        assert!(
1280            matches!(err, Error::Session(ref msg) if msg.contains("Timed out")),
1281            "Expected Error::Session containing Timed out, got {err:?}",
1282        );
1283
1284        let _guard2 =
1285            lock_file_guard(&file2, Duration::from_millis(50)).expect("lock after release");
1286    }
1287
1288    #[test]
1289    fn should_reindex_returns_true_when_db_missing() {
1290        let harness = TestHarness::new("should_reindex_returns_true_when_db_missing");
1291        let root = harness.temp_path("sessions");
1292        fs::create_dir_all(&root).expect("create root dir");
1293        let index = SessionIndex::for_sessions_root(&root);
1294
1295        assert!(index.should_reindex(Duration::from_secs(60)));
1296    }
1297
1298    // ── session_stats ────────────────────────────────────────────────
1299
1300    #[test]
1301    fn session_stats_empty_entries() {
1302        let (count, name) = session_stats::<SessionEntry>(&[]);
1303        assert_eq!(count, 0);
1304        assert!(name.is_none());
1305    }
1306
1307    #[test]
1308    fn session_stats_counts_messages_only() {
1309        let entries = vec![
1310            make_user_entry(None, "m1", "hello"),
1311            make_session_info_entry(Some("m1".to_string()), "info1", None),
1312            make_user_entry(Some("info1".to_string()), "m2", "world"),
1313        ];
1314        let (count, name) = session_stats(&entries);
1315        assert_eq!(count, 2);
1316        assert!(name.is_none());
1317    }
1318
1319    #[test]
1320    fn session_stats_extracts_last_name() {
1321        let entries = vec![
1322            make_session_info_entry(None, "info1", Some("First Name")),
1323            make_user_entry(Some("info1".to_string()), "m1", "msg"),
1324            make_session_info_entry(Some("m1".to_string()), "info2", Some("Final Name")),
1325        ];
1326        let (count, name) = session_stats(&entries);
1327        assert_eq!(count, 1);
1328        assert_eq!(name.as_deref(), Some("Final Name"));
1329    }
1330
1331    #[test]
1332    fn session_stats_name_not_overwritten_by_none() {
1333        let entries = vec![
1334            make_session_info_entry(None, "info1", Some("My Session")),
1335            make_session_info_entry(Some("info1".to_string()), "info2", None),
1336        ];
1337        let (_, name) = session_stats(&entries);
1338        // None doesn't overwrite previous name because of `if info.name.is_some()`
1339        assert_eq!(name.as_deref(), Some("My Session"));
1340    }
1341
1342    // ── file_stats ──────────────────────────────────────────────────
1343
1344    #[test]
1345    fn file_stats_returns_size_and_mtime() {
1346        let harness = TestHarness::new("file_stats_returns_size_and_mtime");
1347        let path = harness.temp_path("test_file.txt");
1348        fs::write(&path, "hello world").expect("write");
1349
1350        let (last_modified_ms, size_bytes) = session_file_stats(&path).expect("file_stats");
1351        assert_eq!(size_bytes, 11); // "hello world" = 11 bytes
1352        assert!(last_modified_ms > 0, "Expected positive modification time");
1353    }
1354
1355    #[cfg(feature = "sqlite-sessions")]
1356    #[test]
1357    fn file_stats_sqlite_includes_wal_and_shm_sizes() {
1358        let harness = TestHarness::new("file_stats_sqlite_includes_wal_and_shm_sizes");
1359        let path = harness.temp_path("test_session.sqlite");
1360        let [wal_path, shm_path] = sqlite_auxiliary_paths(&path);
1361
1362        fs::write(&path, b"db").expect("write sqlite db");
1363        fs::write(&wal_path, b"walpayload").expect("write sqlite wal");
1364        fs::write(&shm_path, b"shm!").expect("write sqlite shm");
1365
1366        let (_, size_bytes) = session_file_stats(&path).expect("file_stats");
1367        assert_eq!(size_bytes, 2 + 10 + 4);
1368    }
1369
1370    #[cfg(feature = "sqlite-sessions")]
1371    #[test]
1372    fn index_session_snapshot_uses_newest_sqlite_sidecar_mtime_and_size() {
1373        let harness =
1374            TestHarness::new("index_session_snapshot_uses_newest_sqlite_sidecar_mtime_and_size");
1375        let root = harness.temp_path("sessions");
1376        let project_dir = root.join("project");
1377        fs::create_dir_all(&project_dir).expect("create project dir");
1378
1379        let path = project_dir.join("test.sqlite");
1380        let [wal_path, _shm_path] = sqlite_auxiliary_paths(&path);
1381        fs::write(&path, b"db").expect("write sqlite db");
1382
1383        let base_millis = fs::metadata(&path)
1384            .expect("base metadata")
1385            .modified()
1386            .expect("base modified")
1387            .duration_since(UNIX_EPOCH)
1388            .expect("base since epoch")
1389            .as_millis();
1390        std::thread::sleep(Duration::from_millis(1_100));
1391        fs::write(&wal_path, b"walpayload").expect("write sqlite wal");
1392        let wal_millis = fs::metadata(&wal_path)
1393            .expect("wal metadata")
1394            .modified()
1395            .expect("wal modified")
1396            .duration_since(UNIX_EPOCH)
1397            .expect("wal since epoch")
1398            .as_millis();
1399
1400        assert!(
1401            wal_millis > base_millis,
1402            "test requires WAL sidecar mtime to be newer than base db mtime"
1403        );
1404
1405        let index = SessionIndex::for_sessions_root(&root);
1406        let header = make_header("sqlite-id", "sqlite-cwd");
1407        index
1408            .index_session_snapshot(&path, &header, 3, Some("sqlite session".to_string()))
1409            .expect("index sqlite snapshot");
1410
1411        let listed = index
1412            .list_sessions(Some("sqlite-cwd"))
1413            .expect("list sqlite session");
1414        assert_eq!(listed.len(), 1);
1415        assert_eq!(listed[0].size_bytes, 2 + 10);
1416        assert_eq!(
1417            listed[0].last_modified_ms,
1418            i64::try_from(wal_millis).expect("wal mtime fits in i64")
1419        );
1420    }
1421
1422    #[test]
1423    fn enqueue_session_index_snapshot_update_persists_row_immediately() {
1424        let harness =
1425            TestHarness::new("enqueue_session_index_snapshot_update_persists_row_immediately");
1426        let root = harness.temp_path("sessions");
1427        let project_dir = root.join("project");
1428        fs::create_dir_all(&project_dir).expect("create project dir");
1429
1430        let path = project_dir.join("session.jsonl");
1431        fs::write(&path, b"{\"type\":\"header\"}\n").expect("write session file");
1432
1433        let header = make_header("queued-id", "queued-cwd");
1434        enqueue_session_index_snapshot_update(
1435            &root,
1436            &path,
1437            &header,
1438            3,
1439            Some("Queued Session".to_string()),
1440        );
1441
1442        let index = SessionIndex::for_sessions_root(&root);
1443        let listed = index
1444            .list_sessions(Some("queued-cwd"))
1445            .expect("list queued snapshot rows");
1446        assert_eq!(listed.len(), 1);
1447        assert_eq!(listed[0].id, "queued-id");
1448        assert_eq!(listed[0].path, path.display().to_string());
1449        assert_eq!(listed[0].message_count, 3);
1450        assert_eq!(listed[0].name.as_deref(), Some("Queued Session"));
1451    }
1452
1453    #[test]
1454    fn file_stats_missing_file_returns_error() {
1455        let err = session_file_stats(Path::new("/nonexistent/file.txt"));
1456        assert!(err.is_err());
1457    }
1458
1459    #[test]
1460    fn list_sessions_errors_on_negative_message_count() {
1461        let harness = TestHarness::new("list_sessions_errors_on_negative_message_count");
1462        let root = harness.temp_path("sessions");
1463        fs::create_dir_all(&root).expect("create root dir");
1464        let index = SessionIndex::for_sessions_root(&root);
1465
1466        index
1467            .with_lock(|conn| {
1468                init_schema(conn)?;
1469                conn.execute_sync(
1470                    "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
1471                     VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
1472                    &[
1473                        Value::Text("/tmp/negative-message-count.jsonl".to_string()),
1474                        Value::Text("id-neg".to_string()),
1475                        Value::Text("cwd-neg".to_string()),
1476                        Value::Text("2026-01-01T00:00:00Z".to_string()),
1477                        Value::BigInt(-1),
1478                        Value::BigInt(1),
1479                        Value::BigInt(1),
1480                        Value::Null,
1481                    ],
1482                )
1483                .map_err(|err| Error::session(format!("insert negative row: {err}")))?;
1484                Ok(())
1485            })
1486            .expect("seed negative row");
1487
1488        let err = index
1489            .list_sessions(None)
1490            .expect_err("negative count should error");
1491        assert!(
1492            matches!(err, Error::Session(ref msg) if msg.contains("message_count must be non-negative")),
1493            "unexpected error: {err:?}"
1494        );
1495    }
1496
1497    #[test]
1498    fn list_sessions_errors_on_negative_size_bytes() {
1499        let harness = TestHarness::new("list_sessions_errors_on_negative_size_bytes");
1500        let root = harness.temp_path("sessions");
1501        fs::create_dir_all(&root).expect("create root dir");
1502        let index = SessionIndex::for_sessions_root(&root);
1503
1504        index
1505            .with_lock(|conn| {
1506                init_schema(conn)?;
1507                conn.execute_sync(
1508                    "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
1509                     VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
1510                    &[
1511                        Value::Text("/tmp/negative-size-bytes.jsonl".to_string()),
1512                        Value::Text("id-neg".to_string()),
1513                        Value::Text("cwd-neg".to_string()),
1514                        Value::Text("2026-01-01T00:00:00Z".to_string()),
1515                        Value::BigInt(1),
1516                        Value::BigInt(1),
1517                        Value::BigInt(-1),
1518                        Value::Null,
1519                    ],
1520                )
1521                .map_err(|err| Error::session(format!("insert negative row: {err}")))?;
1522                Ok(())
1523            })
1524            .expect("seed negative row");
1525
1526        let err = index
1527            .list_sessions(None)
1528            .expect_err("negative size should error");
1529        assert!(
1530            matches!(err, Error::Session(ref msg) if msg.contains("size_bytes must be non-negative")),
1531            "unexpected error: {err:?}"
1532        );
1533    }
1534
1535    // ── is_session_file_path ────────────────────────────────────────
1536
1537    #[test]
1538    fn is_session_file_path_jsonl() {
1539        assert!(is_session_file_path(Path::new("session.jsonl")));
1540        assert!(is_session_file_path(Path::new("/foo/bar/test.jsonl")));
1541    }
1542
1543    #[test]
1544    fn is_session_file_path_non_session() {
1545        assert!(!is_session_file_path(Path::new("session.txt")));
1546        assert!(!is_session_file_path(Path::new("session.json")));
1547        assert!(!is_session_file_path(Path::new("session")));
1548    }
1549
1550    // ── walk_sessions ───────────────────────────────────────────────
1551
1552    #[test]
1553    fn walk_sessions_finds_jsonl_files_recursively() {
1554        let harness = TestHarness::new("walk_sessions_finds_jsonl_files_recursively");
1555        let root = harness.temp_path("sessions");
1556        fs::create_dir_all(root.join("project")).expect("create dirs");
1557
1558        fs::write(root.join("a.jsonl"), "").expect("write");
1559        fs::write(root.join("project/b.jsonl"), "").expect("write");
1560        fs::write(root.join("not_session.txt"), "").expect("write");
1561
1562        let paths = walk_sessions(&root);
1563        let ok_paths: Vec<_> = paths
1564            .into_iter()
1565            .filter_map(std::result::Result::ok)
1566            .collect();
1567        assert_eq!(ok_paths.len(), 2);
1568        assert!(ok_paths.iter().any(|p| p.ends_with("a.jsonl")));
1569        assert!(ok_paths.iter().any(|p| p.ends_with("b.jsonl")));
1570    }
1571
1572    #[test]
1573    fn walk_sessions_empty_dir() {
1574        let harness = TestHarness::new("walk_sessions_empty_dir");
1575        let root = harness.temp_path("sessions");
1576        fs::create_dir_all(&root).expect("create dirs");
1577
1578        let paths = walk_sessions(&root);
1579        assert!(paths.is_empty());
1580    }
1581
1582    #[test]
1583    fn walk_sessions_nonexistent_dir() {
1584        let paths = walk_sessions(Path::new("/nonexistent/path"));
1585        assert!(paths.is_empty());
1586    }
1587
1588    // ── current_epoch_ms ────────────────────────────────────────────
1589
1590    #[test]
1591    fn current_epoch_ms_is_valid_number() {
1592        let ms = current_epoch_ms();
1593        let parsed: i64 = ms.parse().expect("should be valid i64");
1594        assert!(parsed > 0, "Epoch ms should be positive");
1595        // Should be after 2020-01-01
1596        assert!(parsed > 1_577_836_800_000, "Epoch ms should be after 2020");
1597    }
1598
1599    // ── delete_session_path ─────────────────────────────────────────
1600
1601    #[test]
1602    fn delete_session_path_removes_row() {
1603        let harness = TestHarness::new("delete_session_path_removes_row");
1604        let root = harness.temp_path("sessions");
1605        fs::create_dir_all(&root).expect("create root dir");
1606        let index = SessionIndex::for_sessions_root(&root);
1607
1608        let session_path = harness.temp_path("sessions/project/del.jsonl");
1609        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1610        fs::write(&session_path, "data").expect("write");
1611
1612        let mut session = Session::in_memory();
1613        session.header = make_header("id-del", "cwd-del");
1614        session.path = Some(session_path.clone());
1615        session.entries.push(make_user_entry(None, "m1", "hi"));
1616        index.index_session(&session).expect("index session");
1617
1618        let before = index.list_sessions(None).expect("list before");
1619        assert_eq!(before.len(), 1);
1620
1621        index
1622            .delete_session_path(&session_path)
1623            .expect("delete session path");
1624
1625        let after = index.list_sessions(None).expect("list after");
1626        assert!(after.is_empty());
1627    }
1628
1629    #[test]
1630    fn delete_session_path_noop_when_not_exists() {
1631        let harness = TestHarness::new("delete_session_path_noop_when_not_exists");
1632        let root = harness.temp_path("sessions");
1633        fs::create_dir_all(&root).expect("create root dir");
1634        let index = SessionIndex::for_sessions_root(&root);
1635
1636        // Delete a path that was never indexed — should succeed without error
1637        index
1638            .delete_session_path(Path::new("/nonexistent/session.jsonl"))
1639            .expect("delete nonexistent should succeed");
1640    }
1641
1642    // ── should_reindex ──────────────────────────────────────────────
1643
1644    #[test]
1645    fn should_reindex_returns_false_when_db_is_fresh() {
1646        let harness = TestHarness::new("should_reindex_returns_false_when_db_is_fresh");
1647        let root = harness.temp_path("sessions");
1648        fs::create_dir_all(&root).expect("create root dir");
1649        let index = SessionIndex::for_sessions_root(&root);
1650
1651        // Create the db by indexing a session
1652        let session_path = harness.temp_path("sessions/project/fresh.jsonl");
1653        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1654        fs::write(&session_path, "data").expect("write");
1655
1656        let mut session = Session::in_memory();
1657        session.header = make_header("id-fresh", "cwd-fresh");
1658        session.path = Some(session_path);
1659        session.entries.push(make_user_entry(None, "m1", "hi"));
1660        index.index_session(&session).expect("index session");
1661
1662        // DB just created — should not need reindex for large max_age
1663        assert!(!index.should_reindex(Duration::from_secs(3600)));
1664    }
1665
1666    #[cfg(unix)]
1667    #[test]
1668    fn should_reindex_prefers_meta_timestamp_over_stale_db_mtime() {
1669        let harness = TestHarness::new("should_reindex_prefers_meta_timestamp_over_stale_db_mtime");
1670        let root = harness.temp_path("sessions");
1671        fs::create_dir_all(&root).expect("create root dir");
1672        let index = SessionIndex::for_sessions_root(&root);
1673
1674        let session_path = harness.temp_path("sessions/project/fresh-meta.jsonl");
1675        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1676        fs::write(&session_path, "data").expect("write");
1677
1678        let mut session = Session::in_memory();
1679        session.header = make_header("id-fresh-meta", "cwd-fresh-meta");
1680        session.path = Some(session_path);
1681        session.entries.push(make_user_entry(None, "m1", "hi"));
1682        index.index_session(&session).expect("index session");
1683
1684        let status = Command::new("touch")
1685            .args([
1686                "-t",
1687                "200001010000",
1688                index.db_path.to_str().expect("utf-8 db path"),
1689            ])
1690            .status()
1691            .expect("run touch");
1692        assert!(status.success(), "touch should succeed");
1693
1694        assert!(
1695            !index.should_reindex(Duration::from_secs(3600)),
1696            "fresh meta.last_sync_epoch_ms should outrank stale db mtime"
1697        );
1698    }
1699
1700    // ── reindex_if_stale ────────────────────────────────────────────
1701
1702    #[test]
1703    fn reindex_if_stale_returns_false_when_fresh() {
1704        let harness = TestHarness::new("reindex_if_stale_returns_false_when_fresh");
1705        let root = harness.temp_path("sessions");
1706        fs::create_dir_all(&root).expect("create root dir");
1707        let index = SessionIndex::for_sessions_root(&root);
1708
1709        // Create a session file on disk
1710        let session_path = harness.temp_path("sessions/project/stale_test.jsonl");
1711        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1712        let header = make_header("id-stale", "cwd-stale");
1713        let entries = vec![make_user_entry(None, "m1", "msg")];
1714        write_session_jsonl(&session_path, &header, &entries);
1715
1716        // First reindex (no db exists yet)
1717        let result = index
1718            .reindex_if_stale(Duration::from_secs(3600))
1719            .expect("reindex");
1720        assert!(result, "First reindex should return true (no db)");
1721
1722        // Second call with large max_age should return false (fresh)
1723        let result = index
1724            .reindex_if_stale(Duration::from_secs(3600))
1725            .expect("reindex");
1726        assert!(!result, "Second reindex should return false (fresh)");
1727    }
1728
1729    #[test]
1730    fn reindex_if_stale_returns_true_when_stale() {
1731        let harness = TestHarness::new("reindex_if_stale_returns_true_when_stale");
1732        let root = harness.temp_path("sessions");
1733        fs::create_dir_all(&root).expect("create root dir");
1734        let index = SessionIndex::for_sessions_root(&root);
1735
1736        // Create a session on disk
1737        let session_path = harness.temp_path("sessions/project/stale.jsonl");
1738        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1739        let header = make_header("id-stale2", "cwd-stale2");
1740        let entries = vec![make_user_entry(None, "m1", "msg")];
1741        write_session_jsonl(&session_path, &header, &entries);
1742
1743        // Reindex with zero max_age — always stale
1744        let result = index.reindex_if_stale(Duration::ZERO).expect("reindex");
1745        assert!(result, "Should reindex with zero max_age");
1746    }
1747
1748    // ── build_meta ──────────────────────────────────────────────────
1749
1750    #[test]
1751    fn build_meta_from_file_returns_correct_fields() {
1752        let harness = TestHarness::new("build_meta_from_file_returns_correct_fields");
1753        let path = harness.temp_path("test_session.jsonl");
1754        let header = make_header("id-bm", "cwd-bm");
1755        let entries = vec![
1756            make_user_entry(None, "m1", "hello"),
1757            make_user_entry(Some("m1".to_string()), "m2", "world"),
1758            make_session_info_entry(Some("m2".to_string()), "info1", Some("Named Session")),
1759        ];
1760        write_session_jsonl(&path, &header, &entries);
1761
1762        let meta = build_meta_from_file(&path).expect("build_meta_from_file");
1763        assert_eq!(meta.id, "id-bm");
1764        assert_eq!(meta.cwd, "cwd-bm");
1765        assert_eq!(meta.message_count, 2);
1766        assert_eq!(meta.name.as_deref(), Some("Named Session"));
1767        assert!(meta.size_bytes > 0);
1768        assert!(meta.last_modified_ms > 0);
1769        assert!(meta.path.contains("test_session.jsonl"));
1770    }
1771
1772    // ── for_sessions_root path construction ─────────────────────────
1773
1774    #[test]
1775    fn for_sessions_root_constructs_correct_paths() {
1776        let root = Path::new("/home/user/.pi/sessions");
1777        let index = SessionIndex::for_sessions_root(root);
1778        assert_eq!(
1779            index.db_path,
1780            PathBuf::from("/home/user/.pi/sessions/session-index.sqlite")
1781        );
1782        assert_eq!(
1783            index.lock_path,
1784            PathBuf::from("/home/user/.pi/sessions/session-index.lock")
1785        );
1786    }
1787
1788    // ── sessions_root accessor ──────────────────────────────────────
1789
1790    #[test]
1791    fn sessions_root_returns_parent_of_db_path() {
1792        let root = Path::new("/home/user/.pi/sessions");
1793        let index = SessionIndex::for_sessions_root(root);
1794        assert_eq!(index.sessions_root(), root);
1795    }
1796
1797    // ── reindex_all clears old rows ─────────────────────────────────
1798
1799    #[test]
1800    fn reindex_all_replaces_stale_rows() {
1801        let harness = TestHarness::new("reindex_all_replaces_stale_rows");
1802        let root = harness.temp_path("sessions");
1803        fs::create_dir_all(root.join("project")).expect("create dirs");
1804
1805        // Index two sessions manually
1806        let index = SessionIndex::for_sessions_root(&root);
1807
1808        let path_a = harness.temp_path("sessions/project/a.jsonl");
1809        let header_a = make_header("id-a", "cwd-a");
1810        write_session_jsonl(&path_a, &header_a, &[make_user_entry(None, "m1", "a")]);
1811
1812        let path_b = harness.temp_path("sessions/project/b.jsonl");
1813        let header_b = make_header("id-b", "cwd-b");
1814        write_session_jsonl(&path_b, &header_b, &[make_user_entry(None, "m1", "b")]);
1815
1816        // Index both
1817        index.reindex_all().expect("reindex_all");
1818        let all = index.list_sessions(None).expect("list all");
1819        assert_eq!(all.len(), 2);
1820
1821        // Now delete one file on disk and reindex
1822        fs::remove_file(&path_a).expect("remove file");
1823        index.reindex_all().expect("reindex_all after delete");
1824        let all = index.list_sessions(None).expect("list after reindex");
1825        assert_eq!(all.len(), 1);
1826        assert_eq!(all[0].id, "id-b");
1827    }
1828
    // ── Session name indexing ────────────────────────────────────────
1830
1831    #[test]
1832    fn index_session_with_session_name() {
1833        let harness = TestHarness::new("index_session_with_session_name");
1834        let root = harness.temp_path("sessions");
1835        fs::create_dir_all(&root).expect("create root dir");
1836        let index = SessionIndex::for_sessions_root(&root);
1837
1838        let session_path = harness.temp_path("sessions/project/named.jsonl");
1839        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1840        fs::write(&session_path, "data").expect("write");
1841
1842        let mut session = Session::in_memory();
1843        session.header = make_header("id-named", "cwd-named");
1844        session.path = Some(session_path);
1845        session.entries.push(make_user_entry(None, "m1", "hi"));
1846        session.entries.push(make_session_info_entry(
1847            Some("m1".to_string()),
1848            "info1",
1849            Some("My Project"),
1850        ));
1851
1852        index.index_session(&session).expect("index session");
1853
1854        let sessions = index.list_sessions(None).expect("list");
1855        assert_eq!(sessions.len(), 1);
1856        assert_eq!(sessions[0].name.as_deref(), Some("My Project"));
1857    }
1858
1859    #[test]
1860    fn index_session_update_clears_stale_session_name() {
1861        let harness = TestHarness::new("index_session_update_clears_stale_session_name");
1862        let root = harness.temp_path("sessions");
1863        fs::create_dir_all(&root).expect("create root dir");
1864        let index = SessionIndex::for_sessions_root(&root);
1865
1866        let session_path = harness.temp_path("sessions/project/clear-name.jsonl");
1867        fs::create_dir_all(session_path.parent().expect("parent")).expect("create dirs");
1868        fs::write(&session_path, "first").expect("write");
1869
1870        let mut named = Session::in_memory();
1871        named.header = make_header("id-clear-name", "cwd-clear-name");
1872        named.path = Some(session_path.clone());
1873        named.entries.push(make_user_entry(None, "m1", "hi"));
1874        named.entries.push(make_session_info_entry(
1875            Some("m1".to_string()),
1876            "info1",
1877            Some("My Project"),
1878        ));
1879
1880        index.index_session(&named).expect("index named session");
1881        let first = index.list_sessions(None).expect("list named");
1882        assert_eq!(first.len(), 1);
1883        assert_eq!(first[0].name.as_deref(), Some("My Project"));
1884
1885        std::thread::sleep(Duration::from_millis(10));
1886        fs::write(&session_path, "second").expect("rewrite");
1887
1888        let mut unnamed = Session::in_memory();
1889        unnamed.header = make_header("id-clear-name", "cwd-clear-name");
1890        unnamed.path = Some(session_path);
1891        unnamed.entries.push(make_user_entry(None, "m1", "hi"));
1892
1893        index
1894            .index_session(&unnamed)
1895            .expect("index unnamed session");
1896        let second = index.list_sessions(None).expect("list unnamed");
1897        assert_eq!(second.len(), 1);
1898        assert_eq!(second[0].name, None);
1899    }
1900
1901    // ── Multiple cwd filtering ──────────────────────────────────────
1902
1903    #[test]
1904    fn list_sessions_no_cwd_returns_all() {
1905        let harness = TestHarness::new("list_sessions_no_cwd_returns_all");
1906        let root = harness.temp_path("sessions");
1907        fs::create_dir_all(&root).expect("create root dir");
1908        let index = SessionIndex::for_sessions_root(&root);
1909
1910        for (id, cwd) in [("id-x", "cwd-x"), ("id-y", "cwd-y"), ("id-z", "cwd-z")] {
1911            let path = harness.temp_path(format!("sessions/project/{id}.jsonl"));
1912            fs::create_dir_all(path.parent().expect("parent")).expect("create dirs");
1913            fs::write(&path, id).expect("write");
1914
1915            let mut session = Session::in_memory();
1916            session.header = make_header(id, cwd);
1917            session.path = Some(path);
1918            session.entries.push(make_user_entry(None, "m1", id));
1919            index.index_session(&session).expect("index session");
1920        }
1921
1922        let all = index.list_sessions(None).expect("list all");
1923        assert_eq!(all.len(), 3);
1924    }
1925
1926    // ── build_meta_from_jsonl with entries having parse errors ───────
1927
1928    #[test]
1929    fn build_meta_from_jsonl_skips_bad_entry_lines() {
1930        let harness = TestHarness::new("build_meta_from_jsonl_skips_bad_entry_lines");
1931        let path = harness.temp_path("mixed.jsonl");
1932
1933        let header = make_header("id-mixed", "cwd-mixed");
1934        let good_entry = make_user_entry(None, "m1", "good");
1935        let mut content = serde_json::to_string(&header).expect("ser header");
1936        content.push('\n');
1937        content.push_str(&serde_json::to_string(&good_entry).expect("ser entry"));
1938        content.push('\n');
1939        content.push_str("not valid json\n");
1940        content.push_str(
1941            &serde_json::to_string(&make_user_entry(Some("m1".to_string()), "m2", "another"))
1942                .expect("ser entry"),
1943        );
1944        content.push('\n');
1945
1946        fs::write(&path, content).expect("write");
1947
1948        let meta = build_meta_from_jsonl(&path).expect("build_meta");
1949        // Bad line is skipped, so we get 2 messages
1950        assert_eq!(meta.message_count, 2);
1951    }
1952
1953    #[test]
1954    fn build_meta_from_jsonl_errors_on_invalid_utf8_entry_line() {
1955        let harness = TestHarness::new("build_meta_from_jsonl_errors_on_invalid_utf8_entry_line");
1956        let path = harness.temp_path("invalid_utf8.jsonl");
1957
1958        let header = make_header("id-invalid", "cwd-invalid");
1959        let mut bytes = serde_json::to_vec(&header).expect("serialize header");
1960        bytes.push(b'\n');
1961        bytes.extend_from_slice(br#"{"type":"message","message":{"role":"user","content":"ok"}}"#);
1962        bytes.push(b'\n');
1963        bytes.extend_from_slice(&[0xFF, 0xFE, b'\n']);
1964
1965        fs::write(&path, bytes).expect("write");
1966
1967        let err = build_meta_from_jsonl(&path).expect_err("invalid utf8 should error");
1968        assert!(
1969            matches!(err, Error::Session(ref msg) if msg.contains("Read session entry line")),
1970            "Expected entry line read error, got {err:?}"
1971        );
1972    }
1973
1974    #[test]
1975    fn read_capped_utf8_line_with_limit_rejects_oversized_line_without_newline() {
1976        let oversized = "x".repeat(5);
1977        let mut reader = std::io::Cursor::new(oversized.into_bytes());
1978
1979        let err = read_capped_utf8_line_with_limit(&mut reader, 4).expect_err("oversized line");
1980        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1981        assert!(err.to_string().contains("JSONL line exceeds 4 bytes"));
1982    }
1983
1984    #[test]
1985    fn read_capped_utf8_line_with_limit_allows_exact_limit_before_newline() {
1986        let mut reader = std::io::Cursor::new(b"abcd\n".to_vec());
1987
1988        let line = read_capped_utf8_line_with_limit(&mut reader, 4)
1989            .expect("read line")
1990            .expect("line present");
1991        assert_eq!(line, "abcd\n");
1992        assert!(
1993            read_capped_utf8_line_with_limit(&mut reader, 4)
1994                .expect("read eof")
1995                .is_none()
1996        );
1997    }
1998
1999    #[test]
2000    fn read_capped_utf8_line_with_limit_drains_oversized_line_remainder() {
2001        let mut reader = std::io::Cursor::new(b"xxxxx\ny\n".to_vec());
2002
2003        let err = read_capped_utf8_line_with_limit(&mut reader, 4).expect_err("oversized line");
2004        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
2005
2006        let next_line = read_capped_utf8_line_with_limit(&mut reader, 4)
2007            .expect("read next line")
2008            .expect("next line present");
2009        assert_eq!(next_line, "y\n");
2010    }
2011
2012    #[test]
2013    fn index_session_snapshot_rejects_message_count_over_i64_max() {
2014        let harness = TestHarness::new("index_session_snapshot_rejects_message_count_over_i64_max");
2015        let root = harness.temp_path("sessions");
2016        fs::create_dir_all(root.join("project")).expect("create project dir");
2017        let index = SessionIndex::for_sessions_root(&root);
2018
2019        let path = root.join("project").join("overflow.jsonl");
2020        fs::write(&path, "").expect("write session payload");
2021
2022        let header = make_header("id-overflow", "cwd-overflow");
2023        let err = index
2024            .index_session_snapshot(&path, &header, (i64::MAX as u64) + 1, None)
2025            .expect_err("out-of-range message_count should error");
2026        assert!(
2027            matches!(err, Error::Session(ref msg) if msg.contains("message_count exceeds SQLite INTEGER range")),
2028            "expected out-of-range message_count error, got {err:?}"
2029        );
2030    }
2031
2032    proptest! {
2033        #![proptest_config(ProptestConfig { cases: 128, .. ProptestConfig::default() })]
2034
2035        #[test]
2036        fn proptest_list_sessions_handles_arbitrary_sql_rows(
2037            rows in prop::collection::vec(arbitrary_meta_row_strategy(), 1..16)
2038        ) {
2039            let harness = TestHarness::new("proptest_list_sessions_handles_arbitrary_sql_rows");
2040            let root = harness.temp_path("sessions");
2041            fs::create_dir_all(&root).expect("create root dir");
2042            let index = SessionIndex::for_sessions_root(&root);
2043
2044            let expected_by_path: HashMap<String, ArbitraryMetaRow> = rows
2045                .iter()
2046                .cloned()
2047                .enumerate()
2048                .map(|(idx, row)| (format!("/tmp/pi-session-index-{idx}.jsonl"), row))
2049                .collect();
2050
2051            index
2052                .with_lock(|conn| {
2053                    init_schema(conn)?;
2054                    conn.execute_sync("DELETE FROM sessions", &[])
2055                        .map_err(|err| Error::session(format!("delete sessions: {err}")))?;
2056
2057                    for (idx, row) in rows.iter().enumerate() {
2058                        let path = format!("/tmp/pi-session-index-{idx}.jsonl");
2059                        conn.execute_sync(
2060                            "INSERT INTO sessions (path,id,cwd,timestamp,message_count,last_modified_ms,size_bytes,name)
2061                             VALUES (?1,?2,?3,?4,?5,?6,?7,?8)",
2062                            &[
2063                                Value::Text(path),
2064                                Value::Text(row.id.clone()),
2065                                Value::Text(row.cwd.clone()),
2066                                Value::Text(row.timestamp.clone()),
2067                                Value::BigInt(row.message_count),
2068                                Value::BigInt(row.last_modified_ms),
2069                                Value::BigInt(row.size_bytes),
2070                                row.name.clone().map_or(Value::Null, Value::Text),
2071                            ],
2072                        )
2073                        .map_err(|err| Error::session(format!("insert session row {idx}: {err}")))?;
2074                    }
2075
2076                    Ok(())
2077                })
2078                .expect("seed session rows");
2079
2080            let has_invalid_unsigned = rows
2081                .iter()
2082                .any(|row| row.message_count < 0 || row.size_bytes < 0);
2083
2084            let listed = index.list_sessions(None);
2085            if has_invalid_unsigned {
2086                prop_assert!(listed.is_err(), "negative message_count/size_bytes should error");
2087                return Ok(());
2088            }
2089            let listed = listed.expect("list all sessions");
2090            prop_assert_eq!(listed.len(), rows.len());
2091            for pair in listed.windows(2) {
2092                prop_assert!(pair[0].last_modified_ms >= pair[1].last_modified_ms);
2093            }
2094
2095            for meta in &listed {
2096                let expected = expected_by_path
2097                    .get(&meta.path)
2098                    .expect("expected row should exist");
2099                prop_assert_eq!(&meta.id, &expected.id);
2100                prop_assert_eq!(&meta.cwd, &expected.cwd);
2101                prop_assert_eq!(&meta.timestamp, &expected.timestamp);
2102                prop_assert_eq!(
2103                    meta.message_count,
2104                    u64::try_from(expected.message_count).expect("filtered non-negative count")
2105                );
2106                prop_assert_eq!(
2107                    meta.size_bytes,
2108                    u64::try_from(expected.size_bytes).expect("filtered non-negative size")
2109                );
2110                prop_assert_eq!(&meta.name, &expected.name);
2111            }
2112
2113            let filtered = index
2114                .list_sessions(Some("cwd-a"))
2115                .expect("list cwd-a sessions");
2116            let expected_filtered = rows.iter().filter(|row| row.cwd == "cwd-a").count();
2117            prop_assert_eq!(filtered.len(), expected_filtered);
2118            prop_assert!(filtered.iter().all(|meta| meta.cwd == "cwd-a"));
2119            for pair in filtered.windows(2) {
2120                prop_assert!(pair[0].last_modified_ms >= pair[1].last_modified_ms);
2121            }
2122        }
2123    }
2124
2125    proptest! {
2126        #![proptest_config(ProptestConfig { cases: 128, .. ProptestConfig::default() })]
2127
2128        #[test]
2129        fn proptest_index_session_snapshot_roundtrip_metadata(
2130            id in ident_strategy(),
2131            cwd in cwd_strategy(),
2132            timestamp in timestamp_strategy(),
2133            message_count in any::<u64>(),
2134            name in optional_name_strategy(),
2135            content in prop::collection::vec(any::<u8>(), 0..256)
2136        ) {
2137            let harness = TestHarness::new("proptest_index_session_snapshot_roundtrip_metadata");
2138            let root = harness.temp_path("sessions");
2139            fs::create_dir_all(root.join("project")).expect("create project dir");
2140            let index = SessionIndex::for_sessions_root(&root);
2141
2142            let path = root.join("project").join(format!("{id}.jsonl"));
2143            fs::write(&path, &content).expect("write session payload");
2144
2145            let mut header = make_header(&id, &cwd);
2146            header.timestamp = timestamp.clone();
2147            let index_result = index.index_session_snapshot(&path, &header, message_count, name.clone());
2148            if message_count > i64::MAX as u64 {
2149                prop_assert!(
2150                    index_result.is_err(),
2151                    "expected out-of-range message_count to fail indexing"
2152                );
2153            } else {
2154                index_result.expect("index snapshot");
2155
2156                let listed = index
2157                    .list_sessions(Some(&cwd))
2158                    .expect("list sessions for cwd");
2159                prop_assert_eq!(listed.len(), 1);
2160
2161                let meta = &listed[0];
2162                let expected_count = message_count;
2163                prop_assert_eq!(&meta.id, &id);
2164                prop_assert_eq!(&meta.cwd, &cwd);
2165                prop_assert_eq!(&meta.timestamp, &timestamp);
2166                prop_assert_eq!(&meta.path, &path.display().to_string());
2167                prop_assert_eq!(meta.message_count, expected_count);
2168                prop_assert_eq!(meta.size_bytes, content.len() as u64);
2169                prop_assert_eq!(&meta.name, &name);
2170                prop_assert!(meta.last_modified_ms >= 0);
2171
2172                let other_cwd = index
2173                    .list_sessions(Some("definitely-not-this-cwd"))
2174                    .expect("list sessions for unmatched cwd");
2175                prop_assert!(other_cwd.is_empty());
2176            }
2177        }
2178    }
2179}