use async_trait::async_trait;
use std::cmp::Reverse;
use std::ffi::OsStr;
use std::io::{self};
use std::num::NonZero;
use std::ops::ControlFlow;
use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use uuid::Uuid;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::is_unsupported_rollout_schema_error;
use super::recorder::is_supported_rollout_schema_version;
use super::recorder::validate_rollout_line_schema_version;
use crate::product::agent::path_utils;
use crate::product::agent::protocol::EventMsg;
use crate::product::agent::state_db;
use crate::product::file_search;
use crate::product::protocol::ThreadId;
use crate::product::protocol::protocol::RolloutItem;
use crate::product::protocol::protocol::RolloutLine;
use crate::product::protocol::protocol::SessionMetaLine;
use crate::product::protocol::protocol::SessionSource;
use tracing::warn;
/// One page of thread listings plus pagination and scan bookkeeping.
#[derive(Debug, Default, PartialEq)]
pub struct ThreadsPage {
/// Threads on this page, in the order of the requested sort key.
pub items: Vec<ThreadItem>,
/// Position to resume from for the next page; `None` when no further page is signalled.
pub next_cursor: Option<Cursor>,
/// Number of rollout files counted while scanning.
pub num_scanned_files: usize,
/// Set when the scanned-file count reached `MAX_SCAN_FILES`.
pub reached_scan_cap: bool,
}
/// A single thread (rollout file) that passed the listing filters.
#[derive(Debug, PartialEq)]
pub struct ThreadItem {
/// Path to the rollout `.jsonl` file.
pub path: PathBuf,
/// Leading session-meta / transcript records serialized as JSON (at most `HEAD_RECORD_LIMIT`).
pub head: Vec<serde_json::Value>,
/// Effective working directory: latest turn-context cwd, else the session-meta cwd.
pub cwd: Option<PathBuf>,
/// Timestamp string of the first session-meta or transcript record.
pub created_at: Option<String>,
/// Update time supplied by the lister (RFC 3339 file mtime), falling back to `created_at`.
pub updated_at: Option<String>,
}
/// Facts gathered from the first lines of a rollout file by `read_head_summary`.
#[derive(Default)]
struct HeadTailSummary {
/// Serialized leading records, bounded by the caller's head limit.
head: Vec<serde_json::Value>,
/// Whether a session-meta record was seen.
saw_session_meta: bool,
/// Whether an `EventMsg::UserMessage` record was seen.
saw_user_event: bool,
/// `cwd` recorded in the session metadata.
session_cwd: Option<PathBuf>,
/// Session source recorded in the session metadata.
source: Option<SessionSource>,
/// Model provider recorded in the session metadata, if any.
model_provider: Option<String>,
/// Timestamp of the first session-meta or transcript record.
created_at: Option<String>,
/// NOTE(review): never assigned by `read_head_summary` in this file, so it stays `None`;
/// `build_thread_item` then fills the item's value from its caller-supplied fallback.
updated_at: Option<String>,
}
/// Upper bound on rollout files counted during one listing call.
const MAX_SCAN_FILES: usize = 10000;
/// Number of leading records kept in a thread's `head`.
const HEAD_RECORD_LIMIT: usize = 10;
/// Extra non-blank lines read past the head limit while looking for the first user message
/// (only after session metadata has been seen).
const USER_EVENT_SCAN_LIMIT: usize = 200;
/// Chunk size in bytes used when scanning a rollout file backwards for the latest cwd.
const TAIL_SCAN_CHUNK_SIZE: usize = 8 * 1024;
/// Ordering used when listing threads (always newest first).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadSortKey {
/// Order by the timestamp encoded in the rollout file name.
CreatedAt,
/// Order by the rollout file's modification time.
UpdatedAt,
}
/// How rollout files are laid out under the listing root.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ThreadListLayout {
/// Numeric `<year>/<month>/<day>/rollout-*.jsonl` directory tree.
NestedByDate,
/// `rollout-*.jsonl` files directly inside the root.
Flat,
}
/// Filters and layout options for `get_threads_in_root`.
pub(crate) struct ThreadListConfig<'a> {
/// Session sources to include; an empty slice accepts every source.
pub(crate) allowed_sources: &'a [SessionSource],
/// Model providers to include; `None` or an empty slice disables provider filtering.
pub(crate) model_providers: Option<&'a [String]>,
/// When set, only threads whose effective cwd matches this path are listed.
pub(crate) cwd_filter: Option<&'a Path>,
/// Provider assumed for sessions whose metadata records none.
pub(crate) default_provider: &'a str,
/// Directory layout of the listing root.
pub(crate) layout: ThreadListLayout,
}
/// Pagination position: the sort timestamp and uuid of the last item of the previous page.
///
/// Serialized as `YYYY-MM-DDThh-mm-ss|<uuid>` (whole seconds).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
ts: OffsetDateTime,
id: Uuid,
}
impl Cursor {
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
Self { ts, id }
}
}
struct AnchorState {
ts: OffsetDateTime,
id: Uuid,
passed: bool,
}
impl AnchorState {
fn new(anchor: Option<Cursor>) -> Self {
match anchor {
Some(cursor) => Self {
ts: cursor.ts,
id: cursor.id,
passed: false,
},
None => Self {
ts: OffsetDateTime::UNIX_EPOCH,
id: Uuid::nil(),
passed: true,
},
}
}
fn should_skip(&mut self, ts: OffsetDateTime, id: Uuid) -> bool {
if self.passed {
return false;
}
if ts < self.ts || (ts == self.ts && id < self.id) {
self.passed = true;
false
} else {
true
}
}
}
/// Callback invoked by `walk_rollout_files` for every rollout file, newest first.
///
/// `ts` and `id` are parsed from the file name; `scanned` is the running file count including
/// the current file. Returning `ControlFlow::Break(())` stops the walk.
#[async_trait]
trait RolloutFileVisitor {
async fn visit(
&mut self,
ts: OffsetDateTime,
id: Uuid,
path: PathBuf,
scanned: usize,
) -> ControlFlow<()>;
}
/// Visitor for created-at ordering that fills the page directly while the date tree is walked.
///
/// Files arrive newest first. Files at or before the anchor are skipped. Once `page_size`
/// items are collected, the next non-skipped file sets `more_matches_available` and stops the
/// walk; that file is not checked against the filters.
struct FilesByCreatedAtVisitor<'a> {
// Output page owned by the caller.
items: &'a mut Vec<ThreadItem>,
page_size: usize,
anchor_state: AnchorState,
// Read back by the caller after the walk to decide whether to emit a next cursor.
more_matches_available: bool,
allowed_sources: &'a [SessionSource],
provider_matcher: Option<&'a ProviderMatcher<'a>>,
cwd_filter: Option<&'a Path>,
}
#[async_trait]
impl<'a> RolloutFileVisitor for FilesByCreatedAtVisitor<'a> {
async fn visit(
&mut self,
ts: OffsetDateTime,
id: Uuid,
path: PathBuf,
scanned: usize,
) -> ControlFlow<()> {
// At the scan cap with a full page: stop and signal that more results may exist.
if scanned >= MAX_SCAN_FILES && self.items.len() >= self.page_size {
self.more_matches_available = true;
return ControlFlow::Break(());
}
if self.anchor_state.should_skip(ts, id) {
return ControlFlow::Continue(());
}
// Page already full and another candidate exists past the anchor.
if self.items.len() == self.page_size {
self.more_matches_available = true;
return ControlFlow::Break(());
}
// File mtime (whole seconds, RFC 3339) is the fallback update time for the item.
let updated_at = file_modified_time(&path)
.await
.unwrap_or(None)
.and_then(format_rfc3339);
if let Some(item) = build_thread_item(
path,
self.allowed_sources,
self.provider_matcher,
self.cwd_filter,
updated_at,
)
.await
{
self.items.push(item);
}
ControlFlow::Continue(())
}
}
/// Visitor that records every rollout file together with its modification time.
///
/// It never stops the walk early; sorting by mtime and filtering are left to the caller.
struct FilesByUpdatedAtVisitor<'a> {
    candidates: &'a mut Vec<ThreadCandidate>,
}

