1use std::{
2 fs,
3 io::{BufRead, BufReader},
4 path::{Path, PathBuf},
5};
6
7use anyhow::{Context, Result};
8use rusqlite::{Connection, params};
9use serde_json::Value;
10
/// One indexed session: metadata taken from a session log's `session_meta`
/// record plus the text of its first user message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionRow {
    /// Path of the `.jsonl` session file this row describes.
    pub path: PathBuf,
    /// `id` from the metadata payload, if present.
    pub id: Option<String>,
    /// Raw `timestamp` string from the metadata payload, if present.
    pub timestamp: Option<String>,
    /// `cwd` from the metadata payload, if present.
    pub cwd: Option<String>,
    /// `git.repository_url` from the metadata payload, if present.
    pub repository_url: Option<String>,
    /// `git.branch` from the metadata payload, if present.
    pub branch: Option<String>,
    /// Text of the first `user_message` event; empty when none was found.
    pub first_message: String,
    /// Whether the metadata marks this session as a sub-session. This flag is
    /// not written to the database, so rows read back by `load_sessions`
    /// always carry `false`.
    pub is_subsession: bool,
}
22
/// Filters applied when collecting session rows (see `should_include_row`).
///
/// The `Default` value has both flags `false`, which excludes sub-sessions and
/// sessions whose first message is blank.
#[derive(Debug, Clone, Copy, Default)]
pub struct CollectOptions {
    /// Keep sessions whose metadata marks them as sub-sessions.
    pub include_subsessions: bool,
    /// Keep sessions whose first user message is empty or whitespace only.
    pub include_empty_messages: bool,
}
28
29pub fn is_subsession_meta(payload: &Value) -> bool {
30 let thread_source = payload
31 .get("thread_source")
32 .and_then(Value::as_str)
33 .unwrap_or_default();
34
35 let source_is_subagent = payload
36 .get("source")
37 .and_then(Value::as_object)
38 .is_some_and(|source| source.contains_key("subagent"));
39
40 let has_agent_role = payload
41 .get("agent_role")
42 .and_then(Value::as_str)
43 .is_some_and(|role| !role.trim().is_empty());
44
45 thread_source == "subagent" || source_is_subagent || has_agent_role
46}
47
48pub fn parse_session_file(path: &Path) -> Result<Option<SessionRow>> {
49 let file =
50 fs::File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
51 let reader = BufReader::new(file);
52 let mut meta: Option<SessionRow> = None;
53 let mut first_message: Option<String> = None;
54
55 for (line_number, line) in reader.lines().enumerate() {
56 let line =
57 line.with_context(|| format!("failed to read {}:{}", path.display(), line_number + 1))?;
58 let line = line.trim();
59 if line.is_empty() {
60 continue;
61 }
62
63 let value: Value = match serde_json::from_str(line) {
64 Ok(value) => value,
65 Err(err) => {
66 eprintln!(
67 "warning: {}:{}: invalid json: {err}",
68 path.display(),
69 line_number + 1
70 );
71 continue;
72 }
73 };
74
75 let record_type = value.get("type").and_then(Value::as_str);
76 let payload = value.get("payload").unwrap_or(&Value::Null);
77
78 if meta.is_none() && record_type == Some("session_meta") {
79 let git = payload.get("git").unwrap_or(&Value::Null);
80 meta = Some(SessionRow {
81 path: path.to_path_buf(),
82 id: string_field(payload, "id"),
83 timestamp: string_field(payload, "timestamp"),
84 cwd: string_field(payload, "cwd"),
85 repository_url: string_field(git, "repository_url"),
86 branch: string_field(git, "branch"),
87 first_message: String::new(),
88 is_subsession: is_subsession_meta(payload),
89 });
90 }
91
92 if first_message.is_none()
93 && record_type == Some("event_msg")
94 && payload.get("type").and_then(Value::as_str) == Some("user_message")
95 {
96 first_message = Some(
97 payload
98 .get("message")
99 .and_then(Value::as_str)
100 .unwrap_or_default()
101 .to_string(),
102 );
103 }
104
105 if meta.is_some() && first_message.is_some() {
106 break;
107 }
108 }
109
110 let Some(mut row) = meta else {
111 return Ok(None);
112 };
113 row.first_message = first_message.unwrap_or_default();
114 Ok(Some(row))
115}
116
117pub fn should_include_row(row: &SessionRow, options: CollectOptions) -> bool {
118 if !options.include_empty_messages && row.first_message.trim().is_empty() {
119 return false;
120 }
121 if !options.include_subsessions && row.is_subsession {
122 return false;
123 }
124 true
125}
126
127pub fn session_date(row: &SessionRow) -> String {
128 row.timestamp
129 .as_deref()
130 .map(|timestamp| timestamp.chars().take(10).collect())
131 .unwrap_or_default()
132}
133
134pub fn searchable_text(row: &SessionRow) -> String {
135 [
136 row.first_message.as_str(),
137 row.cwd.as_deref().unwrap_or_default(),
138 row.repository_url.as_deref().unwrap_or_default(),
139 row.branch.as_deref().unwrap_or_default(),
140 row.timestamp.as_deref().unwrap_or_default(),
141 &session_date(row),
142 ]
143 .join("\n")
144 .to_lowercase()
145}
146
147pub fn filter_sessions(rows: &[SessionRow], query: &str) -> Vec<SessionRow> {
148 let terms: Vec<String> = query.split_whitespace().map(str::to_lowercase).collect();
149
150 if terms.is_empty() {
151 return rows.to_vec();
152 }
153
154 rows.iter()
155 .filter(|row| {
156 let haystack = searchable_text(row);
157 terms.iter().all(|term| haystack.contains(term))
158 })
159 .cloned()
160 .collect()
161}
162
163pub fn collect_rows(
164 sessions_root: &Path,
165 options: CollectOptions,
166) -> Result<(Vec<SessionRow>, usize, usize)> {
167 let mut paths = session_jsonl_paths(sessions_root)?;
168 paths.sort();
169
170 let mut rows = Vec::new();
171 let mut skipped = 0;
172 let total = paths.len();
173
174 for path in paths {
175 match parse_session_file(&path)? {
176 Some(row) if should_include_row(&row, options) => rows.push(row),
177 _ => skipped += 1,
178 }
179 }
180
181 Ok((rows, total, skipped))
182}
183
184pub fn session_jsonl_paths(sessions_root: &Path) -> Result<Vec<PathBuf>> {
185 let mut out = Vec::new();
186
187 if !sessions_root.exists() {
188 return Ok(out);
189 }
190
191 for year in read_dirs(sessions_root)? {
192 for month in read_dirs(&year)? {
193 for day in read_dirs(&month)? {
194 for entry in fs::read_dir(&day)
195 .with_context(|| format!("failed to read {}", day.display()))?
196 {
197 let path = entry?.path();
198 if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
199 out.push(path);
200 }
201 }
202 }
203 }
204 }
205
206 Ok(out)
207}
208
209pub fn recreate_database(db_path: &Path, rows: &[SessionRow]) -> Result<()> {
210 if let Some(parent) = db_path.parent() {
211 fs::create_dir_all(parent)
212 .with_context(|| format!("failed to create {}", parent.display()))?;
213 }
214
215 let mut conn = Connection::open(db_path)
216 .with_context(|| format!("failed to open {}", db_path.display()))?;
217 conn.execute("DROP TABLE IF EXISTS sessions", [])?;
218 conn.execute(
219 r#"
220 CREATE TABLE sessions (
221 path TEXT,
222 id TEXT,
223 timestamp TEXT,
224 cwd TEXT,
225 repository_url TEXT,
226 branch TEXT,
227 first_message TEXT
228 )
229 "#,
230 [],
231 )?;
232
233 let tx = conn.transaction()?;
234 {
235 let mut stmt = tx.prepare(
236 r#"
237 INSERT INTO sessions (
238 path, id, timestamp, cwd, repository_url, branch, first_message
239 )
240 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
241 "#,
242 )?;
243
244 for row in rows {
245 stmt.execute(params![
246 row.path.to_string_lossy().as_ref(),
247 row.id.as_deref(),
248 row.timestamp.as_deref(),
249 row.cwd.as_deref(),
250 row.repository_url.as_deref(),
251 row.branch.as_deref(),
252 row.first_message.as_str(),
253 ])?;
254 }
255 }
256 tx.commit()?;
257
258 conn.execute("CREATE INDEX sessions_path_idx ON sessions(path)", [])?;
259 conn.execute(
260 "CREATE INDEX sessions_timestamp_idx ON sessions(timestamp)",
261 [],
262 )?;
263 conn.execute("CREATE INDEX sessions_cwd_idx ON sessions(cwd)", [])?;
264 Ok(())
265}
266
267pub fn load_sessions(db_path: &Path) -> Result<Vec<SessionRow>> {
268 let conn = Connection::open(db_path)
269 .with_context(|| format!("failed to open {}", db_path.display()))?;
270 let mut stmt = conn.prepare(
271 r#"
272 SELECT path, id, timestamp, cwd, repository_url, branch, first_message
273 FROM sessions
274 ORDER BY timestamp DESC
275 "#,
276 )?;
277
278 let rows = stmt
279 .query_map([], |row| {
280 Ok(SessionRow {
281 path: PathBuf::from(row.get::<_, String>(0)?),
282 id: row.get(1)?,
283 timestamp: row.get(2)?,
284 cwd: row.get(3)?,
285 repository_url: row.get(4)?,
286 branch: row.get(5)?,
287 first_message: row.get::<_, Option<String>>(6)?.unwrap_or_default(),
288 is_subsession: false,
289 })
290 })?
291 .collect::<rusqlite::Result<Vec<_>>>()?;
292
293 Ok(rows)
294}
295
296fn string_field(value: &Value, field: &str) -> Option<String> {
297 value
298 .get(field)
299 .and_then(Value::as_str)
300 .map(ToOwned::to_owned)
301}
302
303fn read_dirs(path: &Path) -> Result<Vec<PathBuf>> {
304 let mut dirs = Vec::new();
305 for entry in fs::read_dir(path).with_context(|| format!("failed to read {}", path.display()))? {
306 let path = entry?.path();
307 if path.is_dir() {
308 dirs.push(path);
309 }
310 }
311 dirs.sort();
312 Ok(dirs)
313}
314
#[cfg(test)]
mod tests {
    use std::{
        fs,
        time::{SystemTime, UNIX_EPOCH},
    };

