Skip to main content

codex_helper_core/
sessions.rs

1use std::cmp::{Ordering, Reverse};
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, anyhow};
8use futures_util::StreamExt;
9use futures_util::stream;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tokio::fs;
13use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
14
15use crate::config::codex_sessions_dir;
16use crate::file_replace::write_bytes_file_async;
17
18mod stats_cache;
19mod transcript;
20
21use stats_cache::{SessionStatsCache, SessionStatsSnapshot};
22pub use transcript::{codex_session_transcript_tail_contains_query, read_codex_session_transcript};
23
/// Where the data behind a [`SessionSummary`] came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SessionSummarySource {
    /// Summary built from a session file; this is the default and the value assigned by
    /// `build_summary_from_stats`.
    #[default]
    LocalFile,
    /// NOTE(review): not constructed in this part of the file; presumably marks sessions that
    /// were only observed at runtime without a readable local file — confirm against callers.
    ObservedOnly,
}
31
/// Summary information for a Codex conversation session.
#[derive(Debug, Clone)]
pub struct SessionSummary {
    /// Session id taken from the file's `session_meta` record.
    pub id: String,
    /// Path of the session JSONL file the summary was built from.
    pub path: PathBuf,
    /// Working directory recorded in `session_meta`, if present.
    pub cwd: Option<String>,
    /// Creation timestamp recorded in `session_meta`, if present.
    pub created_at: Option<String>,
    /// Best available "last updated" timestamp: `last_response_at`, else the tail-record hint,
    /// else `created_at`.
    pub updated_at: Option<String>,
    /// RFC3339 timestamp string for the most recent assistant message, if available.
    pub last_response_at: Option<String>,
    /// Number of user turns (from `event_msg` user_message).
    pub user_turns: usize,
    /// Number of assistant messages (from `response_item` message role=assistant).
    pub assistant_turns: usize,
    /// Conversation rounds (best-effort; currently `min(user_turns, assistant_turns)`).
    pub rounds: usize,
    /// Text of the first user message found near the head of the file.
    pub first_user_message: Option<String>,
    /// Origin of this summary.
    pub source: SessionSummarySource,
    /// NOTE(review): left as `None` for file-based summaries in this file; presumably a
    /// millisecond sort key supplied by other producers — confirm.
    pub sort_hint_ms: Option<u64>,
}
51
/// Basic metadata for a Codex session (best-effort parsed from JSONL).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMeta {
    /// Session id from the `session_meta` record's `payload.id`.
    pub id: String,
    /// Working directory from `payload.cwd`, if it is present and a string.
    pub cwd: Option<String>,
    /// `payload.timestamp`, falling back to the record's top-level `timestamp`.
    pub created_at: Option<String>,
}
59
/// A single transcript message extracted from a Codex session JSONL.
///
/// NOTE(review): values are produced by the `transcript` submodule, which is not shown here;
/// the field notes below are inferred from names and should be confirmed there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionTranscriptMessage {
    /// Timestamp of the message, when one is available.
    pub timestamp: Option<String>,
    /// Speaker role of the message (presumably values such as "user"/"assistant" — confirm).
    pub role: String,
    /// Message text.
    pub text: String,
}
67
/// Minimal data for printing `project_root session_id` style lists.
#[derive(Debug, Clone)]
pub struct RecentSession {
    /// Session id: from `session_meta` when readable, otherwise the UUID parsed from the
    /// rollout filename.
    pub id: String,
    /// Working directory from `session_meta`; `None` when only the filename id was available.
    pub cwd: Option<String>,
    /// Session file modification time in milliseconds since the Unix epoch (0 if unavailable).
    pub mtime_ms: u64,
}
75
/// One per-day directory inside the Codex sessions tree (GUI builds only).
#[cfg(feature = "gui")]
#[derive(Debug, Clone)]
pub struct SessionDayDir {
    /// Date label formatted as `YYYY-MM-DD` from the year/month/day directory names.
    pub date: String,
    /// Path of the day directory.
    pub path: PathBuf,
}
82
/// Lightweight listing entry for a session file inside a day directory (GUI builds only).
#[cfg(feature = "gui")]
#[derive(Debug, Clone)]
pub struct SessionIndexItem {
    /// Session id from the file's `session_meta` record.
    pub id: String,
    /// Path of the session JSONL file.
    pub path: PathBuf,
    /// Working directory recorded in `session_meta`, if present.
    pub cwd: Option<String>,
    /// Creation timestamp recorded in `session_meta`, if present.
    pub created_at: Option<String>,
    /// Timestamp of the last record found in the file tail, falling back to `created_at`.
    pub updated_hint: Option<String>,
    /// File modification time in milliseconds since the Unix epoch.
    pub mtime_ms: u64,
    /// First user message found near the head of the file.
    pub first_user_message: Option<String>,
}
94
/// Guess the project root for a session working directory.
///
/// Relative inputs are returned unchanged. Absolute inputs are canonicalized when possible
/// (falling back to the path as given), then the directory and each of its ancestors is checked
/// for a `.git` entry; the nearest one that has it is returned. If none does, the
/// (canonicalized) directory itself is returned.
pub fn infer_project_root_from_cwd(cwd: &str) -> String {
    let start = Path::new(cwd);
    if !start.is_absolute() {
        return cwd.to_owned();
    }

    let resolved = match std::fs::canonicalize(start) {
        Ok(real) => real,
        Err(_) => start.to_path_buf(),
    };

    // `ancestors()` yields the path itself first, then each parent up to the filesystem root.
    let project_root = resolved
        .ancestors()
        .find(|dir| dir.join(".git").exists())
        .unwrap_or(resolved.as_path());
    project_root.to_string_lossy().into_owned()
}
113
/// Upper bound on rollout files examined by the directory-scoped find/search scans and the
/// lookup of session files by id.
const MAX_SCAN_FILES: usize = 10_000;
/// Maximum number of non-empty JSONL lines parsed from the head of a session file when looking
/// for `session_meta` / the first user message.
const HEAD_SCAN_LINES: usize = 512;
/// NOTE(review): not referenced in this part of the file; presumably the read buffer size
/// (64 KiB) used by the tail/turn-count readers — confirm.
const IO_CHUNK_SIZE: usize = 64 * 1024;
/// NOTE(review): not referenced in this part of the file; presumably caps how many trailing
/// bytes (1 MiB) a tail scan reads — confirm.
const TAIL_SCAN_MAX_BYTES: usize = 1024 * 1024;
/// Number of session files processed concurrently (`buffer_unordered` width and batch size).
const SESSION_IO_CONCURRENCY: usize = 8;