#[async_trait]
impl<'a> RolloutFileVisitor for FilesByUpdatedAtVisitor<'a> {
    async fn visit(
        &mut self,
        _ts: OffsetDateTime,
        id: Uuid,
        path: PathBuf,
        _scanned: usize,
    ) -> ControlFlow<()> {
        // A failed stat is treated the same as an unavailable mtime.
        let updated_at = file_modified_time(&path).await.ok().flatten();
        self.candidates.push(ThreadCandidate {
            path,
            id,
            updated_at,
        });
        ControlFlow::Continue(())
    }
}
impl serde::Serialize for Cursor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts_str = self
.ts
.format(&format_description!(
"[year]-[month]-[day]T[hour]-[minute]-[second]"
))
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
}
}
impl<'de> serde::Deserialize<'de> for Cursor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
}
}
/// Lists one page of threads stored under `<lha_home>/<SESSIONS_SUBDIR>` in the
/// nested-by-date layout.
///
/// Thin wrapper over `get_threads_in_root`, which handles filtering and pagination.
// Entry point mirrors every filter as its own argument, which exceeds clippy's default limit.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn get_threads(
    lha_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    default_provider: &str,
    cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
    let config = ThreadListConfig {
        allowed_sources,
        model_providers,
        cwd_filter,
        default_provider,
        layout: ThreadListLayout::NestedByDate,
    };
    let sessions_root = lha_home.join(SESSIONS_SUBDIR);
    get_threads_in_root(sessions_root, page_size, cursor, sort_key, config).await
}
pub(crate) async fn get_threads_in_root(
root: PathBuf,
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
config: ThreadListConfig<'_>,
) -> io::Result<ThreadsPage> {
if !root.exists() {
return Ok(ThreadsPage {
items: Vec::new(),
next_cursor: None,
num_scanned_files: 0,
reached_scan_cap: false,
});
}
let anchor = cursor.cloned();
let provider_matcher = config
.model_providers
.and_then(|filters| ProviderMatcher::new(filters, config.default_provider));
let result = match config.layout {
ThreadListLayout::NestedByDate => {
traverse_directories_for_paths(
root.clone(),
page_size,
anchor,
sort_key,
config.allowed_sources,
provider_matcher.as_ref(),
config.cwd_filter,
)
.await?
}
ThreadListLayout::Flat => {
traverse_flat_paths(
root.clone(),
page_size,
anchor,
sort_key,
config.allowed_sources,
provider_matcher.as_ref(),
config.cwd_filter,
)
.await?
}
};
Ok(result)
}
/// Routes a nested-by-date listing to the traversal matching `sort_key`.
async fn traverse_directories_for_paths(
    root: PathBuf,
    page_size: usize,
    anchor: Option<Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
    cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
    let page = match sort_key {
        ThreadSortKey::CreatedAt => {
            let created = traverse_directories_for_paths_created(
                root,
                page_size,
                anchor,
                allowed_sources,
                provider_matcher,
                cwd_filter,
            );
            created.await?
        }
        ThreadSortKey::UpdatedAt => {
            let updated = traverse_directories_for_paths_updated(
                root,
                page_size,
                anchor,
                allowed_sources,
                provider_matcher,
                cwd_filter,
            );
            updated.await?
        }
    };
    Ok(page)
}
/// Routes a flat-layout listing to the traversal matching `sort_key`.
async fn traverse_flat_paths(
    root: PathBuf,
    page_size: usize,
    anchor: Option<Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
    cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
    let page = match sort_key {
        ThreadSortKey::CreatedAt => {
            let created = traverse_flat_paths_created(
                root,
                page_size,
                anchor,
                allowed_sources,
                provider_matcher,
                cwd_filter,
            );
            created.await?
        }
        ThreadSortKey::UpdatedAt => {
            let updated = traverse_flat_paths_updated(
                root,
                page_size,
                anchor,
                allowed_sources,
                provider_matcher,
                cwd_filter,
            );
            updated.await?
        }
    };
    Ok(page)
}
/// Lists threads in a date-nested tree ordered by the creation timestamp in each file name,
/// newest first.
///
/// Files are streamed through a `FilesByCreatedAtVisitor`, so rollouts are opened only up to
/// the page boundary. A next cursor is emitted when the visitor found another candidate after
/// the page filled, or when the scan cap was hit with a non-empty page.
async fn traverse_directories_for_paths_created(
    root: PathBuf,
    page_size: usize,
    anchor: Option<Cursor>,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
    cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
    let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
    let mut scanned_files = 0usize;
    let visitor_saw_more = {
        let mut visitor = FilesByCreatedAtVisitor {
            items: &mut items,
            page_size,
            anchor_state: AnchorState::new(anchor),
            more_matches_available: false,
            allowed_sources,
            provider_matcher,
            cwd_filter,
        };
        walk_rollout_files(&root, &mut scanned_files, &mut visitor).await?;
        visitor.more_matches_available
    };
    let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
    let has_more = visitor_saw_more || (reached_scan_cap && !items.is_empty());
    let next_cursor = has_more
        .then(|| build_next_cursor(&items, ThreadSortKey::CreatedAt))
        .flatten();
    Ok(ThreadsPage {
        items,
        next_cursor,
        num_scanned_files: scanned_files,
        reached_scan_cap,
    })
}
async fn traverse_directories_for_paths_updated(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
let mut anchor_state = AnchorState::new(anchor);
let mut more_matches_available = false;
let candidates = collect_files_by_updated_at(&root, &mut scanned_files).await?;
let mut candidates = candidates;
candidates.sort_by_key(|candidate| {
let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
(Reverse(ts), Reverse(candidate.id))
});
for candidate in candidates.into_iter() {
let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
if anchor_state.should_skip(ts, candidate.id) {
continue;
}
if items.len() == page_size {
more_matches_available = true;
break;
}
let updated_at_fallback = candidate.updated_at.and_then(format_rfc3339);
if let Some(item) = build_thread_item(
candidate.path,
allowed_sources,
provider_matcher,
cwd_filter,
updated_at_fallback,
)
.await
{
items.push(item);
}
}
let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
if reached_scan_cap && !items.is_empty() {
more_matches_available = true;
}
let next = if more_matches_available {
build_next_cursor(&items, ThreadSortKey::UpdatedAt)
} else {
None
};
Ok(ThreadsPage {
items,
next_cursor: next,
num_scanned_files: scanned_files,
reached_scan_cap,
})
}
async fn traverse_flat_paths_created(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
let mut anchor_state = AnchorState::new(anchor);
let mut more_matches_available = false;
let files = collect_flat_rollout_files(&root, &mut scanned_files).await?;
for (ts, id, path) in files.into_iter() {
if anchor_state.should_skip(ts, id) {
continue;
}
if items.len() == page_size {
more_matches_available = true;
break;
}
let updated_at = file_modified_time(&path)
.await
.unwrap_or(None)
.and_then(format_rfc3339);
if let Some(item) = build_thread_item(
path,
allowed_sources,
provider_matcher,
cwd_filter,
updated_at,
)
.await
{
items.push(item);
}
}
let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
if reached_scan_cap && !items.is_empty() {
more_matches_available = true;
}
let next = if more_matches_available {
build_next_cursor(&items, ThreadSortKey::CreatedAt)
} else {
None
};
Ok(ThreadsPage {
items,
next_cursor: next,
num_scanned_files: scanned_files,
reached_scan_cap,
})
}
async fn traverse_flat_paths_updated(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filter: Option<&Path>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
let mut anchor_state = AnchorState::new(anchor);
let mut more_matches_available = false;
let candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?;
let mut candidates = candidates;
candidates.sort_by_key(|candidate| {
let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
(Reverse(ts), Reverse(candidate.id))
});
for candidate in candidates.into_iter() {
let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
if anchor_state.should_skip(ts, candidate.id) {
continue;
}
if items.len() == page_size {
more_matches_available = true;
break;
}
let updated_at_fallback = candidate.updated_at.and_then(format_rfc3339);
if let Some(item) = build_thread_item(
candidate.path,
allowed_sources,
provider_matcher,
cwd_filter,
updated_at_fallback,
)
.await
{
items.push(item);
}
}
let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
if reached_scan_cap && !items.is_empty() {
more_matches_available = true;
}
let next = if more_matches_available {
build_next_cursor(&items, ThreadSortKey::UpdatedAt)
} else {
None
};
Ok(ThreadsPage {
items,
next_cursor: next,
num_scanned_files: scanned_files,
reached_scan_cap,
})
}
/// Parses a cursor token of the form `YYYY-MM-DDThh-mm-ss|<uuid>`; the timestamp is read as UTC.
///
/// Returns `None` when the `|` separator is missing or either half fails to parse.
pub fn parse_cursor(token: &str) -> Option<Cursor> {
    let (ts_part, id_part) = token.split_once('|')?;
    let id = Uuid::parse_str(id_part).ok()?;
    let cursor_format: &[FormatItem] =
        format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
    let ts = PrimitiveDateTime::parse(ts_part, cursor_format)
        .ok()?
        .assume_utc();
    Some(Cursor::new(ts, id))
}
/// Builds a cursor pointing at the last item of `items`.
///
/// The uuid always comes from the item's file name. The timestamp is the file-name timestamp
/// for `CreatedAt`, or the item's RFC 3339 `updated_at` for `UpdatedAt`. Returns `None` for
/// an empty slice or when any of these fail to parse.
fn build_next_cursor(items: &[ThreadItem], sort_key: ThreadSortKey) -> Option<Cursor> {
    let last = items.last()?;
    let (created_ts, id) = last
        .path
        .file_name()
        .and_then(|name| parse_timestamp_uuid_from_filename(&name.to_string_lossy()))?;
    let ts = match sort_key {
        ThreadSortKey::CreatedAt => Some(created_ts),
        ThreadSortKey::UpdatedAt => last
            .updated_at
            .as_deref()
            .and_then(|text| OffsetDateTime::parse(text, &Rfc3339).ok()),
    }?;
    Some(Cursor::new(ts, id))
}
/// Reads a rollout file and turns it into a `ThreadItem` if it passes every listing filter.
///
/// Returns `None` when:
/// - the head cannot be read (a warning is logged for unsupported legacy schemas),
/// - the session source is not in a non-empty `allowed_sources`,
/// - the model provider is rejected by `provider_matcher`,
/// - the head lacks session metadata or a user message, or
/// - the effective cwd does not match `cwd_filter`.
///
/// The item's `updated_at` prefers the head summary's value, then the caller-supplied
/// `updated_at`, then `created_at`.
async fn build_thread_item(
    path: PathBuf,
    allowed_sources: &[SessionSource],
    provider_matcher: Option<&ProviderMatcher<'_>>,
    cwd_filter: Option<&Path>,
    updated_at: Option<String>,
) -> Option<ThreadItem> {
    let summary = match read_head_summary(&path, HEAD_RECORD_LIMIT).await {
        Ok(summary) => summary,
        Err(err) if is_unsupported_rollout_schema_error(&err) => {
            warn!("skipping unsupported legacy rollout {}", path.display());
            return None;
        }
        Err(_) => return None,
    };
    let source_allowed = allowed_sources.is_empty()
        || summary
            .source
            .is_some_and(|source| allowed_sources.contains(&source));
    if !source_allowed {
        return None;
    }
    if provider_matcher
        .is_some_and(|matcher| !matcher.matches(summary.model_provider.as_deref()))
    {
        return None;
    }
    if !(summary.saw_session_meta && summary.saw_user_event) {
        return None;
    }
    let HeadTailSummary {
        head,
        session_cwd,
        created_at,
        updated_at: recorded_updated_at,
        ..
    } = summary;
    let latest_cwd = read_effective_thread_cwd_with_fallback(&path, session_cwd.as_deref()).await;
    let cwd = match latest_cwd {
        Ok(Some(found)) => Some(found),
        _ => session_cwd,
    };
    if let Some(filter_cwd) = cwd_filter {
        let cwd_matches = cwd
            .as_deref()
            .is_some_and(|thread_cwd| paths_match(thread_cwd, filter_cwd));
        if !cwd_matches {
            return None;
        }
    }
    let resolved_updated_at = recorded_updated_at
        .or(updated_at)
        .or_else(|| created_at.clone());
    Some(ThreadItem {
        path,
        head,
        cwd,
        created_at,
        updated_at: resolved_updated_at,
    })
}
/// Lists the subdirectories of `parent` whose UTF-8 names `parse` accepts, paired with the
/// parsed key and sorted by key in descending order (stable for equal keys).
///
/// Entries whose file type cannot be determined are ignored.
async fn collect_dirs_desc<T, F>(parent: &Path, parse: F) -> io::Result<Vec<(T, PathBuf)>>
where
    T: Ord + Copy,
    F: Fn(&str) -> Option<T>,
{
    let mut entries = tokio::fs::read_dir(parent).await?;
    let mut dirs: Vec<(T, PathBuf)> = Vec::new();
    while let Some(entry) = entries.next_entry().await? {
        if !matches!(entry.file_type().await, Ok(ft) if ft.is_dir()) {
            continue;
        }
        let file_name = entry.file_name();
        let Some(key) = file_name.to_str().and_then(&parse) else {
            continue;
        };
        dirs.push((key, entry.path()));
    }
    dirs.sort_by_key(|&(key, _)| Reverse(key));
    Ok(dirs)
}
/// Maps each regular file in `parent` with a UTF-8 name through `parse(name, path)`, keeping
/// the `Some` results in directory-iteration order.
///
/// Entries whose file type cannot be determined are ignored.
async fn collect_files<T, F>(parent: &Path, parse: F) -> io::Result<Vec<T>>
where
    F: Fn(&str, &Path) -> Option<T>,
{
    let mut entries = tokio::fs::read_dir(parent).await?;
    let mut collected: Vec<T> = Vec::new();
    while let Some(entry) = entries.next_entry().await? {
        if !matches!(entry.file_type().await, Ok(ft) if ft.is_file()) {
            continue;
        }
        let file_name = entry.file_name();
        let Some(name) = file_name.to_str() else {
            continue;
        };
        if let Some(value) = parse(name, &entry.path()) {
            collected.push(value);
        }
    }
    Ok(collected)
}
async fn collect_flat_rollout_files(
root: &Path,
scanned_files: &mut usize,
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
let mut dir = tokio::fs::read_dir(root).await?;
let mut collected = Vec::new();
while let Some(entry) = dir.next_entry().await? {
if *scanned_files >= MAX_SCAN_FILES {
break;
}
if !entry
.file_type()
.await
.map(|ft| ft.is_file())
.unwrap_or(false)
{
continue;
}
let file_name = entry.file_name();
let Some(name_str) = file_name.to_str() else {
continue;
};
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
continue;
}
let Some((ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
continue;
};
*scanned_files += 1;
if *scanned_files > MAX_SCAN_FILES {
break;
}
collected.push((ts, id, entry.path()));
}
collected.sort_by_key(|(ts, sid, _path)| (Reverse(*ts), Reverse(*sid)));
Ok(collected)
}
/// Collects the `rollout-*.jsonl` files in one day directory, sorted newest first by the
/// `(timestamp, uuid)` encoded in each file name.
async fn collect_rollout_day_files(
    day_path: &Path,
) -> io::Result<Vec<(OffsetDateTime, Uuid, PathBuf)>> {
    let mut files = collect_files(day_path, |name, path| {
        let is_rollout = name.starts_with("rollout-") && name.ends_with(".jsonl");
        if !is_rollout {
            return None;
        }
        let (ts, id) = parse_timestamp_uuid_from_filename(name)?;
        Some((ts, id, path.to_path_buf()))
    })
    .await?;
    files.sort_by_key(|&(ts, id, _)| (Reverse(ts), Reverse(id)));
    Ok(files)
}
/// Splits `rollout-<YYYY-MM-DDThh-mm-ss>-<uuid>.jsonl` into its timestamp (taken as UTC) and
/// uuid.
///
/// The uuid boundary is found by trying each `-` from the right until the remainder parses as
/// a uuid. Returns `None` if the prefix or suffix is missing or either part fails to parse.
pub(crate) fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
    let stem = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
    let (split_at, uuid) = stem.rmatch_indices('-').find_map(|(dash, _)| {
        Uuid::parse_str(&stem[dash + 1..])
            .ok()
            .map(|parsed| (dash, parsed))
    })?;
    let ts_format: &[FormatItem] =
        format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
    let ts = PrimitiveDateTime::parse(&stem[..split_at], ts_format)
        .ok()?
        .assume_utc();
    Some((ts, uuid))
}
/// A rollout file gathered for updated-at ordering, before its contents are read.
struct ThreadCandidate {
/// Path to the rollout file.
path: PathBuf,
/// Uuid parsed from the file name.
id: Uuid,
/// File modification time truncated to whole seconds; `None` if it could not be read.
updated_at: Option<OffsetDateTime>,
}
/// Walks a date-nested tree and returns every rollout file with its modification time,
/// unsorted.
async fn collect_files_by_updated_at(
    root: &Path,
    scanned_files: &mut usize,
) -> io::Result<Vec<ThreadCandidate>> {
    let mut candidates = Vec::new();
    walk_rollout_files(
        root,
        scanned_files,
        &mut FilesByUpdatedAtVisitor {
            candidates: &mut candidates,
        },
    )
    .await?;
    Ok(candidates)
}
async fn collect_flat_files_by_updated_at(
root: &Path,
scanned_files: &mut usize,
) -> io::Result<Vec<ThreadCandidate>> {
let mut candidates = Vec::new();
let mut dir = tokio::fs::read_dir(root).await?;
while let Some(entry) = dir.next_entry().await? {
if *scanned_files >= MAX_SCAN_FILES {
break;
}
if !entry
.file_type()
.await
.map(|ft| ft.is_file())
.unwrap_or(false)
{
continue;
}
let file_name = entry.file_name();
let Some(name_str) = file_name.to_str() else {
continue;
};
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
continue;
}
let Some((_ts, id)) = parse_timestamp_uuid_from_filename(name_str) else {
continue;
};
*scanned_files += 1;
if *scanned_files > MAX_SCAN_FILES {
break;
}
let updated_at = file_modified_time(&entry.path()).await.unwrap_or(None);
candidates.push(ThreadCandidate {
path: entry.path(),
id,
updated_at,
});
}
Ok(candidates)
}
/// Walks `root/<year>/<month>/<day>/` newest first and feeds each `rollout-*.jsonl` file to
/// `visitor`.
///
/// Directories are accepted when their names parse as numbers (`u16` for years, `u8` for
/// months and days); they are visited in descending numeric order. Files within a day are
/// ordered by file-name `(timestamp, uuid)`, descending. `scanned_files` is incremented before
/// each file is visited. The walk ends when the visitor returns `Break` or when the counter
/// exceeds `MAX_SCAN_FILES`; in the latter case that last file is counted but not visited, so
/// the counter can finish at `MAX_SCAN_FILES + 1`.
async fn walk_rollout_files(
root: &Path,
scanned_files: &mut usize,
visitor: &mut impl RolloutFileVisitor,
) -> io::Result<()> {
let year_dirs = collect_dirs_desc(root, |s| s.parse::<u16>().ok()).await?;
'outer: for (_year, year_path) in year_dirs.iter() {
// Cap checks at each level avoid listing further directories once the budget is spent.
if *scanned_files >= MAX_SCAN_FILES {
break;
}
let month_dirs = collect_dirs_desc(year_path, |s| s.parse::<u8>().ok()).await?;
for (_month, month_path) in month_dirs.iter() {
if *scanned_files >= MAX_SCAN_FILES {
break 'outer;
}
let day_dirs = collect_dirs_desc(month_path, |s| s.parse::<u8>().ok()).await?;
for (_day, day_path) in day_dirs.iter() {
if *scanned_files >= MAX_SCAN_FILES {
break 'outer;
}
let day_files = collect_rollout_day_files(day_path).await?;
for (ts, id, path) in day_files.into_iter() {
*scanned_files += 1;
if *scanned_files > MAX_SCAN_FILES {
break 'outer;
}
if let ControlFlow::Break(()) =
visitor.visit(ts, id, path, *scanned_files).await
{
break 'outer;
}
}
}
}
}
Ok(())
}
/// Filter deciding whether a session's model provider is in an allowed list.
///
/// Sessions that record no provider are matched as if they used the default provider.
struct ProviderMatcher<'a> {
    filters: &'a [String],
    matches_default_provider: bool,
}

