#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::ffi::OsStr;
use std::fs::File;
use std::fs::FileTimes;
use std::fs::{self};
use std::io::Write;
use std::path::Path;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use time::Duration;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::product::agent::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::product::agent::rollout::RolloutRecorder;
use crate::product::agent::rollout::is_unsupported_rollout_schema_error;
use crate::product::agent::rollout::list::Cursor;
use crate::product::agent::rollout::list::ThreadItem;
use crate::product::agent::rollout::list::ThreadSortKey;
use crate::product::agent::rollout::list::ThreadsPage;
use crate::product::agent::rollout::list::get_threads;
use crate::product::agent::rollout::list::read_effective_thread_cwd;
use crate::product::agent::rollout::list::read_head_for_summary;
use crate::product::agent::rollout::rollout_date_parts;
use crate::product::protocol::ThreadId;
use crate::product::protocol::config_types::ReasoningSummary;
use crate::product::protocol::models::ContentItem;
use crate::product::protocol::models::TranscriptItem;
use crate::product::protocol::protocol::AskForApproval;
use crate::product::protocol::protocol::EventMsg;
use crate::product::protocol::protocol::ROLLOUT_SCHEMA_VERSION_V3;
use crate::product::protocol::protocol::RolloutItem;
use crate::product::protocol::protocol::RolloutLine;
use crate::product::protocol::protocol::SandboxPolicy;
use crate::product::protocol::protocol::SessionMeta;
use crate::product::protocol::protocol::SessionMetaLine;
use crate::product::protocol::protocol::SessionSource;
use crate::product::protocol::protocol::TurnContextItem;
use crate::product::protocol::protocol::UserMessageEvent;
use anyhow::Result;
const NO_SOURCE_FILTER: &[SessionSource] = &[];
const TEST_PROVIDER: &str = "test-provider";
fn session_meta_payload(mut payload: serde_json::Value) -> serde_json::Value {
payload["rollout_schema_version"] = serde_json::Value::from(ROLLOUT_SCHEMA_VERSION_V3);
payload
}
fn provider_vec(providers: &[&str]) -> Vec<String> {
providers
.iter()
.map(std::string::ToString::to_string)
.collect()
}
#[test]
fn rollout_date_parts_extracts_directory_components() {
let file_name = OsStr::new("rollout-2025-03-01T09-00-00-123.jsonl");
let parts = rollout_date_parts(file_name);
assert_eq!(
parts,
Some(("2025".to_string(), "03".to_string(), "01".to_string()))
);
}
fn write_session_file(
root: &Path,
ts_str: &str,
uuid: Uuid,
num_records: usize,
source: Option<SessionSource>,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
write_session_file_with_provider(
root,
ts_str,
uuid,
num_records,
source,
Some("test-provider"),
)
}
fn write_session_file_with_provider(
root: &Path,
ts_str: &str,
uuid: Uuid,
num_records: usize,
source: Option<SessionSource>,
model_provider: Option<&str>,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;
let mut payload = serde_json::json!({
"id": uuid,
"timestamp": ts_str,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"base_instructions": null,
});
if let Some(source) = source {
payload["source"] = serde_json::to_value(source).unwrap();
}
if let Some(provider) = model_provider {
payload["model_provider"] = serde_json::Value::String(provider.to_string());
}
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": session_meta_payload(payload),
});
writeln!(file, "{meta}")?;
let user_event = serde_json::json!({
"timestamp": ts_str,
"type": "event_msg",
"payload": {
"type": "user_message",
"message": "Hello from user",
"kind": "plain"
}
});
writeln!(file, "{user_event}")?;
for i in 0..num_records {
let rec = serde_json::json!({
"record_type": "response",
"index": i
});
writeln!(file, "{rec}")?;
}
let times = FileTimes::new().set_modified(dt.into());
file.set_times(times)?;
Ok((dt, uuid))
}
fn write_session_file_with_cwds(
root: &Path,
ts_str: &str,
uuid: Uuid,
session_cwd: &Path,
latest_turn_context_cwd: Option<&Path>,
model_provider: Option<&str>,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;
let mut payload = serde_json::json!({
"id": uuid,
"timestamp": ts_str,
"cwd": session_cwd,
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"base_instructions": null,
});
if let Some(provider) = model_provider {
payload["model_provider"] = serde_json::Value::String(provider.to_string());
}
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": session_meta_payload(payload),
});
writeln!(file, "{meta}")?;
let user_event = serde_json::json!({
"timestamp": ts_str,
"type": "event_msg",
"payload": {
"type": "user_message",
"message": "Hello from user",
"kind": "plain"
}
});
writeln!(file, "{user_event}")?;
if let Some(turn_context_cwd) = latest_turn_context_cwd {
let turn_context = RolloutLine {
timestamp: ts_str.to_string(),
item: RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context_cwd.to_path_buf(),
approval_policy: AskForApproval::OnRequest,
sandbox_policy: SandboxPolicy::ReadOnly,
model: "test-model".to_string(),
personality: None,
identity: None,
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
}),
};
writeln!(file, "{}", serde_json::to_string(&turn_context).unwrap())?;
}
let times = FileTimes::new().set_modified(dt.into());
file.set_times(times)?;
Ok((dt, uuid))
}
fn write_session_file_with_delayed_user_event(
root: &Path,
ts_str: &str,
uuid: Uuid,
meta_lines_before_user: usize,
) -> std::io::Result<()> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;
for i in 0..meta_lines_before_user {
let id = if i == 0 {
uuid
} else {
Uuid::from_u128(100 + i as u128)
};
let payload = serde_json::json!({
"id": id,
"timestamp": ts_str,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
});
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": session_meta_payload(payload),
});
writeln!(file, "{meta}")?;
}
let user_event = serde_json::json!({
"timestamp": ts_str,
"type": "event_msg",
"payload": {"type": "user_message", "message": "Hello from user", "kind": "plain"}
});
writeln!(file, "{user_event}")?;
let times = FileTimes::new().set_modified(dt.into());
file.set_times(times)?;
Ok(())
}
fn write_session_file_with_meta_payload(
root: &Path,
ts_str: &str,
uuid: Uuid,
payload: serde_json::Value,
) -> std::io::Result<()> {
write_session_file_with_meta_payload_and_schema_version(
root,
ts_str,
uuid,
payload,
Some(ROLLOUT_SCHEMA_VERSION_V3),
)
.map(|_| ())
}
fn write_session_file_with_meta_payload_and_schema_version(
root: &Path,
ts_str: &str,
uuid: Uuid,
mut payload: serde_json::Value,
schema_version: Option<u32>,
) -> std::io::Result<std::path::PathBuf> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;
if let Some(schema_version) = schema_version {
payload["rollout_schema_version"] = serde_json::Value::from(schema_version);
}
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": payload,
});
writeln!(file, "{meta}")?;
let user_event = serde_json::json!({
"timestamp": ts_str,
"type": "event_msg",
"payload": {"type": "user_message", "message": "Hello from user", "kind": "plain"}
});
writeln!(file, "{user_event}")?;
let times = FileTimes::new().set_modified(dt.into());
file.set_times(times)?;
Ok(dir.join(format!("rollout-{ts_str}-{uuid}.jsonl")))
}
fn minimal_meta_payload(uuid: Uuid, ts_str: &str) -> serde_json::Value {
serde_json::json!({
"id": uuid,
"timestamp": ts_str,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
})
}
#[tokio::test]
async fn load_rollout_items_accepts_v3_schema_version() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-03T10-30-00";
let uuid = Uuid::from_u128(102);
let path = write_session_file_with_meta_payload_and_schema_version(
home,
ts,
uuid,
minimal_meta_payload(uuid, ts),
Some(ROLLOUT_SCHEMA_VERSION_V3),
)
.unwrap();
let history = RolloutRecorder::get_rollout_history(&path)
.await
.expect("v3 rollout should load");
assert!(matches!(
history,
crate::product::protocol::protocol::InitialHistory::Resumed(_)
));
}
#[tokio::test]
async fn load_rollout_items_rejects_v2_schema_version() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-04T10-30-00";
let uuid = Uuid::from_u128(103);
let path = write_session_file_with_meta_payload_and_schema_version(
home,
ts,
uuid,
minimal_meta_payload(uuid, ts),
Some(2),
)
.unwrap();
let err = RolloutRecorder::get_rollout_history(&path)
.await
.expect_err("v2 rollout should be unsupported");
assert!(is_unsupported_rollout_schema_error(&err));
}
#[tokio::test]
async fn load_rollout_items_rejects_missing_schema_version() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-05T10-30-00";
let uuid = Uuid::from_u128(104);
let path = write_session_file_with_meta_payload_and_schema_version(
home,
ts,
uuid,
minimal_meta_payload(uuid, ts),
None,
)
.unwrap();
let err = RolloutRecorder::get_rollout_history(&path)
.await
.expect_err("missing rollout schema should be unsupported");
assert!(is_unsupported_rollout_schema_error(&err));
}
#[tokio::test]
async fn read_head_for_summary_rejects_v2_schema_version() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-06T10-30-00";
let uuid = Uuid::from_u128(105);
let path = write_session_file_with_meta_payload_and_schema_version(
home,
ts,
uuid,
minimal_meta_payload(uuid, ts),
Some(2),
)
.unwrap();
let err = read_head_for_summary(&path)
.await
.expect_err("v2 rollout should be unsupported");
assert!(is_unsupported_rollout_schema_error(&err));
}
#[tokio::test]
async fn read_head_for_summary_rejects_missing_schema_version() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-07T10-30-00";
let uuid = Uuid::from_u128(106);
let path = write_session_file_with_meta_payload_and_schema_version(
home,
ts,
uuid,
minimal_meta_payload(uuid, ts),
None,
)
.unwrap();
let err = read_head_for_summary(&path)
.await
.expect_err("missing rollout schema should be unsupported");
assert!(is_unsupported_rollout_schema_error(&err));
}
#[tokio::test]
async fn thread_list_skips_unsupported_rollout_schema_versions() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let valid_uuid = Uuid::from_u128(107);
let v2_uuid = Uuid::from_u128(108);
let missing_uuid = Uuid::from_u128(109);
write_session_file_with_meta_payload_and_schema_version(
home,
"2025-04-08T10-30-00",
valid_uuid,
minimal_meta_payload(valid_uuid, "2025-04-08T10-30-00"),
Some(ROLLOUT_SCHEMA_VERSION_V3),
)
.unwrap();
write_session_file_with_meta_payload_and_schema_version(
home,
"2025-04-09T10-30-00",
v2_uuid,
minimal_meta_payload(v2_uuid, "2025-04-09T10-30-00"),
Some(2),
)
.unwrap();
write_session_file_with_meta_payload_and_schema_version(
home,
"2025-04-10T10-30-00",
missing_uuid,
minimal_meta_payload(missing_uuid, "2025-04-10T10-30-00"),
None,
)
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
assert!(
page.items[0]
.path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.contains(&valid_uuid.to_string()))
);
}
#[tokio::test]
async fn test_list_conversations_latest_first() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let u1 = Uuid::from_u128(1);
let u2 = Uuid::from_u128(2);
let u3 = Uuid::from_u128(3);
write_session_file(
home,
"2025-01-01T12-00-00",
u1,
3,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-01-02T12-00-00",
u2,
3,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-01-03T12-00-00",
u3,
3,
Some(SessionSource::VSCode),
)
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
.join("01")
.join("03")
.join(format!("rollout-2025-01-03T12-00-00-{u3}.jsonl"));
let p2 = home
.join("sessions")
.join("2025")
.join("01")
.join("02")
.join(format!("rollout-2025-01-02T12-00-00-{u2}.jsonl"));
let p3 = home
.join("sessions")
.join("2025")
.join("01")
.join("01")
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
let head_3 = vec![serde_json::json!({
"id": u3,
"timestamp": "2025-01-03T12-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let head_2 = vec![serde_json::json!({
"id": u2,
"timestamp": "2025-01-02T12-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let head_1 = vec![serde_json::json!({
"id": u1,
"timestamp": "2025-01-01T12-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let updated_times: Vec<Option<String>> =
page.items.iter().map(|i| i.updated_at.clone()).collect();
let expected = ThreadsPage {
items: vec![
ThreadItem {
path: p1,
head: head_3,
cwd: Some(".".into()),
created_at: Some("2025-01-03T12-00-00".into()),
updated_at: updated_times.first().cloned().flatten(),
},
ThreadItem {
path: p2,
head: head_2,
cwd: Some(".".into()),
created_at: Some("2025-01-02T12-00-00".into()),
updated_at: updated_times.get(1).cloned().flatten(),
},
ThreadItem {
path: p3,
head: head_1,
cwd: Some(".".into()),
created_at: Some("2025-01-01T12-00-00".into()),
updated_at: updated_times.get(2).cloned().flatten(),
},
],
next_cursor: None,
num_scanned_files: 3,
reached_scan_cap: false,
};
assert_eq!(page, expected);
}
#[tokio::test]
async fn test_pagination_cursor() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let u1 = Uuid::from_u128(11);
let u2 = Uuid::from_u128(22);
let u3 = Uuid::from_u128(33);
let u4 = Uuid::from_u128(44);
let u5 = Uuid::from_u128(55);
write_session_file(
home,
"2025-03-01T09-00-00",
u1,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-02T09-00-00",
u2,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-03T09-00-00",
u3,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-04T09-00-00",
u4,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-05T09-00-00",
u5,
1,
Some(SessionSource::VSCode),
)
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page1 = get_threads(
home,
2,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let p5 = home
.join("sessions")
.join("2025")
.join("03")
.join("05")
.join(format!("rollout-2025-03-05T09-00-00-{u5}.jsonl"));
let p4 = home
.join("sessions")
.join("2025")
.join("03")
.join("04")
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
let head_5 = vec![serde_json::json!({
"id": u5,
"timestamp": "2025-03-05T09-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let head_4 = vec![serde_json::json!({
"id": u4,
"timestamp": "2025-03-04T09-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let updated_page1: Vec<Option<String>> =
page1.items.iter().map(|i| i.updated_at.clone()).collect();
let expected_cursor1: Cursor =
serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap();
let expected_page1 = ThreadsPage {
items: vec![
ThreadItem {
path: p5,
head: head_5,
cwd: Some(".".into()),
created_at: Some("2025-03-05T09-00-00".into()),
updated_at: updated_page1.first().cloned().flatten(),
},
ThreadItem {
path: p4,
head: head_4,
cwd: Some(".".into()),
created_at: Some("2025-03-04T09-00-00".into()),
updated_at: updated_page1.get(1).cloned().flatten(),
},
],
next_cursor: Some(expected_cursor1.clone()),
num_scanned_files: 3, reached_scan_cap: false,
};
assert_eq!(page1, expected_page1);
let page2 = get_threads(
home,
2,
page1.next_cursor.as_ref(),
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let p3 = home
.join("sessions")
.join("2025")
.join("03")
.join("03")
.join(format!("rollout-2025-03-03T09-00-00-{u3}.jsonl"));
let p2 = home
.join("sessions")
.join("2025")
.join("03")
.join("02")
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
let head_3 = vec![serde_json::json!({
"id": u3,
"timestamp": "2025-03-03T09-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let head_2 = vec![serde_json::json!({
"id": u2,
"timestamp": "2025-03-02T09-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let updated_page2: Vec<Option<String>> =
page2.items.iter().map(|i| i.updated_at.clone()).collect();
let expected_cursor2: Cursor =
serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap();
let expected_page2 = ThreadsPage {
items: vec![
ThreadItem {
path: p3,
head: head_3,
cwd: Some(".".into()),
created_at: Some("2025-03-03T09-00-00".into()),
updated_at: updated_page2.first().cloned().flatten(),
},
ThreadItem {
path: p2,
head: head_2,
cwd: Some(".".into()),
created_at: Some("2025-03-02T09-00-00".into()),
updated_at: updated_page2.get(1).cloned().flatten(),
},
],
next_cursor: Some(expected_cursor2.clone()),
num_scanned_files: 5, reached_scan_cap: false,
};
assert_eq!(page2, expected_page2);
let page3 = get_threads(
home,
2,
page2.next_cursor.as_ref(),
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
.join("03")
.join("01")
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
let head_1 = vec![serde_json::json!({
"id": u1,
"timestamp": "2025-03-01T09-00-00",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let updated_page3: Vec<Option<String>> =
page3.items.iter().map(|i| i.updated_at.clone()).collect();
let expected_page3 = ThreadsPage {
items: vec![ThreadItem {
path: p1,
head: head_1,
cwd: Some(".".into()),
created_at: Some("2025-03-01T09-00-00".into()),
updated_at: updated_page3.first().cloned().flatten(),
}],
next_cursor: None,
num_scanned_files: 5, reached_scan_cap: false,
};
assert_eq!(page3, expected_page3);
}
#[tokio::test]
async fn test_list_threads_scans_past_head_for_user_event() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let uuid = Uuid::from_u128(99);
let ts = "2025-05-01T10-30-00";
write_session_file_with_delayed_user_event(home, ts, uuid, 12).unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
}
#[tokio::test]
async fn test_get_thread_contents() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let uuid = Uuid::new_v4();
let ts = "2025-04-01T10-30-00";
write_session_file(home, ts, uuid, 2, Some(SessionSource::VSCode)).unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
1,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let path = &page.items[0].path;
let content = tokio::fs::read_to_string(path).await.unwrap();
let expected_path = home
.join("sessions")
.join("2025")
.join("04")
.join("01")
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
let expected_head = vec![serde_json::json!({
"id": uuid,
"timestamp": ts,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})];
let expected_page = ThreadsPage {
items: vec![ThreadItem {
path: expected_path,
head: expected_head,
cwd: Some(".".into()),
created_at: Some(ts.into()),
updated_at: page.items[0].updated_at.clone(),
}],
next_cursor: None,
num_scanned_files: 1,
reached_scan_cap: false,
};
assert_eq!(page, expected_page);
let meta = serde_json::json!({
"timestamp": ts,
"type": "session_meta",
"payload": {
"id": uuid,
"timestamp": ts,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"base_instructions": null,
"source": "vscode",
"model_provider": "test-provider",
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
}
});
let user_event = serde_json::json!({
"timestamp": ts,
"type": "event_msg",
"payload": {"type": "user_message", "message": "Hello from user", "kind": "plain"}
});
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
let expected_content = format!("{meta}\n{user_event}\n{rec0}\n{rec1}\n");
assert_eq!(content, expected_content);
}
#[tokio::test]
async fn test_base_instructions_missing_in_meta_defaults_to_null() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-02T10-30-00";
let uuid = Uuid::from_u128(101);
let payload = serde_json::json!({
"id": uuid,
"timestamp": ts,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
});
write_session_file_with_meta_payload(home, ts, uuid, payload).unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
1,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let head = page
.items
.first()
.and_then(|item| item.head.first())
.expect("session meta head");
assert_eq!(
head.get("base_instructions"),
Some(&serde_json::Value::Null)
);
}
#[tokio::test]
async fn test_base_instructions_present_in_meta_is_preserved() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-04-03T10-30-00";
let uuid = Uuid::from_u128(102);
let base_text = "Custom base instructions";
let payload = serde_json::json!({
"id": uuid,
"timestamp": ts,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": {"text": base_text},
});
write_session_file_with_meta_payload(home, ts, uuid, payload).unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
1,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let head = page
.items
.first()
.and_then(|item| item.head.first())
.expect("session meta head");
let base = head
.get("base_instructions")
.and_then(|value| value.get("text"))
.and_then(serde_json::Value::as_str);
assert_eq!(base, Some(base_text));
}
#[tokio::test]
async fn test_created_at_sort_uses_file_mtime_for_updated_at() -> Result<()> {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-06-01T08-00-00";
let uuid = Uuid::from_u128(43);
write_session_file(home, ts, uuid, 0, Some(SessionSource::VSCode)).unwrap();
let created = PrimitiveDateTime::parse(
ts,
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]"),
)?
.assume_utc();
let updated = created + Duration::hours(2);
let expected_updated = updated.format(&time::format_description::well_known::Rfc3339)?;
let file_path = home
.join("sessions")
.join("2025")
.join("06")
.join("01")
.join(format!("rollout-{ts}-{uuid}.jsonl"));
let file = std::fs::OpenOptions::new().write(true).open(&file_path)?;
let times = FileTimes::new().set_modified(updated.into());
file.set_times(times)?;
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
1,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await?;
let item = page.items.first().expect("conversation item");
assert_eq!(item.created_at.as_deref(), Some(ts));
assert_eq!(item.updated_at.as_deref(), Some(expected_updated.as_str()));
Ok(())
}
#[tokio::test]
async fn test_updated_at_uses_file_mtime() -> Result<()> {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-06-01T08-00-00";
let uuid = Uuid::from_u128(42);
let day_dir = home.join("sessions").join("2025").join("06").join("01");
fs::create_dir_all(&day_dir)?;
let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
let mut file = File::create(&file_path)?;
let conversation_id = ThreadId::from_string(&uuid.to_string())?;
let meta_line = RolloutLine {
timestamp: ts.to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: conversation_id,
forked_from_id: None,
timestamp: ts.to_string(),
cwd: ".".into(),
originator: "test_originator".into(),
cli_version: "test_version".into(),
rollout_schema_version:
crate::product::protocol::protocol::ROLLOUT_SCHEMA_VERSION_V3,
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
}),
};
writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
let user_event_line = RolloutLine {
timestamp: ts.to_string(),
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "hello".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
};
writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
let total_messages = 12usize;
for idx in 0..total_messages {
let response_line = RolloutLine {
timestamp: format!("{ts}-{idx:02}"),
item: RolloutItem::TranscriptItem(TranscriptItem::Message {
id: None,
role: "assistant".into(),
content: vec![ContentItem::OutputText {
text: format!("reply-{idx}"),
}],
end_turn: None,
}),
};
writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
}
drop(file);
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_threads(
home,
1,
None,
ThreadSortKey::UpdatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await?;
let item = page.items.first().expect("conversation item");
assert_eq!(item.created_at.as_deref(), Some(ts));
let updated = item
.updated_at
.as_deref()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.expect("updated_at set from file mtime");
let now = chrono::Utc::now();
let age = now - updated;
assert!(age.num_seconds().abs() < 30);
Ok(())
}
#[tokio::test]
async fn test_stable_ordering_same_second_pagination() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-07-01T00-00-00";
let u1 = Uuid::from_u128(1);
let u2 = Uuid::from_u128(2);
let u3 = Uuid::from_u128(3);
write_session_file(home, ts, u1, 0, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, u2, 0, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, u3, 0, Some(SessionSource::VSCode)).unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page1 = get_threads(
home,
2,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let p3 = home
.join("sessions")
.join("2025")
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u3}.jsonl"));
let p2 = home
.join("sessions")
.join("2025")
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
let head = |u: Uuid| -> Vec<serde_json::Value> {
vec![serde_json::json!({
"id": u,
"timestamp": ts,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
"base_instructions": null,
"rollout_schema_version": ROLLOUT_SCHEMA_VERSION_V3,
})]
};
let updated_page1: Vec<Option<String>> =
page1.items.iter().map(|i| i.updated_at.clone()).collect();
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
let expected_page1 = ThreadsPage {
items: vec![
ThreadItem {
path: p3,
head: head(u3),
cwd: Some(".".into()),
created_at: Some(ts.to_string()),
updated_at: updated_page1.first().cloned().flatten(),
},
ThreadItem {
path: p2,
head: head(u2),
cwd: Some(".".into()),
created_at: Some(ts.to_string()),
updated_at: updated_page1.get(1).cloned().flatten(),
},
],
next_cursor: Some(expected_cursor1.clone()),
num_scanned_files: 3, reached_scan_cap: false,
};
assert_eq!(page1, expected_page1);
let page2 = get_threads(
home,
2,
page1.next_cursor.as_ref(),
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u1}.jsonl"));
let updated_page2: Vec<Option<String>> =
page2.items.iter().map(|i| i.updated_at.clone()).collect();
let expected_page2 = ThreadsPage {
items: vec![ThreadItem {
path: p1,
head: head(u1),
cwd: Some(".".into()),
created_at: Some(ts.to_string()),
updated_at: updated_page2.first().cloned().flatten(),
}],
next_cursor: None,
num_scanned_files: 3, reached_scan_cap: false,
};
assert_eq!(page2, expected_page2);
}
#[tokio::test]
async fn test_source_filter_excludes_non_matching_sessions() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let interactive_id = Uuid::from_u128(42);
let non_interactive_id = Uuid::from_u128(77);
write_session_file(
home,
"2025-08-02T10-00-00",
interactive_id,
2,
Some(SessionSource::Cli),
)
.unwrap();
write_session_file(
home,
"2025-08-01T10-00-00",
non_interactive_id,
2,
Some(SessionSource::Exec),
)
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let interactive_only = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
None,
)
.await
.unwrap();
let paths: Vec<_> = interactive_only
.items
.iter()
.map(|item| item.path.as_path())
.collect();
assert_eq!(paths.len(), 1);
assert!(paths.iter().all(|path| {
path.ends_with("rollout-2025-08-02T10-00-00-00000000-0000-0000-0000-00000000002a.jsonl")
}));
let all_sessions = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
None,
TEST_PROVIDER,
None,
)
.await
.unwrap();
let all_paths: Vec<_> = all_sessions
.items
.into_iter()
.map(|item| item.path)
.collect();
assert_eq!(all_paths.len(), 2);
assert!(all_paths.iter().any(|path| {
path.ends_with("rollout-2025-08-02T10-00-00-00000000-0000-0000-0000-00000000002a.jsonl")
}));
assert!(all_paths.iter().any(|path| {
path.ends_with("rollout-2025-08-01T10-00-00-00000000-0000-0000-0000-00000000004d.jsonl")
}));
}
#[tokio::test]
async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<()> {
let temp = TempDir::new().unwrap();
let home = temp.path();
let openai_id = Uuid::from_u128(1);
let beta_id = Uuid::from_u128(2);
let none_id = Uuid::from_u128(3);
write_session_file_with_provider(
home,
"2025-09-01T12-00-00",
openai_id,
1,
Some(SessionSource::VSCode),
Some("openai"),
)?;
write_session_file_with_provider(
home,
"2025-09-01T11-00-00",
beta_id,
1,
Some(SessionSource::VSCode),
Some("beta"),
)?;
write_session_file_with_provider(
home,
"2025-09-01T10-00-00",
none_id,
1,
Some(SessionSource::VSCode),
None,
)?;
let openai_id_str = openai_id.to_string();
let none_id_str = none_id.to_string();
let openai_filter = provider_vec(&["openai"]);
let openai_sessions = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
Some(openai_filter.as_slice()),
"openai",
None,
)
.await?;
assert_eq!(openai_sessions.items.len(), 2);
let openai_ids: Vec<_> = openai_sessions
.items
.iter()
.filter_map(|item| {
item.head
.first()
.and_then(|value| value.get("id"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
})
.collect();
assert!(openai_ids.contains(&openai_id_str));
assert!(openai_ids.contains(&none_id_str));
let beta_filter = provider_vec(&["beta"]);
let beta_sessions = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
Some(beta_filter.as_slice()),
"openai",
None,
)
.await?;
assert_eq!(beta_sessions.items.len(), 1);
let beta_id_str = beta_id.to_string();
let beta_head = beta_sessions
.items
.first()
.and_then(|item| item.head.first())
.and_then(|value| value.get("id"))
.and_then(serde_json::Value::as_str);
assert_eq!(beta_head, Some(beta_id_str.as_str()));
let unknown_filter = provider_vec(&["unknown"]);
let unknown_sessions = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
Some(unknown_filter.as_slice()),
"openai",
None,
)
.await?;
assert!(unknown_sessions.items.is_empty());
let all_sessions = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
None,
"openai",
None,
)
.await?;
assert_eq!(all_sessions.items.len(), 3);
Ok(())
}
#[tokio::test]
async fn test_cwd_filter_selects_sessions_across_providers() -> Result<()> {
let temp = TempDir::new().unwrap();
let home = temp.path();
let current_cwd = temp.path().join("project");
let other_cwd = temp.path().join("other");
fs::create_dir_all(¤t_cwd)?;
fs::create_dir_all(&other_cwd)?;
let current_provider_id = Uuid::from_u128(10);
let other_provider_id = Uuid::from_u128(11);
let other_cwd_id = Uuid::from_u128(12);
write_session_file_with_cwds(
home,
"2025-10-01T10-00-00",
current_provider_id,
current_cwd.as_path(),
None,
Some("openai"),
)?;
write_session_file_with_cwds(
home,
"2025-10-01T11-00-00",
other_provider_id,
current_cwd.as_path(),
None,
Some("anthropic"),
)?;
write_session_file_with_cwds(
home,
"2025-10-01T12-00-00",
other_cwd_id,
other_cwd.as_path(),
None,
Some("anthropic"),
)?;
let page = get_threads(
home,
10,
None,
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
None,
"openai",
Some(current_cwd.as_path()),
)
.await?;
let ids: Vec<_> = page
.items
.iter()
.filter_map(|item| {
item.head
.first()
.and_then(|value| value.get("id"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
})
.collect();
assert_eq!(
ids,
vec![
other_provider_id.to_string(),
current_provider_id.to_string()
]
);
assert!(
page.items
.iter()
.all(|item| item.cwd.as_deref() == Some(current_cwd.as_path()))
);
Ok(())
}
#[tokio::test]
async fn test_effective_thread_cwd_prefers_latest_turn_context() -> Result<()> {
let temp = TempDir::new().unwrap();
let home = temp.path();
let session_cwd = temp.path().join("session");
let latest_cwd = temp.path().join("latest");
fs::create_dir_all(&session_cwd)?;
fs::create_dir_all(&latest_cwd)?;
let uuid = Uuid::from_u128(20);
let (_dt, _uuid) = write_session_file_with_cwds(
home,
"2025-10-02T10-00-00",
uuid,
session_cwd.as_path(),
Some(latest_cwd.as_path()),
Some("openai"),
)?;
let rollout_path = home
.join("sessions")
.join("2025")
.join("10")
.join("02")
.join(format!("rollout-2025-10-02T10-00-00-{uuid}.jsonl"));
let cwd = read_effective_thread_cwd(&rollout_path).await?;
assert_eq!(cwd, Some(latest_cwd));
Ok(())
}