/// Upper bound on rollout files examined by the mtime-filtered "recent sessions" scans.
const MAX_SCAN_FILES_RECENT: usize = 200_000;
121
122/// Find recent Codex sessions for a given directory, preferring sessions whose cwd matches that directory
123/// (or one of its ancestors/descendants). Results are ordered newest-first by updated_at.
124pub async fn find_codex_sessions_for_dir(
125    root_dir: &Path,
126    limit: usize,
127) -> Result<Vec<SessionSummary>> {
128    let root = codex_sessions_dir();
129    if !root.exists() {
130        return Ok(Vec::new());
131    }
132
133    let mut matched: Vec<SessionHeader> = Vec::new();
134    let mut others: Vec<SessionHeader> = Vec::new();
135    let mut scanned_files: usize = 0;
136
137    let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u32>().ok()).await?;
138
139    'outer: for (_year, year_path) in year_dirs {
140        let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
141        for (_month, month_path) in month_dirs {
142            let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
143            for (_day, day_path) in day_dirs {
144                let day_files = collect_rollout_files_sorted(&day_path).await?;
145                for path in day_files {
146                    if scanned_files >= MAX_SCAN_FILES {
147                        break 'outer;
148                    }
149                    scanned_files += 1;
150
151                    let header_opt = read_session_header(&path, root_dir).await?;
152                    let Some(header) = header_opt else {
153                        continue;
154                    };
155
156                    if header.is_cwd_match {
157                        matched.push(header);
158                    } else {
159                        others.push(header);
160                    }
161                }
162            }
163        }
164    }
165
166    select_and_expand_headers(matched, others, limit).await
167}
168
169/// Search Codex sessions for user messages containing the given substring.
170/// Matching is case-insensitive and only considers the first user message per session.
171pub async fn search_codex_sessions_for_dir(
172    root_dir: &Path,
173    query: &str,
174    limit: usize,
175) -> Result<Vec<SessionSummary>> {
176    let needle = query.to_lowercase();
177
178    let root = codex_sessions_dir();
179    if !root.exists() {
180        return Ok(Vec::new());
181    }
182
183    let mut matched: Vec<SessionHeader> = Vec::new();
184    let mut others: Vec<SessionHeader> = Vec::new();
185    let mut scanned_files: usize = 0;
186
187    let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u32>().ok()).await?;
188
189    'outer: for (_year, year_path) in year_dirs {
190        let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
191        for (_month, month_path) in month_dirs {
192            let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
193            for (_day, day_path) in day_dirs {
194                let day_files = collect_rollout_files_sorted(&day_path).await?;
195                for path in day_files {
196                    if scanned_files >= MAX_SCAN_FILES {
197                        break 'outer;
198                    }
199                    scanned_files += 1;
200
201                    let header_opt = read_session_header(&path, root_dir).await?;
202                    let Some(header) = header_opt else {
203                        continue;
204                    };
205                    if !header
206                        .first_user_message
207                        .to_lowercase()
208                        .contains(needle.as_str())
209                    {
210                        continue;
211                    }
212
213                    if header.is_cwd_match {
214                        matched.push(header);
215                    } else {
216                        others.push(header);
217                    }
218                }
219            }
220        }
221    }
222
223    select_and_expand_headers(matched, others, limit).await
224}
225
226/// Convenience wrapper that uses the current working directory as the root for session matching.
227pub async fn find_codex_sessions_for_current_dir(limit: usize) -> Result<Vec<SessionSummary>> {
228    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
229    find_codex_sessions_for_dir(&cwd, limit).await
230}
231
232/// Convenience wrapper to search sessions under the current working directory.
233pub async fn search_codex_sessions_for_current_dir(
234    query: &str,
235    limit: usize,
236) -> Result<Vec<SessionSummary>> {
237    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
238    search_codex_sessions_for_dir(&cwd, query, limit).await
239}
240
241/// List recent Codex sessions across all projects, filtered by session file mtime.
242///
243/// This is optimized for "resume" workflows: it avoids counting turns/timestamps and only reads the
244/// `session_meta` header for sessions that pass the recency filter.
245pub async fn find_recent_codex_sessions(
246    since: Duration,
247    limit: usize,
248) -> Result<Vec<RecentSession>> {
249    let root = codex_sessions_dir();
250    find_recent_codex_sessions_in_dir(&root, since, limit).await
251}
252
/// Build full summaries for sessions whose file was modified within `since` (GUI builds only).
///
/// Walks the sessions tree newest-first, inspecting at most `MAX_SCAN_FILES_RECENT` files.
/// Files that cannot be stat'ed or are older than the threshold are skipped. All surviving
/// headers are handed to `select_and_expand_headers` as the fallback set, so no cwd preference
/// is applied. Returns an empty list when `limit` is 0 or the sessions directory is missing.
#[cfg(feature = "gui")]
pub async fn find_recent_codex_session_summaries(
    since: Duration,
    limit: usize,
) -> Result<Vec<SessionSummary>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    let sessions_root = codex_sessions_dir();
    if !sessions_root.exists() {
        return Ok(Vec::new());
    }

    let current_dir = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => std::path::PathBuf::from("."),
    };

    // Oldest acceptable mtime, in epoch milliseconds (saturating at both ends).
    let elapsed_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let now_ms = elapsed_since_epoch.as_millis().min(u64::MAX as u128) as u64;
    let window_ms = since.as_millis().min(u64::MAX as u128) as u64;
    let cutoff_ms = now_ms.saturating_sub(window_ms);

    let mut recent: Vec<SessionHeader> = Vec::new();
    let mut files_seen = 0usize;

    let years = collect_dirs_desc(&sessions_root, |s| s.parse::<u32>().ok()).await?;
    'scan: for (_, year_dir) in years {
        let months = collect_dirs_desc(&year_dir, |s| s.parse::<u8>().ok()).await?;
        for (_, month_dir) in months {
            let days = collect_dirs_desc(&month_dir, |s| s.parse::<u8>().ok()).await?;
            for (_, day_dir) in days {
                let files = collect_rollout_files_sorted(&day_dir).await?;
                for file in files {
                    if files_seen >= MAX_SCAN_FILES_RECENT {
                        break 'scan;
                    }
                    files_seen += 1;

                    let Ok(file_meta) = fs::metadata(&file).await else {
                        continue;
                    };
                    let modified_ms = file_meta
                        .modified()
                        .ok()
                        .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
                        .map_or(0, |d| d.as_millis().min(u64::MAX as u128) as u64);
                    if modified_ms < cutoff_ms {
                        continue;
                    }

                    if let Some(header) = read_session_header(&file, &current_dir).await? {
                        recent.push(header);
                    }
                }
            }
        }
    }

    select_and_expand_headers(Vec::new(), recent, limit).await
}
318
/// List up to `limit` per-day session directories, newest first (GUI builds only).
///
/// Each entry carries a `YYYY-MM-DD` label derived from the directory names and the path of the
/// day directory. Returns an empty list when `limit` is 0 or the sessions directory is missing.
#[cfg(feature = "gui")]
pub async fn list_codex_session_day_dirs(limit: usize) -> Result<Vec<SessionDayDir>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    let sessions_root = codex_sessions_dir();
    if !sessions_root.exists() {
        return Ok(Vec::new());
    }

    let mut day_dirs: Vec<SessionDayDir> = Vec::new();
    let years = collect_dirs_desc(&sessions_root, |s| s.parse::<u32>().ok()).await?;
    'scan: for (year, year_dir) in years {
        let months = collect_dirs_desc(&year_dir, |s| s.parse::<u8>().ok()).await?;
        for (month, month_dir) in months {
            let days = collect_dirs_desc(&month_dir, |s| s.parse::<u8>().ok()).await?;
            for (day, day_dir) in days {
                let date = format!("{year:04}-{month:02}-{day:02}");
                day_dirs.push(SessionDayDir {
                    date,
                    path: day_dir,
                });
                if day_dirs.len() >= limit {
                    break 'scan;
                }
            }
        }
    }
    Ok(day_dirs)
}
348
/// List up to `limit` sessions stored in one day directory (GUI builds only).
///
/// Files are read in batches of `SESSION_IO_CONCURRENCY`, each batch concurrently; reading stops
/// after the first batch that brings the collected count to `limit` or more. The collected
/// entries are then ordered by file mtime, newest first, and cut to `limit`. Returns an empty
/// list when `limit` is 0 or `day_dir` does not exist.
#[cfg(feature = "gui")]
pub async fn list_codex_sessions_in_day_dir(
    day_dir: &Path,
    limit: usize,
) -> Result<Vec<SessionIndexItem>> {
    if limit == 0 || !day_dir.exists() {
        return Ok(Vec::new());
    }

    let current_dir = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => std::path::PathBuf::from("."),
    };
    let files = collect_rollout_files_sorted(day_dir).await?;
    let mut items: Vec<SessionIndexItem> = Vec::new();

    for batch in files.chunks(SESSION_IO_CONCURRENCY) {
        let batch_cwd = current_dir.clone();
        let mut pending = stream::iter(batch.to_vec())
            .map(move |file| read_session_index_item(file, batch_cwd.clone()))
            .buffer_unordered(SESSION_IO_CONCURRENCY);

        while let Some(result) = pending.next().await {
            // Files without a usable header yield `None` and are skipped.
            if let Some(entry) = result? {
                items.push(entry);
            }
        }

        if items.len() >= limit {
            break;
        }
    }

    items.sort_by_key(|entry| Reverse(entry.mtime_ms));
    items.truncate(limit);
    Ok(items)
}
387
/// Read one session file into a [`SessionIndexItem`] (GUI builds only).
///
/// Returns `Ok(None)` when the file has no usable header. The `updated_hint` is the last
/// timestamp found in the file tail, falling back to the header's creation timestamp.
#[cfg(feature = "gui")]
async fn read_session_index_item(path: PathBuf, cwd: PathBuf) -> Result<Option<SessionIndexItem>> {
    let Some(header) = read_session_header(&path, &cwd).await? else {
        return Ok(None);
    };

    let SessionHeader {
        id,
        path: session_path,
        cwd: session_cwd,
        created_at,
        mtime_ms,
        first_user_message,
        ..
    } = header;

    let updated_hint = read_last_timestamp_from_tail(&session_path)
        .await?
        .or_else(|| created_at.clone());

    Ok(Some(SessionIndexItem {
        id,
        path: session_path,
        cwd: session_cwd,
        created_at,
        updated_hint,
        mtime_ms,
        first_user_message: Some(first_user_message),
    }))
}
407
408async fn find_recent_codex_sessions_in_dir(
409    sessions_dir: &Path,
410    since: Duration,
411    limit: usize,
412) -> Result<Vec<RecentSession>> {
413    if limit == 0 {
414        return Ok(Vec::new());
415    }
416    if since.is_zero() {
417        return Ok(Vec::new());
418    }
419    if !sessions_dir.exists() {
420        return Ok(Vec::new());
421    }
422
423    let now_ms = SystemTime::now()
424        .duration_since(UNIX_EPOCH)
425        .unwrap_or_default()
426        .as_millis()
427        .min(u64::MAX as u128) as u64;
428    let since_ms = since.as_millis().min(u64::MAX as u128) as u64;
429    let threshold_ms = now_ms.saturating_sub(since_ms);
430
431    let mut out: Vec<RecentSession> = Vec::new();
432    let mut scanned_files: usize = 0;
433
434    let year_dirs = collect_dirs_desc(sessions_dir, |s| s.parse::<u32>().ok()).await?;
435    'outer: for (_year, year_path) in year_dirs {
436        let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
437        for (_month, month_path) in month_dirs {
438            let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
439            for (_day, day_path) in day_dirs {
440                let day_files = collect_rollout_files_sorted(&day_path).await?;
441                for path in day_files {
442                    if scanned_files >= MAX_SCAN_FILES_RECENT {
443                        break 'outer;
444                    }
445                    scanned_files += 1;
446
447                    let meta = match fs::metadata(&path).await {
448                        Ok(m) => m,
449                        Err(_) => continue,
450                    };
451                    let mtime_ms = meta
452                        .modified()
453                        .ok()
454                        .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
455                        .map(|d| d.as_millis().min(u64::MAX as u128) as u64)
456                        .unwrap_or(0);
457                    if mtime_ms < threshold_ms {
458                        continue;
459                    }
460
461                    let file_id = path
462                        .file_name()
463                        .and_then(|s| s.to_str())
464                        .and_then(parse_timestamp_and_uuid)
465                        .map(|(_, uuid)| uuid);
466
467                    let meta = read_codex_session_meta(&path).await?;
468                    let (id, cwd) = if let Some(meta) = meta {
469                        (meta.id, meta.cwd)
470                    } else if let Some(id) = file_id {
471                        (id, None)
472                    } else {
473                        continue;
474                    };
475
476                    out.push(RecentSession { id, cwd, mtime_ms });
477                }
478            }
479        }
480    }
481
482    out.sort_by_key(|item| Reverse((item.mtime_ms, item.id.clone())));
483    out.truncate(limit);
484    Ok(out)
485}
486
487/// Find a Codex session's cwd by its session id (UUID suffix in rollout filename).
488///
489/// This is best-effort and scans session files from newest to oldest until it finds a match.
490pub async fn find_codex_session_cwd_by_id(session_id: &str) -> Result<Option<String>> {
491    let root = codex_sessions_dir();
492    if !root.exists() {
493        return Ok(None);
494    }
495
496    let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u32>().ok()).await?;
497    for (_year, year_path) in year_dirs {
498        let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
499        for (_month, month_path) in month_dirs {
500            let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
501            for (_day, day_path) in day_dirs {
502                let day_files = collect_rollout_files_sorted(&day_path).await?;
503                for path in day_files {
504                    let Some(name) = path.file_name().and_then(|s| s.to_str()) else {
505                        continue;
506                    };
507                    let Some((_ts, uuid)) = parse_timestamp_and_uuid(name) else {
508                        continue;
509                    };
510                    if uuid != session_id {
511                        continue;
512                    }
513
514                    let file = fs::File::open(&path)
515                        .await
516                        .with_context(|| format!("failed to open session file {:?}", path))?;
517                    let reader = BufReader::new(file);
518                    let mut lines = reader.lines();
519                    while let Some(line) = lines.next_line().await? {
520                        let line = line.trim();
521                        if line.is_empty() {
522                            continue;
523                        }
524                        let value: Value = match serde_json::from_str(line) {
525                            Ok(v) => v,
526                            Err(_) => continue,
527                        };
528                        if let Some(meta) = parse_session_meta(&value) {
529                            return Ok(meta.cwd);
530                        }
531                    }
532
533                    return Ok(None);
534                }
535            }
536        }
537    }
538
539    Ok(None)
540}
541
542/// Best-effort: locate a Codex session JSONL file by session id.
543///
544/// We first try to match the UUID suffix in the `rollout-...-<uuid>.jsonl` filename (fast path),
545/// then fall back to scanning session_meta records to match `payload.id`.
546pub async fn find_codex_session_file_by_id(session_id: &str) -> Result<Option<PathBuf>> {
547    Ok(find_codex_session_files_by_ids(&[session_id.to_string()])
548        .await?
549        .remove(session_id))
550}
551
552pub async fn find_codex_session_files_by_ids(
553    session_ids: &[String],
554) -> Result<HashMap<String, PathBuf>> {
555    find_codex_session_files_by_ids_in_dir(&codex_sessions_dir(), session_ids).await
556}
557
558async fn find_codex_session_files_by_ids_in_dir(
559    root: &Path,
560    session_ids: &[String],
561) -> Result<HashMap<String, PathBuf>> {
562    if !root.exists() || session_ids.is_empty() {
563        return Ok(HashMap::new());
564    }
565
566    let mut remaining = session_ids
567        .iter()
568        .map(|sid| sid.trim())
569        .filter(|sid| !sid.is_empty())
570        .map(ToOwned::to_owned)
571        .collect::<std::collections::HashSet<_>>();
572    if remaining.is_empty() {
573        return Ok(HashMap::new());
574    }
575
576    let mut found = HashMap::new();
577    let mut scanned_files: usize = 0;
578    let year_dirs = collect_dirs_desc(root, |s| s.parse::<u32>().ok()).await?;
579
580    'outer: for (_year, year_path) in year_dirs {
581        let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
582        for (_month, month_path) in month_dirs {
583            let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
584            for (_day, day_path) in day_dirs {
585                let day_files = collect_rollout_files_sorted(&day_path).await?;
586                for path in day_files {
587                    if scanned_files >= MAX_SCAN_FILES || remaining.is_empty() {
588                        break 'outer;
589                    }
590                    scanned_files += 1;
591
592                    if let Some(name) = path.file_name().and_then(|s| s.to_str())
593                        && let Some((_ts, uuid)) = parse_timestamp_and_uuid(name)
594                        && remaining.remove(&uuid)
595                    {
596                        found.insert(uuid.to_string(), path.clone());
597                        if remaining.is_empty() {
598                            break 'outer;
599                        }
600                        continue;
601                    }
602
603                    if let Some(meta) = read_codex_session_meta(&path).await?
604                        && remaining.remove(meta.id.as_str())
605                    {
606                        found.insert(meta.id, path);
607                        if remaining.is_empty() {
608                            break 'outer;
609                        }
610                    }
611                }
612            }
613        }
614    }
615
616    Ok(found)
617}
618
619/// Read the `session_meta` record from a Codex session JSONL file (best-effort).
620pub async fn read_codex_session_meta(path: &Path) -> Result<Option<SessionMeta>> {
621    let file = fs::File::open(path)
622        .await
623        .with_context(|| format!("failed to open session file {:?}", path))?;
624    let reader = BufReader::new(file);
625    let mut lines = reader.lines();
626
627    let mut lines_scanned = 0usize;
628    while let Some(line) = lines.next_line().await? {
629        let trimmed = line.trim();
630        if trimmed.is_empty() {
631            continue;
632        }
633        lines_scanned += 1;
634        if lines_scanned > HEAD_SCAN_LINES {
635            break;
636        }
637
638        let value: Value = match serde_json::from_str(trimmed) {
639            Ok(v) => v,
640            Err(_) => continue,
641        };
642
643        if let Some(meta) = parse_session_meta(&value) {
644            return Ok(Some(SessionMeta {
645                id: meta.id,
646                cwd: meta.cwd,
647                created_at: meta.created_at,
648            }));
649        }
650    }
651
652    Ok(None)
653}
654
/// Test helper: read a session file's header and expand it into a summary without using the
/// stats cache. Returns `Ok(None)` when the file has no usable header.
#[cfg(test)]
async fn summarize_session_for_current_dir(
    path: &Path,
    cwd: &Path,
) -> Result<Option<SessionSummary>> {
    match read_session_header(path, cwd).await? {
        Some(header) => expand_header_to_summary_uncached(header).await.map(Some),
        None => Ok(None),
    }
}
666
/// Fields extracted from a `session_meta` JSONL record by `parse_session_meta`.
struct SessionMetaInfo {
    /// `payload.id` of the record.
    id: String,
    /// `payload.cwd`, if present and a string.
    cwd: Option<String>,
    /// `payload.timestamp`, falling back to the record's top-level `timestamp`.
    created_at: Option<String>,
}
672
/// Cheap per-file information gathered from the head of a session file; used to pick and order
/// sessions before the more expensive expansion into a [`SessionSummary`].
#[derive(Debug, Clone)]
struct SessionHeader {
    /// Session id from the `session_meta` record.
    id: String,
    /// Path of the session JSONL file.
    path: PathBuf,
    /// Working directory recorded in `session_meta`, if present.
    cwd: Option<String>,
    /// Creation timestamp recorded in `session_meta`, if present.
    created_at: Option<String>,
    /// File modified time in milliseconds since epoch (used for cheap recency sorting).
    mtime_ms: u64,
    /// Best-effort: timestamp of the most recent JSONL record (from the file tail; only computed for displayed rows).
    updated_hint: Option<String>,
    /// Text of the first `user_message` event found in the scanned head of the file.
    first_user_message: String,
    /// Whether the recorded cwd matched the directory the header was read for
    /// (as decided by `path_matches_current_dir`); `false` when no cwd was recorded.
    is_cwd_match: bool,
}
686
687fn parse_session_meta(value: &Value) -> Option<SessionMetaInfo> {
688    let obj = value.as_object()?;
689    let type_str = obj.get("type")?.as_str()?;
690    if type_str != "session_meta" {
691        return None;
692    }
693
694    let payload = obj.get("payload")?.as_object()?;
695    let id = payload.get("id").and_then(|v| v.as_str())?.to_string();
696    let cwd = payload
697        .get("cwd")
698        .and_then(|v| v.as_str())
699        .map(|s| s.to_string());
700    let created_at = payload
701        .get("timestamp")
702        .and_then(|v| v.as_str())
703        .map(|s| s.to_string())
704        .or_else(|| {
705            obj.get("timestamp")
706                .and_then(|v| v.as_str())
707                .map(|s| s.to_string())
708        });
709
710    Some(SessionMetaInfo {
711        id,
712        cwd,
713        created_at,
714    })
715}
716
717fn user_message_text(value: &Value) -> Option<&str> {
718    let obj = value.as_object()?;
719    let type_str = obj.get("type")?.as_str()?;
720    if type_str != "event_msg" {
721        return None;
722    }
723    let payload = obj.get("payload")?.as_object()?;
724    let payload_type = payload.get("type")?.as_str()?;
725    if payload_type != "user_message" {
726        return None;
727    }
728    payload.get("message").and_then(|v| v.as_str())
729}
730
/// Report whether `needle` occurs as a contiguous byte sequence inside `haystack`.
///
/// An empty `needle` always matches; a `needle` longer than `haystack` never does.
fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        // Handled first: `windows(0)` would panic.
        0 => true,
        len if len > haystack.len() => false,
        len => haystack.windows(len).any(|window| window == needle),
    }
}
740
741async fn read_session_header(path: &Path, cwd: &Path) -> Result<Option<SessionHeader>> {
742    let meta = fs::metadata(path)
743        .await
744        .with_context(|| format!("failed to stat session file {:?}", path))?;
745    let mtime_ms = meta
746        .modified()
747        .ok()
748        .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
749        .map(|d| d.as_millis() as u64)
750        .unwrap_or(0);
751
752    let file = fs::File::open(path)
753        .await
754        .with_context(|| format!("failed to open session file {:?}", path))?;
755    let reader = BufReader::new(file);
756    let mut lines = reader.lines();
757
758    let mut session_id: Option<String> = None;
759    let mut cwd_str: Option<String> = None;
760    let mut created_at: Option<String> = None;
761    let mut first_user_message: Option<String> = None;
762
763    let mut lines_scanned = 0usize;
764    while let Some(line) = lines.next_line().await? {
765        let trimmed = line.trim();
766        if trimmed.is_empty() {
767            continue;
768        }
769        lines_scanned += 1;
770        if lines_scanned > HEAD_SCAN_LINES {
771            break;
772        }
773        let value: Value = match serde_json::from_str(trimmed) {
774            Ok(v) => v,
775            Err(_) => continue,
776        };
777
778        if session_id.is_none()
779            && let Some(meta) = parse_session_meta(&value)
780        {
781            session_id = Some(meta.id);
782            cwd_str = meta.cwd;
783            created_at = meta.created_at;
784        }
785
786        if first_user_message.is_none()
787            && let Some(msg) = user_message_text(&value)
788        {
789            first_user_message = Some(msg.to_string());
790        }
791
792        if session_id.is_some() && first_user_message.is_some() {
793            break;
794        }
795    }
796
797    let Some(id) = session_id else {
798        return Ok(None);
799    };
800    let Some(first_user_message) = first_user_message else {
801        return Ok(None);
802    };
803
804    let cwd_value = cwd_str.clone();
805    let is_cwd_match = cwd_value
806        .as_deref()
807        .map(|s| path_matches_current_dir(s, cwd))
808        .unwrap_or(false);
809
810    Ok(Some(SessionHeader {
811        id,
812        path: path.to_path_buf(),
813        cwd: cwd_value,
814        created_at,
815        mtime_ms,
816        updated_hint: None,
817        first_user_message,
818        is_cwd_match,
819    }))
820}
821
822async fn select_and_expand_headers(
823    matched: Vec<SessionHeader>,
824    others: Vec<SessionHeader>,
825    limit: usize,
826) -> Result<Vec<SessionSummary>> {
827    if limit == 0 {
828        return Ok(Vec::new());
829    }
830
831    let mut chosen = if !matched.is_empty() { matched } else { others };
832    // Use file mtime for cheap recency ordering; this correctly surfaces sessions that were resumed
833    // (older filename timestamp but recently appended to).
834    chosen.sort_by_key(|item| Reverse(item.mtime_ms));
835    if chosen.len() > limit {
836        chosen.truncate(limit);
837    }
838
839    let cache = Arc::new(Mutex::new(SessionStatsCache::load_default().await));
840    let mut out: Vec<SessionSummary> = Vec::with_capacity(chosen.len().min(limit));
841    let mut stream = stream::iter(chosen)
842        .map(|header| {
843            let cache = Arc::clone(&cache);
844            async move { expand_header_to_summary_cached(cache, header).await }
845        })
846        .buffer_unordered(SESSION_IO_CONCURRENCY);
847
848    while let Some(summary) = stream.next().await {
849        out.push(summary?);
850    }
851
852    drop(stream);
853    let mut cache = Arc::try_unwrap(cache)
854        .map_err(|_| anyhow!("session stats cache still has active workers"))?
855        .into_inner()
856        .map_err(|_| anyhow!("session stats cache lock poisoned"))?;
857    cache.save_if_dirty().await?;
858
859    sort_by_updated_desc(&mut out);
860    out.truncate(limit);
861    Ok(out)
862}
863
864fn build_summary_from_stats(
865    header: SessionHeader,
866    user_turns: usize,
867    assistant_turns: usize,
868    last_response_at: Option<String>,
869) -> SessionSummary {
870    let rounds = user_turns.min(assistant_turns);
871    let updated_at = last_response_at
872        .clone()
873        .or_else(|| header.updated_hint.clone())
874        .or_else(|| header.created_at.clone());
875
876    SessionSummary {
877        id: header.id,
878        path: header.path,
879        cwd: header.cwd,
880        created_at: header.created_at,
881        updated_at,
882        last_response_at,
883        user_turns,
884        assistant_turns,
885        rounds,
886        first_user_message: Some(header.first_user_message),
887        source: SessionSummarySource::LocalFile,
888        sort_hint_ms: None,
889    }
890}
891
892async fn expand_header_to_summary_cached(
893    cache: Arc<Mutex<SessionStatsCache>>,
894    mut header: SessionHeader,
895) -> Result<SessionSummary> {
896    let path = header.path.clone();
897    let key = path.to_string_lossy().to_string();
898    let meta = fs::metadata(&path)
899        .await
900        .with_context(|| format!("failed to stat session file {:?}", path))?;
901    let size = meta.len();
902    let mtime_ms = meta
903        .modified()
904        .ok()
905        .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
906        .map(|d| d.as_millis() as u64)
907        .unwrap_or(0);
908
909    let cached = {
910        let cache = cache
911            .lock()
912            .map_err(|_| anyhow!("session stats cache lock poisoned"))?;
913        cache.lookup(&key, mtime_ms, size)
914    };
915
916    let stats = if let Some(stats) = cached {
917        if stats.last_response_at.is_none() && header.updated_hint.is_none() {
918            header.updated_hint = read_last_timestamp_from_tail(&path)
919                .await?
920                .or_else(|| header.created_at.clone());
921        }
922        stats
923    } else {
924        let (counts, tail) = tokio::join!(
925            count_turns_in_file(&path),
926            read_tail_timestamps(&path, true)
927        );
928        let (user_turns, assistant_turns) = counts?;
929        let tail = tail?;
930        header.updated_hint = tail.last_record_at.or_else(|| header.created_at.clone());
931
932        let stats = SessionStatsSnapshot {
933            user_turns,
934            assistant_turns,
935            last_response_at: tail.last_assistant_at,
936        };
937        {
938            let mut cache = cache
939                .lock()
940                .map_err(|_| anyhow!("session stats cache lock poisoned"))?;
941            cache.insert(key, mtime_ms, size, &stats);
942        }
943        stats
944    };
945
946    Ok(build_summary_from_stats(
947        header,
948        stats.user_turns,
949        stats.assistant_turns,
950        stats.last_response_at,
951    ))
952}
953
/// Test-only variant of the summary expansion that always recomputes statistics from the
/// session file and never consults the stats cache.
#[cfg(test)]
async fn expand_header_to_summary_uncached(header: SessionHeader) -> Result<SessionSummary> {
    let turns = count_turns_in_file(&header.path).await?;
    let last_assistant_at = read_last_assistant_timestamp_from_tail(&header.path).await?;
    let (user_turns, assistant_turns) = turns;

    let summary =
        build_summary_from_stats(header, user_turns, assistant_turns, last_assistant_at);
    Ok(summary)
}
965
966async fn count_turns_in_file(path: &Path) -> Result<(usize, usize)> {
967    const USER_TURN_NEEDLE: &[u8] = br#""payload":{"type":"user_message""#;
968    const ASSISTANT_TURN_NEEDLE: &[u8] = br#""role":"assistant""#;
969
970    let mut file = fs::File::open(path)
971        .await
972        .with_context(|| format!("failed to open session file {:?}", path))?;
973
974    let mut buf = vec![0u8; IO_CHUNK_SIZE];
975    let mut user_carry: Vec<u8> = Vec::new();
976    let mut assistant_carry: Vec<u8> = Vec::new();
977    let mut user_total = 0usize;
978    let mut assistant_total = 0usize;
979    let mut user_window: Vec<u8> = Vec::with_capacity(IO_CHUNK_SIZE + USER_TURN_NEEDLE.len());
980    let mut assistant_window: Vec<u8> =
981        Vec::with_capacity(IO_CHUNK_SIZE + ASSISTANT_TURN_NEEDLE.len());
982
983    loop {
984        let n = file.read(&mut buf).await?;
985        if n == 0 {
986            break;
987        }
988
989        user_window.clear();
990        user_window.extend_from_slice(&user_carry);
991        user_window.extend_from_slice(&buf[..n]);
992        user_total = user_total.saturating_add(count_subslice(&user_window, USER_TURN_NEEDLE));
993
994        assistant_window.clear();
995        assistant_window.extend_from_slice(&assistant_carry);
996        assistant_window.extend_from_slice(&buf[..n]);
997        assistant_total = assistant_total
998            .saturating_add(count_subslice(&assistant_window, ASSISTANT_TURN_NEEDLE));
999
1000        let user_keep = USER_TURN_NEEDLE.len().saturating_sub(1);
1001        user_carry = if user_keep > 0 && user_window.len() >= user_keep {
1002            user_window[user_window.len() - user_keep..].to_vec()
1003        } else {
1004            Vec::new()
1005        };
1006
1007        let assistant_keep = ASSISTANT_TURN_NEEDLE.len().saturating_sub(1);
1008        assistant_carry = if assistant_keep > 0 && assistant_window.len() >= assistant_keep {
1009            assistant_window[assistant_window.len() - assistant_keep..].to_vec()
1010        } else {
1011            Vec::new()
1012        };
1013    }
1014
1015    Ok((user_total, assistant_total))
1016}
1017
/// Returns how many times `needle` occurs in `haystack`, counting overlapping occurrences.
///
/// An empty needle, or a needle longer than the haystack, yields `0`.
fn count_subslice(haystack: &[u8], needle: &[u8]) -> usize {
    let width = needle.len();
    if width == 0 || width > haystack.len() {
        return 0;
    }

    let last_start = haystack.len() - width;
    let mut hits = 0usize;
    for start in 0..=last_start {
        if &haystack[start..start + width] == needle {
            hits += 1;
        }
    }
    hits
}
1030
/// Timestamps recovered by scanning a session file backwards from its end.
#[derive(Default, Debug)]
struct TailTimestamps {
    /// `timestamp` of the last parseable record in the file that carries one.
    last_record_at: Option<String>,
    /// `timestamp` of the last record whose line contains the assistant role marker.
    last_assistant_at: Option<String>,
}
1036
1037async fn read_last_timestamp_from_tail(path: &Path) -> Result<Option<String>> {
1038    Ok(read_tail_timestamps(path, false).await?.last_record_at)
1039}
1040
/// Test-only helper: returns the timestamp of the last assistant record found in the scanned
/// tail of the session file, if any.
#[cfg(test)]
async fn read_last_assistant_timestamp_from_tail(path: &Path) -> Result<Option<String>> {
    let tail = read_tail_timestamps(path, true).await?;
    Ok(tail.last_assistant_at)
}
1045
1046async fn read_tail_timestamps(path: &Path, include_assistant: bool) -> Result<TailTimestamps> {
1047    const ASSISTANT_ROLE_NEEDLE: &[u8] = br#""role":"assistant""#;
1048
1049    let mut file = fs::File::open(path)
1050        .await
1051        .with_context(|| format!("failed to open session file {:?}", path))?;
1052    let meta = file
1053        .metadata()
1054        .await
1055        .with_context(|| format!("failed to stat session file {:?}", path))?;
1056    let mut pos = meta.len();
1057    if pos == 0 {
1058        return Ok(TailTimestamps::default());
1059    }
1060
1061    let mut scanned = 0usize;
1062    let mut carry: Vec<u8> = Vec::new();
1063    let chunk_size = IO_CHUNK_SIZE as u64;
1064    let mut found = TailTimestamps::default();
1065
1066    while pos > 0 && scanned < TAIL_SCAN_MAX_BYTES {
1067        let start = pos.saturating_sub(chunk_size);
1068        let size = (pos - start) as usize;
1069        file.seek(std::io::SeekFrom::Start(start)).await?;
1070
1071        let mut chunk = vec![0u8; size];
1072        file.read_exact(&mut chunk).await?;
1073        scanned = scanned.saturating_add(size);
1074
1075        if !carry.is_empty() {
1076            chunk.extend_from_slice(&carry);
1077        }
1078
1079        // Iterate lines from the end.
1080        let mut end = chunk.len();
1081        while end > 0 {
1082            let mut begin = end;
1083            while begin > 0 && chunk[begin - 1] != b'\n' {
1084                begin -= 1;
1085            }
1086            let line = chunk[begin..end].trim_ascii();
1087            end = begin.saturating_sub(1);
1088
1089            if line.is_empty() {
1090                continue;
1091            }
1092
1093            let wants_record = found.last_record_at.is_none();
1094            let wants_assistant = include_assistant
1095                && found.last_assistant_at.is_none()
1096                && contains_bytes(line, ASSISTANT_ROLE_NEEDLE);
1097            if !wants_record && !wants_assistant {
1098                continue;
1099            }
1100
1101            let value: Value = match serde_json::from_slice(line) {
1102                Ok(v) => v,
1103                Err(_) => continue,
1104            };
1105            if let Some(ts) = value.get("timestamp").and_then(|v| v.as_str()) {
1106                let ts = ts.to_string();
1107                if wants_record {
1108                    found.last_record_at = Some(ts.clone());
1109                }
1110                if wants_assistant {
1111                    found.last_assistant_at = Some(ts);
1112                }
1113                if found.last_record_at.is_some()
1114                    && (!include_assistant || found.last_assistant_at.is_some())
1115                {
1116                    return Ok(found);
1117                }
1118            }
1119        }
1120
1121        // Keep the partial first line for the next iteration.
1122        if let Some(first_nl) = chunk.iter().position(|b| *b == b'\n') {
1123            carry = chunk[..first_nl].to_vec();
1124        } else {
1125            carry = chunk;
1126        }
1127
1128        pos = start;
1129    }
1130
1131    Ok(found)
1132}
1133
/// Reports whether a session's recorded working directory is related to `current_dir`.
///
/// A relative `session_cwd` never matches. Otherwise both paths are canonicalized (falling
/// back to the path as given when canonicalization fails) and the result is `true` when
/// either path is equal to, or a component-wise ancestor of, the other.
fn path_matches_current_dir(session_cwd: &str, current_dir: &Path) -> bool {
    let recorded = Path::new(session_cwd);
    if recorded.is_relative() {
        return false;
    }

    let resolve = |path: &Path| -> PathBuf {
        match std::fs::canonicalize(path) {
            Ok(resolved) => resolved,
            Err(_) => path.to_path_buf(),
        }
    };
    let here = resolve(current_dir);
    let there = resolve(recorded);

    // `starts_with` compares whole components and is also true for identical paths.
    here.starts_with(&there) || there.starts_with(&here)
}
1145
1146async fn collect_dirs_desc<T, F>(parent: &Path, parse: F) -> std::io::Result<Vec<(T, PathBuf)>>
1147where
1148    T: Ord + Copy,
1149    F: Fn(&str) -> Option<T>,
1150{
1151    let mut dir = fs::read_dir(parent).await?;
1152    let mut vec: Vec<(T, PathBuf)> = Vec::new();
1153    while let Some(entry) = dir.next_entry().await? {
1154        if entry
1155            .file_type()
1156            .await
1157            .map(|ft| ft.is_dir())
1158            .unwrap_or(false)
1159            && let Some(s) = entry.file_name().to_str()
1160            && let Some(v) = parse(s)
1161        {
1162            vec.push((v, entry.path()));
1163        }
1164    }
1165    vec.sort_by_key(|(v, _)| Reverse(*v));
1166    Ok(vec)
1167}
1168
1169async fn collect_rollout_files_sorted(parent: &Path) -> std::io::Result<Vec<PathBuf>> {
1170    let mut dir = fs::read_dir(parent).await?;
1171    let mut records: Vec<(String, String, PathBuf)> = Vec::new();
1172
1173    while let Some(entry) = dir.next_entry().await? {
1174        if entry
1175            .file_type()
1176            .await
1177            .map(|ft| ft.is_file())
1178            .unwrap_or(false)
1179        {
1180            let name_os = entry.file_name();
1181            let Some(name) = name_os.to_str() else {
1182                continue;
1183            };
1184            if !name.starts_with("rollout-") || !name.ends_with(".jsonl") {
1185                continue;
1186            }
1187            if let Some((ts, uuid)) = parse_timestamp_and_uuid(name) {
1188                records.push((ts, uuid, entry.path()));
1189            }
1190        }
1191    }
1192
1193    records.sort_by(|a, b| {
1194        // Sort by timestamp desc, then UUID desc.
1195        match b.0.cmp(&a.0) {
1196            Ordering::Equal => b.1.cmp(&a.1),
1197            other => other,
1198        }
1199    });
1200
1201    Ok(records.into_iter().map(|(_, _, path)| path).collect())
1202}
1203
/// Splits a rollout file name into its timestamp and UUID parts.
///
/// The expected shape is `rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`. The timestamp is taken
/// as the fixed-width 19-byte prefix after `rollout-`; it is not validated further. Returns
/// `None` when the prefix/suffix is missing, the name is too short, the separator after the
/// timestamp is not `-`, or the UUID part is empty.
///
/// File names come from the file system and may contain arbitrary UTF-8, so the split uses
/// checked slicing: a name whose 19th byte falls inside a multi-byte character yields `None`
/// instead of panicking.
fn parse_timestamp_and_uuid(name: &str) -> Option<(String, String)> {
    // Timestamp format is stable and has a fixed width: "YYYY-MM-DDThh-mm-ss" (19 chars).
    const TS_LEN: usize = 19;

    let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;

    // `str::get` returns `None` both for out-of-range indices and for indices that are not
    // on a character boundary.
    let ts = core.get(..TS_LEN)?;
    let uuid = core.get(TS_LEN..)?.strip_prefix('-')?;
    if uuid.is_empty() {
        return None;
    }
    Some((ts.to_string(), uuid.to_string()))
}
1220
1221fn sort_by_updated_desc(vec: &mut [SessionSummary]) {
1222    vec.sort_by(|a, b| {
1223        let ta = a.updated_at.as_deref();
1224        let tb = b.updated_at.as_deref();
1225        match (ta, tb) {
1226            (Some(ta), Some(tb)) => tb.cmp(ta),
1227            (Some(_), None) => Ordering::Less,
1228            (None, Some(_)) => Ordering::Greater,
1229            (None, None) => Ordering::Equal,
1230        }
1231    });
1232}
1233
1234#[cfg(test)]
1235mod tests;