impl<'a> ProviderMatcher<'a> {
    /// Builds a matcher for `filters`; returns `None` for an empty list (no filtering).
    fn new(filters: &'a [String], default_provider: &'a str) -> Option<Self> {
        if filters.is_empty() {
            return None;
        }
        let matches_default_provider = filters
            .iter()
            .any(|allowed| allowed.as_str() == default_provider);
        Some(Self {
            filters,
            matches_default_provider,
        })
    }

    /// Returns whether `session_provider` (or the default provider, when `None`) is allowed.
    fn matches(&self, session_provider: Option<&str>) -> bool {
        session_provider.map_or(self.matches_default_provider, |provider| {
            self.filters.iter().any(|allowed| allowed.as_str() == provider)
        })
    }
}
/// Reads the beginning of a rollout file and summarizes it for thread listing.
///
/// Reads up to `head_limit` non-blank lines. Once session metadata has been seen without a
/// user message, it keeps reading up to `USER_EVENT_SCAN_LIMIT` further non-blank lines. It
/// stops as soon as both a session-meta record and an `EventMsg::UserMessage` have been seen.
/// Lines that are not JSON, or not a valid `RolloutLine`, are skipped.
///
/// At most `head_limit` session-meta / transcript records are serialized into `head`.
/// `created_at` is the timestamp of the first such record. `updated_at` is never set here.
///
/// # Errors
/// Propagates I/O errors from opening or reading the file. Also returns the error from
/// `validate_rollout_line_schema_version` for the first JSON line, and an unsupported-schema
/// error when a session-meta record carries an unsupported `rollout_schema_version`.
async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTailSummary> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut summary = HeadTailSummary::default();
let mut lines_scanned = 0usize;
let mut checked_schema_version = false;
// Past the head limit, keep going only while waiting for the first user message.
while lines_scanned < head_limit
|| (summary.saw_session_meta
&& !summary.saw_user_event
&& lines_scanned < head_limit + USER_EVENT_SCAN_LIMIT)
{
let line_opt = lines.next_line().await?;
let Some(line) = line_opt else { break };
let trimmed = line.trim();
// Blank lines do not count toward the scan limits.
if trimmed.is_empty() {
continue;
}
lines_scanned += 1;
let parsed: Result<serde_json::Value, _> = serde_json::from_str(trimmed);
let Ok(value) = parsed else { continue };
// Only the first parseable JSON line is checked against the schema version.
if !checked_schema_version {
validate_rollout_line_schema_version(&value)?;
checked_schema_version = true;
}
let parsed: Result<RolloutLine, _> = serde_json::from_value(value);
let Ok(rollout_line) = parsed else { continue };
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
if !is_supported_rollout_schema_version(
session_meta_line.meta.rollout_schema_version,
) {
return Err(super::recorder::unsupported_rollout_schema_error(
session_meta_line.meta.rollout_schema_version,
));
}
// Session metadata supplies the listing filters' inputs (source, provider, cwd).
summary.source = Some(session_meta_line.meta.source.clone());
summary.model_provider = session_meta_line.meta.model_provider.clone();
summary.session_cwd = Some(session_meta_line.meta.cwd.clone());
// Keep the earliest record timestamp as the creation time.
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
summary.saw_session_meta = true;
if summary.head.len() < head_limit
&& let Ok(val) = serde_json::to_value(session_meta_line)
{
summary.head.push(val);
}
}
RolloutItem::TranscriptItem(item) => {
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
if summary.head.len() < head_limit
&& let Ok(val) = serde_json::to_value(item)
{
summary.head.push(val);
}
}
// Turn contexts are not part of the head summary.
RolloutItem::TurnContext(_) => {
}
RolloutItem::GhostSnapshot(_)
| RolloutItem::Compacted(_)
| RolloutItem::InputSlimmingStoredInput(_)
| RolloutItem::Workflow(_) => {
}
RolloutItem::EventMsg(ev) => {
if matches!(ev, EventMsg::UserMessage(_)) {
summary.saw_user_event = true;
}
}
}
if summary.saw_session_meta && summary.saw_user_event {
break;
}
}
Ok(summary)
}
/// Returns the leading session-meta / transcript records of a rollout file as JSON values
/// (at most `HEAD_RECORD_LIMIT`).
///
/// # Errors
/// Same as `read_head_summary`.
pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Value>> {
    read_head_summary(path, HEAD_RECORD_LIMIT)
        .await
        .map(|summary| summary.head)
}
/// Returns the thread's effective working directory.
///
/// This is the cwd of the latest turn context when one exists, otherwise the cwd from the
/// session metadata. A failure to read the session metadata just removes the fallback.
pub async fn read_effective_thread_cwd(path: &Path) -> io::Result<Option<PathBuf>> {
    let session_cwd = match read_session_meta_line(path).await {
        Ok(meta_line) => Some(meta_line.meta.cwd),
        Err(_) => None,
    };
    read_effective_thread_cwd_with_fallback(path, session_cwd.as_deref()).await
}
pub async fn read_session_meta_line(path: &Path) -> io::Result<SessionMetaLine> {
let head = read_head_for_summary(path).await?;
let Some(first) = head.first() else {
return Err(io::Error::other(format!(
"rollout at {} is empty",
path.display()
)));
};
serde_json::from_value::<SessionMetaLine>(first.clone()).map_err(|_| {
io::Error::other(format!(
"rollout at {} does not start with session metadata",
path.display()
))
})
}
/// Returns the latest turn-context cwd in the file, or `fallback_cwd` when there is none.
///
/// # Errors
/// Propagates I/O errors from the backward scan.
async fn read_effective_thread_cwd_with_fallback(
    path: &Path,
    fallback_cwd: Option<&Path>,
) -> io::Result<Option<PathBuf>> {
    let latest = read_latest_turn_context_cwd(path).await?;
    Ok(latest.or_else(|| fallback_cwd.map(Path::to_path_buf)))
}
/// Returns the `cwd` of the last `TurnContext` record in a rollout file, if any.
///
/// The file is read backwards in `TAIL_SCAN_CHUNK_SIZE` chunks. Each chunk is joined with the
/// incomplete line carried over from the chunk after it (`trailing`). Complete lines are then
/// tried from last to first, and the first that parses as a `TurnContext` wins. Whatever
/// remains after the last chunk is the file's first line and is tried at the end.
///
/// NOTE(review): when no turn context exists, the whole file is read and every line is parsed.
/// A very long line is re-copied on each chunk through `trailing`.
async fn read_latest_turn_context_cwd(path: &Path) -> io::Result<Option<PathBuf>> {
let mut file = tokio::fs::File::open(path).await?;
let mut offset = file.metadata().await?.len();
// Bytes of a line whose start lies in an earlier (not yet read) chunk.
let mut trailing = Vec::new();
while offset > 0 {
let chunk_size = offset.min(TAIL_SCAN_CHUNK_SIZE as u64) as usize;
offset -= chunk_size as u64;
file.seek(std::io::SeekFrom::Start(offset)).await?;
let mut chunk = vec![0; chunk_size];
file.read_exact(&mut chunk).await?;
// Re-attach the carried-over tail so lines spanning the chunk boundary are complete.
chunk.extend_from_slice(&trailing);
let mut line_end = chunk.len();
while let Some(newline_idx) = chunk[..line_end].iter().rposition(|byte| *byte == b'\n') {
if let Some(cwd) = parse_turn_context_cwd(&chunk[newline_idx + 1..line_end]) {
return Ok(Some(cwd));
}
line_end = newline_idx;
}
// No newline before this point: it may continue in the previous chunk.
trailing = chunk[..line_end].to_vec();
}
Ok(parse_turn_context_cwd(&trailing))
}
/// Extracts the cwd from one raw rollout line if it is a UTF-8 `TurnContext` record.
///
/// Returns `None` for blank lines, invalid UTF-8 or JSON, and any other record kind.
fn parse_turn_context_cwd(line: &[u8]) -> Option<PathBuf> {
    let text = std::str::from_utf8(line).ok()?.trim();
    if text.is_empty() {
        return None;
    }
    match serde_json::from_str::<RolloutLine>(text).ok()?.item {
        RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd),
        _ => None,
    }
}
/// Compares two paths after `path_utils::normalize_for_path_comparison`.
///
/// Falls back to plain `Path` equality if either path cannot be normalized.
fn paths_match(a: &Path, b: &Path) -> bool {
    match (
        path_utils::normalize_for_path_comparison(a),
        path_utils::normalize_for_path_comparison(b),
    ) {
        (Ok(left), Ok(right)) => left == right,
        _ => a == b,
    }
}
/// Returns the file's modification time truncated to whole seconds.
///
/// `Ok(None)` means the platform did not report an mtime.
///
/// # Errors
/// Propagates the error from reading the file's metadata.
async fn file_modified_time(path: &Path) -> io::Result<Option<OffsetDateTime>> {
    let metadata = tokio::fs::metadata(path).await?;
    Ok(match metadata.modified() {
        Ok(system_time) => truncate_to_seconds(OffsetDateTime::from(system_time)),
        Err(_) => None,
    })
}

