1use std::cmp::{Ordering, Reverse};
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, anyhow};
8use futures_util::StreamExt;
9use futures_util::stream;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tokio::fs;
13use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
14
15use crate::config::codex_sessions_dir;
16use crate::file_replace::write_bytes_file_async;
17
18mod stats_cache;
19mod transcript;
20
21use stats_cache::{SessionStatsCache, SessionStatsSnapshot};
22pub use transcript::{codex_session_transcript_tail_contains_query, read_codex_session_transcript};
23
/// Where the data behind a [`SessionSummary`] came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SessionSummarySource {
    /// Default variant; the one assigned to every summary this module builds
    /// from a rollout file on disk.
    #[default]
    LocalFile,
    /// Never constructed in this module.
    /// NOTE(review): presumably marks sessions that were observed without a
    /// local file to read — confirm against the code that produces it.
    ObservedOnly,
}
31
/// Summary of one Codex session, as returned by the listing and search
/// functions of this module.
#[derive(Debug, Clone)]
pub struct SessionSummary {
    /// Session id taken from the file's `session_meta` record.
    pub id: String,
    /// Path of the rollout file the summary was built from.
    pub path: PathBuf,
    /// Working directory recorded in `session_meta`, if any.
    pub cwd: Option<String>,
    /// Creation timestamp string recorded in `session_meta`, if any.
    pub created_at: Option<String>,
    /// `last_response_at` when present; otherwise the header's update hint
    /// (last record timestamp from the file tail), otherwise `created_at`.
    pub updated_at: Option<String>,
    /// Timestamp of the last record mentioning the assistant role, if one was
    /// found (the value may come from the stats cache).
    pub last_response_at: Option<String>,
    /// Number of user-message markers counted in the file.
    pub user_turns: usize,
    /// Number of assistant-role markers counted in the file.
    pub assistant_turns: usize,
    /// `min(user_turns, assistant_turns)`.
    pub rounds: usize,
    /// Text of the first `user_message` event found near the top of the file.
    pub first_user_message: Option<String>,
    /// Origin of the summary; `LocalFile` for everything built in this module.
    pub source: SessionSummarySource,
    /// Always `None` for summaries built in this module.
    /// NOTE(review): presumably an ordering hint set by other producers — confirm.
    pub sort_hint_ms: Option<u64>,
}
51
/// Public view of a session's `session_meta` record, as returned by
/// [`read_codex_session_meta`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMeta {
    /// Session id (`payload.id`).
    pub id: String,
    /// Working directory (`payload.cwd`), when present and a string.
    pub cwd: Option<String>,
    /// `payload.timestamp`, falling back to the record's top-level `timestamp`.
    pub created_at: Option<String>,
}
59
/// One message of a session transcript.
///
/// NOTE(review): not constructed anywhere in this file; presumably produced by
/// the `transcript` submodule — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionTranscriptMessage {
    /// Timestamp string of the message, if known.
    pub timestamp: Option<String>,
    /// Role label of the message author.
    pub role: String,
    /// Message text.
    pub text: String,
}
67
/// Minimal record of a recently modified session file.
#[derive(Debug, Clone)]
pub struct RecentSession {
    /// Session id from `session_meta`, or the uuid parsed from the file name
    /// when the file has no usable `session_meta` record.
    pub id: String,
    /// Working directory from `session_meta`; `None` when the id came from the
    /// file name.
    pub cwd: Option<String>,
    /// File modification time in milliseconds since the Unix epoch
    /// (0 when the modification time is unavailable).
    pub mtime_ms: u64,
}
75
/// One `<year>/<month>/<day>` directory under the sessions root.
#[cfg(feature = "gui")]
#[derive(Debug, Clone)]
pub struct SessionDayDir {
    /// Date rendered as `YYYY-MM-DD` from the numeric directory names.
    pub date: String,
    /// Path of the day directory.
    pub path: PathBuf,
}
82
/// Index entry for one session file inside a day directory.
#[cfg(feature = "gui")]
#[derive(Debug, Clone)]
pub struct SessionIndexItem {
    /// Session id from the file's `session_meta` record.
    pub id: String,
    /// Path of the rollout file.
    pub path: PathBuf,
    /// Working directory recorded in `session_meta`, if any.
    pub cwd: Option<String>,
    /// Creation timestamp recorded in `session_meta`, if any.
    pub created_at: Option<String>,
    /// Timestamp of the last record found in the file tail, falling back to
    /// `created_at`.
    pub updated_hint: Option<String>,
    /// File modification time in milliseconds since the Unix epoch.
    pub mtime_ms: u64,
    /// Text of the first `user_message` event in the file.
    pub first_user_message: Option<String>,
}
94
/// Resolves the project root for a working directory.
///
/// A relative `cwd` is returned unchanged. An absolute `cwd` is canonicalized
/// (falling back to the path as given when that fails) and its ancestors are
/// walked from the directory itself up to the filesystem root; the first one
/// containing a `.git` entry is returned. When no ancestor has one, the
/// canonicalized directory itself is returned.
pub fn infer_project_root_from_cwd(cwd: &str) -> String {
    let start = std::path::Path::new(cwd);
    if start.is_relative() {
        return cwd.to_owned();
    }

    let resolved = match std::fs::canonicalize(start) {
        Ok(real) => real,
        Err(_) => start.to_path_buf(),
    };

    // `ancestors()` yields the path itself first, then each parent in turn.
    let root = resolved
        .ancestors()
        .find(|dir| dir.join(".git").exists())
        .unwrap_or(resolved.as_path());
    root.to_string_lossy().into_owned()
}
113
/// Upper bound on rollout files examined by the directory-scoped listing,
/// search and id-lookup scans.
const MAX_SCAN_FILES: usize = 10_000;
/// Maximum number of non-empty lines read from the top of a file when looking
/// for `session_meta` and the first user message.
const HEAD_SCAN_LINES: usize = 512;
/// Buffer size, in bytes, for chunked reads (turn counting and tail scanning).
const IO_CHUNK_SIZE: usize = 64 * 1024;
/// Once this many bytes have been read from the end of a file, the tail scan
/// starts no further chunks.
const TAIL_SCAN_MAX_BYTES: usize = 1024 * 1024;
/// Number of session files processed concurrently.
const SESSION_IO_CONCURRENCY: usize = 8;