    use serde_json::json;

    use super::*;

    /// Returns a path in the system temp dir, made unique per call with a
    /// nanosecond timestamp. `name` is appended so concurrently running tests
    /// use different files.
    fn temp_path(name: &str) -> PathBuf {
        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("codex-session-selector-{nonce}-{name}"))
    }

    /// Writes a minimal two-record session log to `path`: a `session_meta`
    /// record with fixed id/timestamp/cwd/git fields and the given `source`
    /// and `thread_source`, followed by one `user_message` event carrying
    /// `message`.
    fn write_jsonl(path: &Path, message: &str, source: Value, thread_source: &str) {
        let meta = json!({
            "type": "session_meta",
            "payload": {
                "id": "session-id",
                "timestamp": "2026-05-28T00:00:00Z",
                "cwd": "/repo/demo",
                "source": source,
                "thread_source": thread_source,
                "git": {
                    "repository_url": "https://git.example/demo.git",
                    "branch": "main"
                }
            }
        });
        let user = json!({
            "type": "event_msg",
            "payload": {
                "type": "user_message",
                "message": message
            }
        });
        fs::write(path, format!("{meta}\n{user}\n")).unwrap();
    }

    // Each of the three sub-session markers is sufficient on its own, and a
    // plain CLI/user session is not flagged.
    #[test]
    fn detects_subsessions_from_session_meta_structure() {
        assert!(is_subsession_meta(&json!({
            "source": {"subagent": {"thread_spawn": {"parent_thread_id": "parent"}}}
        })));
        assert!(is_subsession_meta(&json!({"thread_source": "subagent"})));
        assert!(is_subsession_meta(&json!({"agent_role": "worker"})));
        assert!(!is_subsession_meta(
            &json!({"source": "cli", "thread_source": "user"})
        ));
    }

    // Round-trips a file written by `write_jsonl` and checks every field.
    #[test]
    fn parse_session_file_extracts_metadata_and_first_message() {
        let path = temp_path("human.jsonl");
        write_jsonl(&path, "real user request", json!("cli"), "user");

        let row = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(row.path, path);
        assert_eq!(row.id.as_deref(), Some("session-id"));
        assert_eq!(row.timestamp.as_deref(), Some("2026-05-28T00:00:00Z"));
        assert_eq!(row.cwd.as_deref(), Some("/repo/demo"));
        assert_eq!(
            row.repository_url.as_deref(),
            Some("https://git.example/demo.git")
        );
        assert_eq!(row.branch.as_deref(), Some("main"));
        assert_eq!(row.first_message, "real user request");
        assert!(!row.is_subsession);

        // Best-effort cleanup. Note this is skipped if an assertion above fails.
        let _ = fs::remove_file(path);
    }

    // With default options, sub-sessions are hidden even when the message
    // looks ordinary, and blank messages are hidden. A normal session is kept.
    #[test]
    fn should_include_hides_empty_and_subsession_by_default() {
        let mut row = SessionRow {
            path: PathBuf::from("/tmp/a.jsonl"),
            id: None,
            timestamp: None,
            cwd: None,
            repository_url: None,
            branch: None,
            first_message: "ordinary looking task text".to_string(),
            is_subsession: true,
        };
        assert!(!should_include_row(&row, CollectOptions::default()));

        row.is_subsession = false;
        row.first_message = " ".to_string();
        assert!(!should_include_row(&row, CollectOptions::default()));

        row.first_message = "human request".to_string();
        assert!(should_include_row(&row, CollectOptions::default()));
    }

    // Queries can hit the message, repository URL, branch, or date prefix.
    #[test]
    fn filter_sessions_searches_message_metadata_and_date() {
        let rows = vec![
            SessionRow {
                path: PathBuf::from("/tmp/a.jsonl"),
                id: None,
                timestamp: Some("2026-05-27T01:00:00Z".to_string()),
                cwd: Some("/repo/alpha".to_string()),
                repository_url: Some("https://git.example/alpha.git".to_string()),
                branch: Some("main".to_string()),
                first_message: "fix docker compose".to_string(),
                is_subsession: false,
            },
            SessionRow {
                path: PathBuf::from("/tmp/b.jsonl"),
                id: None,
                timestamp: Some("2026-05-28T01:00:00Z".to_string()),
                cwd: Some("/repo/beta".to_string()),
                repository_url: Some("https://git.example/beta.git".to_string()),
                branch: Some("feature/search".to_string()),
                first_message: "add selector".to_string(),
                is_subsession: false,
            },
        ];

        assert_eq!(
            filter_sessions(&rows, "docker")[0].path,
            PathBuf::from("/tmp/a.jsonl")
        );
        assert_eq!(
            filter_sessions(&rows, "alpha.git")[0].path,
            PathBuf::from("/tmp/a.jsonl")
        );
        assert_eq!(
            filter_sessions(&rows, "feature")[0].path,
            PathBuf::from("/tmp/b.jsonl")
        );
        assert_eq!(
            filter_sessions(&rows, "2026-05-28")[0].path,
            PathBuf::from("/tmp/b.jsonl")
        );
    }
}