/// Formats `dt` as an RFC 3339 string, or `None` if formatting fails.
fn format_rfc3339(dt: OffsetDateTime) -> Option<String> {
    let formatted = dt.format(&Rfc3339);
    formatted.ok()
}

/// Drops the sub-second component of `dt`.
fn truncate_to_seconds(dt: OffsetDateTime) -> Option<OffsetDateTime> {
    let whole_seconds = dt.replace_nanosecond(0);
    whole_seconds.ok()
}
async fn find_thread_path_by_id_str_in_subdir(
lha_home: &Path,
subdir: &str,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
if Uuid::parse_str(id_str).is_err() {
return Ok(None);
}
let mut root = lha_home.to_path_buf();
root.push(subdir);
if !root.exists() {
return Ok(None);
}
#[allow(clippy::unwrap_used)]
let limit = NonZero::new(1).unwrap();
let options = file_search::FileSearchOptions {
limit,
compute_indices: false,
respect_gitignore: false,
..Default::default()
};
let results = file_search::run(id_str, vec![root], options, None)
.map_err(|e| io::Error::other(format!("file search failed: {e}")))?;
let found = results.matches.into_iter().next().map(|m| m.full_path());
let archived_only = match subdir {
SESSIONS_SUBDIR => Some(false),
ARCHIVED_SESSIONS_SUBDIR => Some(true),
_ => None,
};
let state_db_ctx = state_db::open_if_present(lha_home, "").await;
if let Some(state_db_ctx) = state_db_ctx.as_deref()
&& let Ok(thread_id) = ThreadId::from_string(id_str)
{
let db_path = state_db::find_rollout_path_by_id(
Some(state_db_ctx),
thread_id,
archived_only,
"find_path_query",
)
.await;
let canonical_path = found.as_deref();
if db_path.as_deref() != canonical_path {
tracing::warn!(
"state db path mismatch for thread {thread_id:?}: canonical={canonical_path:?} db={db_path:?}"
);
state_db::record_discrepancy("find_thread_path_by_id_str_in_subdir", "path_mismatch");
}
}
Ok(found)
}
/// Finds the rollout file for thread `id_str` among active sessions (`SESSIONS_SUBDIR`).
///
/// Returns `Ok(None)` for an invalid id or when nothing is found.
pub async fn find_thread_path_by_id_str(
lha_home: &Path,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str_in_subdir(lha_home, SESSIONS_SUBDIR, id_str).await
}
/// Finds the rollout file for thread `id_str` among archived sessions
/// (`ARCHIVED_SESSIONS_SUBDIR`).
///
/// Returns `Ok(None)` for an invalid id or when nothing is found.
pub async fn find_archived_thread_path_by_id_str(
lha_home: &Path,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str_in_subdir(lha_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
}
/// Extracts `(year, month, day)` strings from a `rollout-YYYY-MM-DD...` file name.
///
/// Returns `None` unless the ten characters after the `rollout-` prefix have the exact shape
/// `DDDD-DD-DD` (ASCII digits separated by `-`). Without this check, a name such as
/// `rollout-2025-..-..T...` produced `".."` components, which is unsafe if a caller joins the
/// parts into a directory path.
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
    let name = file_name.to_string_lossy();
    let date = name.strip_prefix("rollout-")?.get(..10)?;
    let well_formed = date.bytes().enumerate().all(|(idx, byte)| match idx {
        4 | 7 => byte == b'-',
        _ => byte.is_ascii_digit(),
    });
    if !well_formed {
        return None;
    }
    // All ten bytes are ASCII, so these slices fall on char boundaries.
    Some((
        date[..4].to_string(),
        date[5..7].to_string(),
        date[8..10].to_string(),
    ))
}