/// Upper bound on rollout files examined by the "recent sessions" scans.
const MAX_SCAN_FILES_RECENT: usize = 200_000;
121
122pub async fn find_codex_sessions_for_dir(
125 root_dir: &Path,
126 limit: usize,
127) -> Result<Vec<SessionSummary>> {
128 let root = codex_sessions_dir();
129 if !root.exists() {
130 return Ok(Vec::new());
131 }
132
133 let mut matched: Vec<SessionHeader> = Vec::new();
134 let mut others: Vec<SessionHeader> = Vec::new();
135 let mut scanned_files: usize = 0;
136
137 let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u32>().ok()).await?;
138
139 'outer: for (_year, year_path) in year_dirs {
140 let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
141 for (_month, month_path) in month_dirs {
142 let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
143 for (_day, day_path) in day_dirs {
144 let day_files = collect_rollout_files_sorted(&day_path).await?;
145 for path in day_files {
146 if scanned_files >= MAX_SCAN_FILES {
147 break 'outer;
148 }
149 scanned_files += 1;
150
151 let header_opt = read_session_header(&path, root_dir).await?;
152 let Some(header) = header_opt else {
153 continue;
154 };
155
156 if header.is_cwd_match {
157 matched.push(header);
158 } else {
159 others.push(header);
160 }
161 }
162 }
163 }
164 }
165
166 select_and_expand_headers(matched, others, limit).await
167}
168
169pub async fn search_codex_sessions_for_dir(
172 root_dir: &Path,
173 query: &str,
174 limit: usize,
175) -> Result<Vec<SessionSummary>> {
176 let needle = query.to_lowercase();
177
178 let root = codex_sessions_dir();
179 if !root.exists() {
180 return Ok(Vec::new());
181 }
182
183 let mut matched: Vec<SessionHeader> = Vec::new();
184 let mut others: Vec<SessionHeader> = Vec::new();
185 let mut scanned_files: usize = 0;
186
187 let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u32>().ok()).await?;
188
189 'outer: for (_year, year_path) in year_dirs {
190 let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
191 for (_month, month_path) in month_dirs {
192 let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
193 for (_day, day_path) in day_dirs {
194 let day_files = collect_rollout_files_sorted(&day_path).await?;
195 for path in day_files {
196 if scanned_files >= MAX_SCAN_FILES {
197 break 'outer;
198 }
199 scanned_files += 1;
200
201 let header_opt = read_session_header(&path, root_dir).await?;
202 let Some(header) = header_opt else {
203 continue;
204 };
205 if !header
206 .first_user_message
207 .to_lowercase()
208 .contains(needle.as_str())
209 {
210 continue;
211 }
212
213 if header.is_cwd_match {
214 matched.push(header);
215 } else {
216 others.push(header);
217 }
218 }
219 }
220 }
221 }
222
223 select_and_expand_headers(matched, others, limit).await
224}
225
226pub async fn find_codex_sessions_for_current_dir(limit: usize) -> Result<Vec<SessionSummary>> {
228 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
229 find_codex_sessions_for_dir(&cwd, limit).await
230}
231
232pub async fn search_codex_sessions_for_current_dir(
234 query: &str,
235 limit: usize,
236) -> Result<Vec<SessionSummary>> {
237 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
238 search_codex_sessions_for_dir(&cwd, query, limit).await
239}
240
241pub async fn find_recent_codex_sessions(
246 since: Duration,
247 limit: usize,
248) -> Result<Vec<RecentSession>> {
249 let root = codex_sessions_dir();
250 find_recent_codex_sessions_in_dir(&root, since, limit).await
251}
252
/// Builds full summaries for sessions whose file was modified within the
/// last `since`.
///
/// Scans the date tree newest-first, looking at no more than
/// `MAX_SCAN_FILES_RECENT` files. Files that cannot be stat'ed or are older
/// than the threshold are skipped. All surviving sessions are handed to
/// `select_and_expand_headers`, so cwd matching plays no role in the
/// selection.
///
/// # Errors
/// Fails if a directory cannot be listed or a candidate file cannot be read.
#[cfg(feature = "gui")]
pub async fn find_recent_codex_session_summaries(
    since: Duration,
    limit: usize,
) -> Result<Vec<SessionSummary>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    let sessions_dir = codex_sessions_dir();
    if !sessions_dir.exists() {
        return Ok(Vec::new());
    }

    let here = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));

    // Milliseconds of a duration, saturating at `u64::MAX`.
    let clamp_ms = |d: Duration| -> u64 { u64::try_from(d.as_millis()).unwrap_or(u64::MAX) };
    let now_ms = clamp_ms(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default(),
    );
    let threshold_ms = now_ms.saturating_sub(clamp_ms(since));

    let mut recent = Vec::<SessionHeader>::new();
    // Number of files we are still allowed to look at.
    let mut budget = MAX_SCAN_FILES_RECENT;

    let years = collect_dirs_desc(&sessions_dir, |s| s.parse::<u32>().ok()).await?;
    'scan: for (_, year_dir) in years {
        let months = collect_dirs_desc(&year_dir, |s| s.parse::<u8>().ok()).await?;
        for (_, month_dir) in months {
            let days = collect_dirs_desc(&month_dir, |s| s.parse::<u8>().ok()).await?;
            for (_, day_dir) in days {
                for rollout in collect_rollout_files_sorted(&day_dir).await? {
                    if budget == 0 {
                        break 'scan;
                    }
                    budget -= 1;

                    let Ok(stat) = fs::metadata(&rollout).await else {
                        continue;
                    };
                    let mtime_ms = stat
                        .modified()
                        .ok()
                        .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
                        .map_or(0, clamp_ms);
                    if mtime_ms < threshold_ms {
                        continue;
                    }

                    if let Some(header) = read_session_header(&rollout, &here).await? {
                        recent.push(header);
                    }
                }
            }
        }
    }

    select_and_expand_headers(Vec::new(), recent, limit).await
}
318
/// Lists up to `limit` day directories under the sessions root, newest first.
///
/// Only directories whose names parse as numbers (year as `u32`, month and
/// day as `u8`) are considered.
///
/// # Errors
/// Fails if a directory cannot be listed.
#[cfg(feature = "gui")]
pub async fn list_codex_session_day_dirs(limit: usize) -> Result<Vec<SessionDayDir>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    let sessions_root = codex_sessions_dir();
    if !sessions_root.exists() {
        return Ok(Vec::new());
    }

    let mut days_found = Vec::<SessionDayDir>::new();
    let years = collect_dirs_desc(&sessions_root, |s| s.parse::<u32>().ok()).await?;
    'scan: for (year, year_dir) in years {
        let months = collect_dirs_desc(&year_dir, |s| s.parse::<u8>().ok()).await?;
        for (month, month_dir) in months {
            let days = collect_dirs_desc(&month_dir, |s| s.parse::<u8>().ok()).await?;
            for (day, day_dir) in days {
                let date = format!("{year:04}-{month:02}-{day:02}");
                days_found.push(SessionDayDir {
                    date,
                    path: day_dir,
                });
                if days_found.len() >= limit {
                    break 'scan;
                }
            }
        }
    }
    Ok(days_found)
}
348
/// Lists up to `limit` sessions stored in one day directory.
///
/// Files are read in batches of `SESSION_IO_CONCURRENCY`, in file-name order
/// (newest first). Reading stops after the first batch that brings the total
/// to `limit` or more; the collected items are then sorted by modification
/// time, newest first, and truncated to `limit`.
///
/// # Errors
/// Fails if the directory cannot be listed or a session file cannot be read.
#[cfg(feature = "gui")]
pub async fn list_codex_sessions_in_day_dir(
    day_dir: &Path,
    limit: usize,
) -> Result<Vec<SessionIndexItem>> {
    if limit == 0 || !day_dir.exists() {
        return Ok(Vec::new());
    }

    let here = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
    let rollouts = collect_rollout_files_sorted(day_dir).await?;
    let mut items = Vec::<SessionIndexItem>::new();

    for batch in rollouts.chunks(SESSION_IO_CONCURRENCY) {
        let base = here.clone();
        let mut pending = stream::iter(batch.iter().cloned())
            .map(move |path| read_session_index_item(path, base.clone()))
            .buffer_unordered(SESSION_IO_CONCURRENCY);

        while let Some(result) = pending.next().await {
            // `Option` is iterable: zero or one item gets appended.
            items.extend(result?);
        }
        if items.len() >= limit {
            break;
        }
    }

    items.sort_by(|a, b| b.mtime_ms.cmp(&a.mtime_ms));
    items.truncate(limit);
    Ok(items)
}
387
/// Builds the index entry for one session file.
///
/// Returns `Ok(None)` when the file has no usable header (missing
/// `session_meta` or no user message). The update hint is the last record
/// timestamp found in the file tail, falling back to the creation timestamp.
#[cfg(feature = "gui")]
async fn read_session_index_item(path: PathBuf, cwd: PathBuf) -> Result<Option<SessionIndexItem>> {
    let Some(header) = read_session_header(&path, &cwd).await? else {
        return Ok(None);
    };
    let SessionHeader {
        id,
        path: file_path,
        cwd: session_cwd,
        created_at,
        mtime_ms,
        first_user_message,
        ..
    } = header;

    let updated_hint = match read_last_timestamp_from_tail(&file_path).await? {
        Some(last_record_at) => Some(last_record_at),
        None => created_at.clone(),
    };

    Ok(Some(SessionIndexItem {
        id,
        path: file_path,
        cwd: session_cwd,
        created_at,
        updated_hint,
        mtime_ms,
        first_user_message: Some(first_user_message),
    }))
}
407
408async fn find_recent_codex_sessions_in_dir(
409 sessions_dir: &Path,
410 since: Duration,
411 limit: usize,
412) -> Result<Vec<RecentSession>> {
413 if limit == 0 {
414 return Ok(Vec::new());
415 }
416 if since.is_zero() {
417 return Ok(Vec::new());
418 }
419 if !sessions_dir.exists() {
420 return Ok(Vec::new());
421 }
422
423 let now_ms = SystemTime::now()
424 .duration_since(UNIX_EPOCH)
425 .unwrap_or_default()
426 .as_millis()
427 .min(u64::MAX as u128) as u64;
428 let since_ms = since.as_millis().min(u64::MAX as u128) as u64;
429 let threshold_ms = now_ms.saturating_sub(since_ms);
430
431 let mut out: Vec<RecentSession> = Vec::new();
432 let mut scanned_files: usize = 0;
433
434 let year_dirs = collect_dirs_desc(sessions_dir, |s| s.parse::<u32>().ok()).await?;
435 'outer: for (_year, year_path) in year_dirs {
436 let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
437 for (_month, month_path) in month_dirs {
438 let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
439 for (_day, day_path) in day_dirs {
440 let day_files = collect_rollout_files_sorted(&day_path).await?;
441 for path in day_files {
442 if scanned_files >= MAX_SCAN_FILES_RECENT {
443 break 'outer;
444 }
445 scanned_files += 1;
446
447 let meta = match fs::metadata(&path).await {
448 Ok(m) => m,
449 Err(_) => continue,
450 };
451 let mtime_ms = meta
452 .modified()
453 .ok()
454 .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
455 .map(|d| d.as_millis().min(u64::MAX as u128) as u64)
456 .unwrap_or(0);
457 if mtime_ms < threshold_ms {
458 continue;
459 }
460
461 let file_id = path
462 .file_name()
463 .and_then(|s| s.to_str())
464 .and_then(parse_timestamp_and_uuid)
465 .map(|(_, uuid)| uuid);
466
467 let meta = read_codex_session_meta(&path).await?;
468 let (id, cwd) = if let Some(meta) = meta {
469 (meta.id, meta.cwd)
470 } else if let Some(id) = file_id {
471 (id, None)
472 } else {
473 continue;
474 };
475
476 out.push(RecentSession { id, cwd, mtime_ms });
477 }
478 }
479 }
480 }
481
482 out.sort_by_key(|item| Reverse((item.mtime_ms, item.id.clone())));
483 out.truncate(limit);
484 Ok(out)
485}
486
487pub async fn find_codex_session_cwd_by_id(session_id: &str) -> Result<Option<String>> {
491 let root = codex_sessions_dir();
492 if !root.exists() {
493 return Ok(None);
494 }
495
496 let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u32>().ok()).await?;
497 for (_year, year_path) in year_dirs {
498 let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
499 for (_month, month_path) in month_dirs {
500 let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
501 for (_day, day_path) in day_dirs {
502 let day_files = collect_rollout_files_sorted(&day_path).await?;
503 for path in day_files {
504 let Some(name) = path.file_name().and_then(|s| s.to_str()) else {
505 continue;
506 };
507 let Some((_ts, uuid)) = parse_timestamp_and_uuid(name) else {
508 continue;
509 };
510 if uuid != session_id {
511 continue;
512 }
513
514 let file = fs::File::open(&path)
515 .await
516 .with_context(|| format!("failed to open session file {:?}", path))?;
517 let reader = BufReader::new(file);
518 let mut lines = reader.lines();
519 while let Some(line) = lines.next_line().await? {
520 let line = line.trim();
521 if line.is_empty() {
522 continue;
523 }
524 let value: Value = match serde_json::from_str(line) {
525 Ok(v) => v,
526 Err(_) => continue,
527 };
528 if let Some(meta) = parse_session_meta(&value) {
529 return Ok(meta.cwd);
530 }
531 }
532
533 return Ok(None);
534 }
535 }
536 }
537 }
538
539 Ok(None)
540}
541
542pub async fn find_codex_session_file_by_id(session_id: &str) -> Result<Option<PathBuf>> {
547 Ok(find_codex_session_files_by_ids(&[session_id.to_string()])
548 .await?
549 .remove(session_id))
550}
551
552pub async fn find_codex_session_files_by_ids(
553 session_ids: &[String],
554) -> Result<HashMap<String, PathBuf>> {
555 find_codex_session_files_by_ids_in_dir(&codex_sessions_dir(), session_ids).await
556}
557
558async fn find_codex_session_files_by_ids_in_dir(
559 root: &Path,
560 session_ids: &[String],
561) -> Result<HashMap<String, PathBuf>> {
562 if !root.exists() || session_ids.is_empty() {
563 return Ok(HashMap::new());
564 }
565
566 let mut remaining = session_ids
567 .iter()
568 .map(|sid| sid.trim())
569 .filter(|sid| !sid.is_empty())
570 .map(ToOwned::to_owned)
571 .collect::<std::collections::HashSet<_>>();
572 if remaining.is_empty() {
573 return Ok(HashMap::new());
574 }
575
576 let mut found = HashMap::new();
577 let mut scanned_files: usize = 0;
578 let year_dirs = collect_dirs_desc(root, |s| s.parse::<u32>().ok()).await?;
579
580 'outer: for (_year, year_path) in year_dirs {
581 let month_dirs = collect_dirs_desc(&year_path, |s| s.parse::<u8>().ok()).await?;
582 for (_month, month_path) in month_dirs {
583 let day_dirs = collect_dirs_desc(&month_path, |s| s.parse::<u8>().ok()).await?;
584 for (_day, day_path) in day_dirs {
585 let day_files = collect_rollout_files_sorted(&day_path).await?;
586 for path in day_files {
587 if scanned_files >= MAX_SCAN_FILES || remaining.is_empty() {
588 break 'outer;
589 }
590 scanned_files += 1;
591
592 if let Some(name) = path.file_name().and_then(|s| s.to_str())
593 && let Some((_ts, uuid)) = parse_timestamp_and_uuid(name)
594 && remaining.remove(&uuid)
595 {
596 found.insert(uuid.to_string(), path.clone());
597 if remaining.is_empty() {
598 break 'outer;
599 }
600 continue;
601 }
602
603 if let Some(meta) = read_codex_session_meta(&path).await?
604 && remaining.remove(meta.id.as_str())
605 {
606 found.insert(meta.id, path);
607 if remaining.is_empty() {
608 break 'outer;
609 }
610 }
611 }
612 }
613 }
614 }
615
616 Ok(found)
617}
618
619pub async fn read_codex_session_meta(path: &Path) -> Result<Option<SessionMeta>> {
621 let file = fs::File::open(path)
622 .await
623 .with_context(|| format!("failed to open session file {:?}", path))?;
624 let reader = BufReader::new(file);
625 let mut lines = reader.lines();
626
627 let mut lines_scanned = 0usize;
628 while let Some(line) = lines.next_line().await? {
629 let trimmed = line.trim();
630 if trimmed.is_empty() {
631 continue;
632 }
633 lines_scanned += 1;
634 if lines_scanned > HEAD_SCAN_LINES {
635 break;
636 }
637
638 let value: Value = match serde_json::from_str(trimmed) {
639 Ok(v) => v,
640 Err(_) => continue,
641 };
642
643 if let Some(meta) = parse_session_meta(&value) {
644 return Ok(Some(SessionMeta {
645 id: meta.id,
646 cwd: meta.cwd,
647 created_at: meta.created_at,
648 }));
649 }
650 }
651
652 Ok(None)
653}
654
/// Test helper: reads the header of `path` and expands it to a summary
/// without going through the stats cache. Returns `Ok(None)` when the file
/// has no usable header.
#[cfg(test)]
async fn summarize_session_for_current_dir(
    path: &Path,
    cwd: &Path,
) -> Result<Option<SessionSummary>> {
    match read_session_header(path, cwd).await? {
        Some(header) => {
            let summary = expand_header_to_summary_uncached(header).await?;
            Ok(Some(summary))
        }
        None => Ok(None),
    }
}
666
/// Fields extracted from a `session_meta` record by `parse_session_meta`.
struct SessionMetaInfo {
    /// `payload.id`.
    id: String,
    /// `payload.cwd`, when present and a string.
    cwd: Option<String>,
    /// `payload.timestamp`, falling back to the record's top-level `timestamp`.
    created_at: Option<String>,
}
672
/// Lightweight per-file record built from the head of a rollout file, before
/// the more expensive turn counting and tail scanning are done.
#[derive(Debug, Clone)]
struct SessionHeader {
    /// Session id from `session_meta`.
    id: String,
    /// Path of the rollout file.
    path: PathBuf,
    /// Working directory from `session_meta`, if any.
    cwd: Option<String>,
    /// Creation timestamp from `session_meta`, if any.
    created_at: Option<String>,
    /// File modification time in milliseconds since the Unix epoch
    /// (0 when unavailable).
    mtime_ms: u64,
    /// Best-known "last updated" timestamp; `None` until it is filled in from
    /// the file tail.
    updated_hint: Option<String>,
    /// Text of the first `user_message` event in the file.
    first_user_message: String,
    /// Whether the recorded cwd equals, contains, or lies inside the directory
    /// the header was read for.
    is_cwd_match: bool,
}
686
687fn parse_session_meta(value: &Value) -> Option<SessionMetaInfo> {
688 let obj = value.as_object()?;
689 let type_str = obj.get("type")?.as_str()?;
690 if type_str != "session_meta" {
691 return None;
692 }
693
694 let payload = obj.get("payload")?.as_object()?;
695 let id = payload.get("id").and_then(|v| v.as_str())?.to_string();
696 let cwd = payload
697 .get("cwd")
698 .and_then(|v| v.as_str())
699 .map(|s| s.to_string());
700 let created_at = payload
701 .get("timestamp")
702 .and_then(|v| v.as_str())
703 .map(|s| s.to_string())
704 .or_else(|| {
705 obj.get("timestamp")
706 .and_then(|v| v.as_str())
707 .map(|s| s.to_string())
708 });
709
710 Some(SessionMetaInfo {
711 id,
712 cwd,
713 created_at,
714 })
715}
716
717fn user_message_text(value: &Value) -> Option<&str> {
718 let obj = value.as_object()?;
719 let type_str = obj.get("type")?.as_str()?;
720 if type_str != "event_msg" {
721 return None;
722 }
723 let payload = obj.get("payload")?.as_object()?;
724 let payload_type = payload.get("type")?.as_str()?;
725 if payload_type != "user_message" {
726 return None;
727 }
728 payload.get("message").and_then(|v| v.as_str())
729}
730
/// Reports whether `needle` occurs anywhere in `haystack`.
///
/// An empty needle matches every haystack, including an empty one.
fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        0 => true,
        // `windows` yields nothing when the haystack is shorter than `width`.
        width => haystack
            .windows(width)
            .any(|candidate| candidate == needle),
    }
}
740
741async fn read_session_header(path: &Path, cwd: &Path) -> Result<Option<SessionHeader>> {
742 let meta = fs::metadata(path)
743 .await
744 .with_context(|| format!("failed to stat session file {:?}", path))?;
745 let mtime_ms = meta
746 .modified()
747 .ok()
748 .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
749 .map(|d| d.as_millis() as u64)
750 .unwrap_or(0);
751
752 let file = fs::File::open(path)
753 .await
754 .with_context(|| format!("failed to open session file {:?}", path))?;
755 let reader = BufReader::new(file);
756 let mut lines = reader.lines();
757
758 let mut session_id: Option<String> = None;
759 let mut cwd_str: Option<String> = None;
760 let mut created_at: Option<String> = None;
761 let mut first_user_message: Option<String> = None;
762
763 let mut lines_scanned = 0usize;
764 while let Some(line) = lines.next_line().await? {
765 let trimmed = line.trim();
766 if trimmed.is_empty() {
767 continue;
768 }
769 lines_scanned += 1;
770 if lines_scanned > HEAD_SCAN_LINES {
771 break;
772 }
773 let value: Value = match serde_json::from_str(trimmed) {
774 Ok(v) => v,
775 Err(_) => continue,
776 };
777
778 if session_id.is_none()
779 && let Some(meta) = parse_session_meta(&value)
780 {
781 session_id = Some(meta.id);
782 cwd_str = meta.cwd;
783 created_at = meta.created_at;
784 }
785
786 if first_user_message.is_none()
787 && let Some(msg) = user_message_text(&value)
788 {
789 first_user_message = Some(msg.to_string());
790 }
791
792 if session_id.is_some() && first_user_message.is_some() {
793 break;
794 }
795 }
796
797 let Some(id) = session_id else {
798 return Ok(None);
799 };
800 let Some(first_user_message) = first_user_message else {
801 return Ok(None);
802 };
803
804 let cwd_value = cwd_str.clone();
805 let is_cwd_match = cwd_value
806 .as_deref()
807 .map(|s| path_matches_current_dir(s, cwd))
808 .unwrap_or(false);
809
810 Ok(Some(SessionHeader {
811 id,
812 path: path.to_path_buf(),
813 cwd: cwd_value,
814 created_at,
815 mtime_ms,
816 updated_hint: None,
817 first_user_message,
818 is_cwd_match,
819 }))
820}
821
822async fn select_and_expand_headers(
823 matched: Vec<SessionHeader>,
824 others: Vec<SessionHeader>,
825 limit: usize,
826) -> Result<Vec<SessionSummary>> {
827 if limit == 0 {
828 return Ok(Vec::new());
829 }
830
831 let mut chosen = if !matched.is_empty() { matched } else { others };
832 chosen.sort_by_key(|item| Reverse(item.mtime_ms));
835 if chosen.len() > limit {
836 chosen.truncate(limit);
837 }
838
839 let cache = Arc::new(Mutex::new(SessionStatsCache::load_default().await));
840 let mut out: Vec<SessionSummary> = Vec::with_capacity(chosen.len().min(limit));
841 let mut stream = stream::iter(chosen)
842 .map(|header| {
843 let cache = Arc::clone(&cache);
844 async move { expand_header_to_summary_cached(cache, header).await }
845 })
846 .buffer_unordered(SESSION_IO_CONCURRENCY);
847
848 while let Some(summary) = stream.next().await {
849 out.push(summary?);
850 }
851
852 drop(stream);
853 let mut cache = Arc::try_unwrap(cache)
854 .map_err(|_| anyhow!("session stats cache still has active workers"))?
855 .into_inner()
856 .map_err(|_| anyhow!("session stats cache lock poisoned"))?;
857 cache.save_if_dirty().await?;
858
859 sort_by_updated_desc(&mut out);
860 out.truncate(limit);
861 Ok(out)
862}
863
864fn build_summary_from_stats(
865 header: SessionHeader,
866 user_turns: usize,
867 assistant_turns: usize,
868 last_response_at: Option<String>,
869) -> SessionSummary {
870 let rounds = user_turns.min(assistant_turns);
871 let updated_at = last_response_at
872 .clone()
873 .or_else(|| header.updated_hint.clone())
874 .or_else(|| header.created_at.clone());
875
876 SessionSummary {
877 id: header.id,
878 path: header.path,
879 cwd: header.cwd,
880 created_at: header.created_at,
881 updated_at,
882 last_response_at,
883 user_turns,
884 assistant_turns,
885 rounds,
886 first_user_message: Some(header.first_user_message),
887 source: SessionSummarySource::LocalFile,
888 sort_hint_ms: None,
889 }
890}
891
892async fn expand_header_to_summary_cached(
893 cache: Arc<Mutex<SessionStatsCache>>,
894 mut header: SessionHeader,
895) -> Result<SessionSummary> {
896 let path = header.path.clone();
897 let key = path.to_string_lossy().to_string();
898 let meta = fs::metadata(&path)
899 .await
900 .with_context(|| format!("failed to stat session file {:?}", path))?;
901 let size = meta.len();
902 let mtime_ms = meta
903 .modified()
904 .ok()
905 .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
906 .map(|d| d.as_millis() as u64)
907 .unwrap_or(0);
908
909 let cached = {
910 let cache = cache
911 .lock()
912 .map_err(|_| anyhow!("session stats cache lock poisoned"))?;
913 cache.lookup(&key, mtime_ms, size)
914 };
915
916 let stats = if let Some(stats) = cached {
917 if stats.last_response_at.is_none() && header.updated_hint.is_none() {
918 header.updated_hint = read_last_timestamp_from_tail(&path)
919 .await?
920 .or_else(|| header.created_at.clone());
921 }
922 stats
923 } else {
924 let (counts, tail) = tokio::join!(
925 count_turns_in_file(&path),
926 read_tail_timestamps(&path, true)
927 );
928 let (user_turns, assistant_turns) = counts?;
929 let tail = tail?;
930 header.updated_hint = tail.last_record_at.or_else(|| header.created_at.clone());
931
932 let stats = SessionStatsSnapshot {
933 user_turns,
934 assistant_turns,
935 last_response_at: tail.last_assistant_at,
936 };
937 {
938 let mut cache = cache
939 .lock()
940 .map_err(|_| anyhow!("session stats cache lock poisoned"))?;
941 cache.insert(key, mtime_ms, size, &stats);
942 }
943 stats
944 };
945
946 Ok(build_summary_from_stats(
947 header,
948 stats.user_turns,
949 stats.assistant_turns,
950 stats.last_response_at,
951 ))
952}
953
/// Test helper: expands a header into a summary by reading the file directly,
/// bypassing the stats cache. The header's update hint is left untouched.
#[cfg(test)]
async fn expand_header_to_summary_uncached(header: SessionHeader) -> Result<SessionSummary> {
    let file_path = header.path.as_path();
    let (user_turns, assistant_turns) = count_turns_in_file(file_path).await?;
    let last_response_at = read_last_assistant_timestamp_from_tail(file_path).await?;

    let summary = build_summary_from_stats(header, user_turns, assistant_turns, last_response_at);
    Ok(summary)
}
965
966async fn count_turns_in_file(path: &Path) -> Result<(usize, usize)> {
967 const USER_TURN_NEEDLE: &[u8] = br#""payload":{"type":"user_message""#;
968 const ASSISTANT_TURN_NEEDLE: &[u8] = br#""role":"assistant""#;
969
970 let mut file = fs::File::open(path)
971 .await
972 .with_context(|| format!("failed to open session file {:?}", path))?;
973
974 let mut buf = vec![0u8; IO_CHUNK_SIZE];
975 let mut user_carry: Vec<u8> = Vec::new();
976 let mut assistant_carry: Vec<u8> = Vec::new();
977 let mut user_total = 0usize;
978 let mut assistant_total = 0usize;
979 let mut user_window: Vec<u8> = Vec::with_capacity(IO_CHUNK_SIZE + USER_TURN_NEEDLE.len());
980 let mut assistant_window: Vec<u8> =
981 Vec::with_capacity(IO_CHUNK_SIZE + ASSISTANT_TURN_NEEDLE.len());
982
983 loop {
984 let n = file.read(&mut buf).await?;
985 if n == 0 {
986 break;
987 }
988
989 user_window.clear();
990 user_window.extend_from_slice(&user_carry);
991 user_window.extend_from_slice(&buf[..n]);
992 user_total = user_total.saturating_add(count_subslice(&user_window, USER_TURN_NEEDLE));
993
994 assistant_window.clear();
995 assistant_window.extend_from_slice(&assistant_carry);
996 assistant_window.extend_from_slice(&buf[..n]);
997 assistant_total = assistant_total
998 .saturating_add(count_subslice(&assistant_window, ASSISTANT_TURN_NEEDLE));
999
1000 let user_keep = USER_TURN_NEEDLE.len().saturating_sub(1);
1001 user_carry = if user_keep > 0 && user_window.len() >= user_keep {
1002 user_window[user_window.len() - user_keep..].to_vec()
1003 } else {
1004 Vec::new()
1005 };
1006
1007 let assistant_keep = ASSISTANT_TURN_NEEDLE.len().saturating_sub(1);
1008 assistant_carry = if assistant_keep > 0 && assistant_window.len() >= assistant_keep {
1009 assistant_window[assistant_window.len() - assistant_keep..].to_vec()
1010 } else {
1011 Vec::new()
1012 };
1013 }
1014
1015 Ok((user_total, assistant_total))
1016}
1017
/// Counts the occurrences of `needle` in `haystack`, overlapping ones
/// included (`"aa"` occurs three times in `"aaaa"`).
///
/// An empty needle, or a needle longer than the haystack, yields 0.
fn count_subslice(haystack: &[u8], needle: &[u8]) -> usize {
    let width = needle.len();
    if width == 0 || width > haystack.len() {
        return 0;
    }

    let last_start = haystack.len() - width;
    (0..=last_start)
        .filter(|&start| &haystack[start..start + width] == needle)
        .count()
}
1030
/// Timestamps recovered by scanning a session file backwards from its end.
#[derive(Debug, Default)]
struct TailTimestamps {
    /// `timestamp` of the last parseable record that has one.
    last_record_at: Option<String>,
    /// `timestamp` of the last record whose raw line contains the
    /// assistant-role marker; only filled in when the scan was asked for it.
    last_assistant_at: Option<String>,
}
1036
1037async fn read_last_timestamp_from_tail(path: &Path) -> Result<Option<String>> {
1038 Ok(read_tail_timestamps(path, false).await?.last_record_at)
1039}
1040
/// Test helper: returns the timestamp of the last assistant record found in
/// the file tail.
#[cfg(test)]
async fn read_last_assistant_timestamp_from_tail(path: &Path) -> Result<Option<String>> {
    let tail = read_tail_timestamps(path, true).await?;
    Ok(tail.last_assistant_at)
}
1045
/// Scans a session file backwards from its end for the most recent
/// timestamps.
///
/// The file is read in chunks of `IO_CHUNK_SIZE` bytes, last chunk first, and
/// each chunk's lines are visited from last to first. `last_record_at` is the
/// `timestamp` of the first parseable record encountered that has one;
/// `last_assistant_at` (only when `include_assistant` is true) is the
/// `timestamp` of the first such record whose raw line contains the
/// assistant-role marker. The scan returns as soon as everything requested is
/// found, and starts no new chunk once `TAIL_SCAN_MAX_BYTES` bytes have been
/// read; whatever was found by then is returned.
///
/// # Errors
/// Fails if the file cannot be opened, stat'ed, seeked or read.
async fn read_tail_timestamps(path: &Path, include_assistant: bool) -> Result<TailTimestamps> {
    const ASSISTANT_ROLE_NEEDLE: &[u8] = br#""role":"assistant""#;

    let mut file = fs::File::open(path)
        .await
        .with_context(|| format!("failed to open session file {:?}", path))?;
    let meta = file
        .metadata()
        .await
        .with_context(|| format!("failed to stat session file {:?}", path))?;
    // `pos` is the exclusive end offset of the next chunk to read.
    let mut pos = meta.len();
    if pos == 0 {
        return Ok(TailTimestamps::default());
    }

    let mut scanned = 0usize;
    // Bytes before the first newline of the previously read (later) chunk:
    // the tail end of a line that begins in an earlier chunk.
    let mut carry: Vec<u8> = Vec::new();
    let chunk_size = IO_CHUNK_SIZE as u64;
    let mut found = TailTimestamps::default();

    while pos > 0 && scanned < TAIL_SCAN_MAX_BYTES {
        let start = pos.saturating_sub(chunk_size);
        let size = (pos - start) as usize;
        file.seek(std::io::SeekFrom::Start(start)).await?;

        let mut chunk = vec![0u8; size];
        file.read_exact(&mut chunk).await?;
        scanned = scanned.saturating_add(size);

        // Re-attach the partial line so the line split at the chunk boundary
        // is complete again.
        if !carry.is_empty() {
            chunk.extend_from_slice(&carry);
        }

        // Walk the lines of this chunk from last to first; `end` is the
        // exclusive end of the current line.
        let mut end = chunk.len();
        while end > 0 {
            let mut begin = end;
            while begin > 0 && chunk[begin - 1] != b'\n' {
                begin -= 1;
            }
            let line = chunk[begin..end].trim_ascii();
            // Step over the newline that precedes this line.
            end = begin.saturating_sub(1);

            if line.is_empty() {
                continue;
            }

            let wants_record = found.last_record_at.is_none();
            // Cheap byte search first, so lines that cannot be assistant
            // records are not parsed just for that purpose.
            let wants_assistant = include_assistant
                && found.last_assistant_at.is_none()
                && contains_bytes(line, ASSISTANT_ROLE_NEEDLE);
            if !wants_record && !wants_assistant {
                continue;
            }

            // A line cut off at the start of the chunk normally fails to
            // parse here; it is retried, completed, with the next chunk.
            let value: Value = match serde_json::from_slice(line) {
                Ok(v) => v,
                Err(_) => continue,
            };
            if let Some(ts) = value.get("timestamp").and_then(|v| v.as_str()) {
                let ts = ts.to_string();
                if wants_record {
                    found.last_record_at = Some(ts.clone());
                }
                if wants_assistant {
                    found.last_assistant_at = Some(ts);
                }
                if found.last_record_at.is_some()
                    && (!include_assistant || found.last_assistant_at.is_some())
                {
                    return Ok(found);
                }
            }
        }

        // Keep the (possibly partial) first line for the next, earlier chunk.
        // Without any newline the whole buffer is one unfinished line.
        if let Some(first_nl) = chunk.iter().position(|b| *b == b'\n') {
            carry = chunk[..first_nl].to_vec();
        } else {
            carry = chunk;
        }

        pos = start;
    }

    Ok(found)
}
1133
/// Reports whether a session's recorded cwd is related to `current_dir`.
///
/// A relative `session_cwd` never matches. Otherwise both paths are
/// canonicalized (each falling back to the path as given when that fails) and
/// the session matches when the two are equal or one is an ancestor of the
/// other. The comparison is per path component, so `/a/proj` does not match
/// `/a/project`.
fn path_matches_current_dir(session_cwd: &str, current_dir: &Path) -> bool {
    let session_path = PathBuf::from(session_cwd);
    if !session_path.is_absolute() {
        return false;
    }

    let current = std::fs::canonicalize(current_dir).unwrap_or_else(|_| current_dir.to_path_buf());
    let cwd = std::fs::canonicalize(&session_path).unwrap_or(session_path);

    // `Path::starts_with` is also true for equal paths, so no separate
    // equality test is needed.
    current.starts_with(&cwd) || cwd.starts_with(&current)
}
1145
1146async fn collect_dirs_desc<T, F>(parent: &Path, parse: F) -> std::io::Result<Vec<(T, PathBuf)>>
1147where
1148 T: Ord + Copy,
1149 F: Fn(&str) -> Option<T>,
1150{
1151 let mut dir = fs::read_dir(parent).await?;
1152 let mut vec: Vec<(T, PathBuf)> = Vec::new();
1153 while let Some(entry) = dir.next_entry().await? {
1154 if entry
1155 .file_type()
1156 .await
1157 .map(|ft| ft.is_dir())
1158 .unwrap_or(false)
1159 && let Some(s) = entry.file_name().to_str()
1160 && let Some(v) = parse(s)
1161 {
1162 vec.push((v, entry.path()));
1163 }
1164 }
1165 vec.sort_by_key(|(v, _)| Reverse(*v));
1166 Ok(vec)
1167}
1168
1169async fn collect_rollout_files_sorted(parent: &Path) -> std::io::Result<Vec<PathBuf>> {
1170 let mut dir = fs::read_dir(parent).await?;
1171 let mut records: Vec<(String, String, PathBuf)> = Vec::new();
1172
1173 while let Some(entry) = dir.next_entry().await? {
1174 if entry
1175 .file_type()
1176 .await
1177 .map(|ft| ft.is_file())
1178 .unwrap_or(false)
1179 {
1180 let name_os = entry.file_name();
1181 let Some(name) = name_os.to_str() else {
1182 continue;
1183 };
1184 if !name.starts_with("rollout-") || !name.ends_with(".jsonl") {
1185 continue;
1186 }
1187 if let Some((ts, uuid)) = parse_timestamp_and_uuid(name) {
1188 records.push((ts, uuid, entry.path()));
1189 }
1190 }
1191 }
1192
1193 records.sort_by(|a, b| {
1194 match b.0.cmp(&a.0) {
1196 Ordering::Equal => b.1.cmp(&a.1),
1197 other => other,
1198 }
1199 });
1200
1201 Ok(records.into_iter().map(|(_, _, path)| path).collect())
1202}
1203
/// Splits a rollout file name into its timestamp and uuid parts.
///
/// Expects `rollout-<timestamp>-<uuid>.jsonl`, where `<timestamp>` is exactly
/// 19 bytes long (e.g. `2025-01-02T03-04-05`) and `<uuid>` is non-empty.
/// Returns `None` for anything else. This includes names where byte 19 falls
/// inside a multi-byte character; those are rejected instead of panicking, so
/// a stray non-ASCII file name cannot abort a directory scan.
fn parse_timestamp_and_uuid(name: &str) -> Option<(String, String)> {
    const TS_LEN: usize = 19;

    let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;

    // `str::get` yields `None` when the range is out of bounds or does not
    // fall on a char boundary, where `split_at` would panic.
    let ts = core.get(..TS_LEN)?;
    let uuid = core.get(TS_LEN..)?.strip_prefix('-')?;
    if uuid.is_empty() {
        return None;
    }
    Some((ts.to_string(), uuid.to_string()))
}
1220
1221fn sort_by_updated_desc(vec: &mut [SessionSummary]) {
1222 vec.sort_by(|a, b| {
1223 let ta = a.updated_at.as_deref();
1224 let tb = b.updated_at.as_deref();
1225 match (ta, tb) {
1226 (Some(ta), Some(tb)) => tb.cmp(ta),
1227 (Some(_), None) => Ordering::Less,
1228 (None, Some(_)) => Ordering::Greater,
1229 (None, None) => Ordering::Equal,
1230 }
1231 });
1232}
1233
1234#[cfg(test)]
1235mod tests;