use anyhow::Result;
use rusqlite::Connection;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use super::utils::diff_line_counts;
use crate::db;
use crate::ingest_progress::IngestProgressObserver;
/// Lists the Codex session transcripts that are candidates for ingestion.
///
/// Walks the directory returned by `codex_sessions_dir` and returns every
/// `.jsonl` file found beneath it, in sorted path order. A sessions directory
/// that does not exist yields an empty list rather than an error.
///
/// # Errors
///
/// Fails only when the sessions directory cannot be determined.
pub fn plan_session_files() -> Result<Vec<PathBuf>> {
    let root = codex_sessions_dir()?;
    let mut planned = if root.exists() {
        find_jsonl_files(&root)
    } else {
        Vec::new()
    };
    // Sorting gives callers a stable processing order.
    planned.sort();
    Ok(planned)
}
/// Ingests each of `session_files` in order and returns the total number of
/// rows written.
///
/// A file that fails to ingest is reported on stderr and skipped; it never
/// aborts the batch. When `verbose` is set, a per-file status line is printed
/// to stderr. The optional `progress` observer is advanced once per file,
/// whether or not the file produced rows.
pub fn ingest_planned_sessions(
    db: &Connection,
    session_files: &[PathBuf],
    verbose: bool,
    mut progress: Option<&mut dyn IngestProgressObserver>,
) -> Result<usize> {
    let mut total_rows = 0usize;

    for session_file in session_files {
        if verbose {
            eprint!(" {:?} ... ", session_file);
        }

        match ingest_session(session_file, db) {
            Err(err) => eprintln!("Warning: skipping {:?}: {}", session_file, err),
            Ok(rows) => {
                if verbose {
                    if rows == 0 {
                        eprintln!("skipped (already ingested or empty)");
                    } else {
                        eprintln!("wrote {} rows", rows);
                    }
                }
                total_rows += rows;
            }
        }

        // Progress is reported for every file, including skipped and failed ones.
        if let Some(observer) = progress.as_mut() {
            observer.advance(&session_file.to_string_lossy());
        }
    }

    Ok(total_rows)
}
/// Returns `<home>/.codex/sessions`, the directory scanned for session files.
///
/// # Errors
///
/// Fails when the user's home directory cannot be resolved.
fn codex_sessions_dir() -> Result<PathBuf> {
    match dirs::home_dir() {
        Some(mut dir) => {
            dir.push(".codex");
            dir.push("sessions");
            Ok(dir)
        }
        None => Err(anyhow::anyhow!("Home directory not found")),
    }
}
/// Recursively collects every file with a `.jsonl` extension under `root`.
///
/// Directories that cannot be read (including a missing `root`) and entries
/// that fail to enumerate are skipped silently: discovery is best-effort. The
/// order of the returned paths follows directory enumeration order and is not
/// sorted.
///
/// Takes `&Path` so callers may pass either a `&Path` or a `&PathBuf`.
fn find_jsonl_files(root: &std::path::Path) -> Vec<PathBuf> {
    /// Depth-first walk that appends matches to a single shared vector, so no
    /// per-directory temporary vectors are built and copied.
    fn collect(dir: &std::path::Path, found: &mut Vec<PathBuf>) {
        if let Ok(entries) = std::fs::read_dir(dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.is_dir() {
                    collect(&path, found);
                } else if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
                    found.push(path);
                }
            }
        }
    }

    let mut files = Vec::new();
    collect(root, &mut files);
    files
}
/// One record of a session `.jsonl` file, as deserialized by `ingest_session`.
///
/// Only the envelope is typed here; `payload` stays raw JSON and is
/// interpreted according to `kind`.
#[derive(Deserialize)]
struct Line {
/// Record discriminator, read from the JSON key `type`
/// (e.g. `session_meta`, `turn_context`, `event_msg`, `response_item`).
#[serde(rename = "type")]
kind: String,
/// Timestamp attached to the record, when present.
timestamp: Option<String>,
/// Record body; its shape depends on `kind`.
payload: Value,
}
/// Payload of the `session_meta` record that must be the first line of a
/// session file for it to be ingested.
#[derive(Deserialize)]
struct SessionMetaPayload {
/// Session identifier; used as the session key for every database call.
id: String,
/// Session start time; preferred over the enclosing record's timestamp.
timestamp: Option<String>,
/// Working directory recorded for the session, passed through to the database.
cwd: Option<String>,
/// Model provider recorded for the session, passed through to the database.
model_provider: Option<String>,
}
/// Ingests a single Codex session transcript into the database.
///
/// The file is read line by line as JSON records. The first record must be a
/// `session_meta` record carrying the session id; otherwise the file is
/// skipped. Subsequent records contribute:
///
/// * `turn_context` — the session model (first one that names a model wins);
/// * `event_msg` — the latest token usage and user messages;
/// * `response_item` — assistant messages, `cat` file reads (cached so later
///   writes can be diffed against them) and tool calls that change code.
///
/// Messages and code changes are only written when the session is not yet in
/// the database; usage is only written when no usage row exists yet.
///
/// Returns the number of rows written, or `Ok(0)` when the file is empty,
/// malformed at its header, or already fully ingested.
///
/// # Errors
///
/// Propagates I/O errors from opening or reading the file and any error
/// returned by the database layer. Unparseable lines after the header are
/// skipped, not reported.
fn ingest_session(path: &PathBuf, db: &Connection) -> Result<usize> {
    let mut records = BufReader::new(std::fs::File::open(path)?).lines();

    // --- Header: the first line must be a well-formed `session_meta` record.
    let header_text = match records.next() {
        Some(Ok(text)) if !text.trim().is_empty() => text,
        _ => return Ok(0),
    };
    let header = match serde_json::from_str::<Line>(&header_text) {
        Ok(parsed) if parsed.kind == "session_meta" => parsed,
        _ => return Ok(0),
    };
    let header_ts = header.timestamp;
    let meta = match serde_json::from_value::<SessionMetaPayload>(header.payload) {
        Ok(parsed) => parsed,
        Err(_) => return Ok(0),
    };

    let session_id = &meta.id;
    let source_path = path.to_string_lossy().to_string();

    // --- Skip sessions for which both the session and its usage are stored.
    let session_known = db::session_exists(db, session_id)?;
    let usage_known = db::session_usage_exists(db, "codex", session_id)?;
    if session_known && usage_known {
        return Ok(0);
    }

    // The payload's own timestamp wins over the enclosing record's timestamp.
    let started_at = meta.timestamp.clone().or(header_ts);

    // call_id -> file path for `cat <file>` calls whose output is still pending.
    let mut awaiting_output: HashMap<String, String> = HashMap::new();
    // file path -> last content seen through a `cat`, used to diff later writes.
    let mut file_cache: HashMap<String, String> = HashMap::new();
    let mut session_model: Option<String> = None;
    let mut latest_usage: Option<CodexTokenUsage> = None;

    db::begin_session_with_model(
        db,
        "codex",
        session_id,
        meta.cwd.as_deref(),
        started_at.as_deref(),
        started_at.as_deref(),
        meta.model_provider.as_deref(),
        None,
    )?;

    // The session row itself counts as one written row when it is new.
    let mut written = usize::from(!session_known);

    for next in records {
        let raw = next?;
        if raw.trim().is_empty() {
            continue;
        }
        let record = match serde_json::from_str::<Line>(&raw) {
            Ok(parsed) => parsed,
            Err(_) => continue,
        };

        // Records without their own timestamp inherit the session start.
        let ts = record.timestamp.clone().or_else(|| started_at.clone());
        let payload = &record.payload;
        let payload_type = payload.get("type").and_then(Value::as_str);

        match record.kind.as_str() {
            // Only the first turn context that names a model is recorded.
            "turn_context" if session_model.is_none() => {
                if let Some(model) = payload.get("model").and_then(Value::as_str) {
                    session_model = Some(model.to_owned());
                    db::upsert_metadata_session_with_model(
                        db,
                        "codex",
                        session_id,
                        meta.cwd.as_deref(),
                        started_at.as_deref(),
                        ts.as_deref().or(started_at.as_deref()),
                        Some(&source_path),
                        meta.model_provider.as_deref(),
                        session_model.as_deref(),
                    )?;
                }
            }
            "event_msg" => match payload_type {
                // Keep only the most recent usage snapshot; it is written once
                // after the whole file has been read.
                Some("token_count") if !usage_known => {
                    if let Some(usage) = parse_codex_token_usage(payload) {
                        latest_usage = Some(usage);
                    }
                }
                Some("user_message") if !session_known => {
                    let message = payload.get("message").and_then(Value::as_str);
                    if let Some(content) = message.map(str::to_string) {
                        let words = content.split_whitespace().count();
                        if words > 0 {
                            db::ingest_session_message(
                                db,
                                "codex",
                                session_id,
                                "user",
                                &content,
                                words as i64,
                                ts.as_deref(),
                            )?;
                            written += 1;
                        }
                    }
                }
                _ => {}
            },
            // Response items are ignored entirely for already-stored sessions.
            "response_item" if !session_known => {
                if payload.get("role").and_then(Value::as_str) == Some("assistant") {
                    if let Some(parts) = payload.get("content").and_then(Value::as_array) {
                        let text = assistant_output_text(parts);
                        let words = text.split_whitespace().count();
                        if words > 0 {
                            db::ingest_session_message(
                                db,
                                "codex",
                                session_id,
                                "assistant",
                                &text,
                                words as i64,
                                ts.as_deref(),
                            )?;
                            written += 1;
                        }
                    }
                }

                match payload_type {
                    // Output of an earlier `cat <file>`: remember the content.
                    Some("function_call_output") => {
                        let call_id = payload
                            .get("call_id")
                            .and_then(Value::as_str)
                            .unwrap_or("");
                        if let Some(file_path) = awaiting_output.remove(call_id) {
                            let output = payload
                                .get("output")
                                .and_then(Value::as_str)
                                .unwrap_or("");
                            // Drop everything up to and including the
                            // "Output:" header line when one is present.
                            let content = match output.split_once("\nOutput:\n") {
                                Some((_, body)) => body,
                                None => output,
                            };
                            file_cache.insert(file_path, content.to_string());
                        }
                    }
                    Some("function_call") => {
                        if payload.get("name").and_then(Value::as_str) == Some("exec_command") {
                            if let Some(cmd) = exec_command_text(payload) {
                                if let Some(target) = cat_read_target(&cmd) {
                                    let call_id = payload
                                        .get("call_id")
                                        .and_then(Value::as_str)
                                        .unwrap_or("");
                                    if !call_id.is_empty() {
                                        awaiting_output
                                            .insert(call_id.to_string(), target.to_string());
                                    }
                                }
                            }
                        }
                        written += parse_tool_call(
                            payload,
                            db,
                            session_id,
                            ts.as_deref(),
                            &file_cache,
                        )?;
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }

    if !usage_known {
        if let Some(usage) = latest_usage {
            db::ingest_session_usage(
                db,
                "codex",
                session_id,
                started_at.as_deref(),
                session_model.as_deref(),
                usage.input_tokens,
                usage.cached_input_tokens,
                0,
                usage.output_tokens,
                usage.reasoning_output_tokens,
                usage.total_tokens,
                None,
                "estimated_from_tokens",
            )?;
            written += 1;
        }
    }

    Ok(written)
}

/// Concatenates the `text` of every `output_text` item of an assistant
/// `content` array, without any separator.
fn assistant_output_text(items: &[Value]) -> String {
    items
        .iter()
        .filter(|item| item.get("type").and_then(Value::as_str) == Some("output_text"))
        .filter_map(|item| item.get("text").and_then(Value::as_str))
        .collect()
}

/// Extracts the `cmd` string from a function call's `arguments`, which may be
/// either a JSON-encoded string or an inline JSON value.
fn exec_command_text(payload: &Value) -> Option<String> {
    let arguments = match payload.get("arguments")? {
        Value::String(text) => text.clone(),
        other => other.to_string(),
    };
    let parsed: Value = serde_json::from_str(&arguments).ok()?;
    parsed.get("cmd").and_then(Value::as_str).map(String::from)
}

/// Returns the argument of a plain `cat <file>` command, i.e. one that
/// contains no `>` redirection; `None` for anything else.
fn cat_read_target(cmd: &str) -> Option<&str> {
    let rest = cmd.trim().strip_prefix("cat ")?;
    if rest.contains('>') {
        return None;
    }
    let target = rest.trim();
    if target.is_empty() {
        None
    } else {
        Some(target)
    }
}
/// Token counters extracted from a `token_count` event by
/// `parse_codex_token_usage`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CodexTokenUsage {
/// Input tokens with the cached portion already subtracted.
input_tokens: i64,
/// Input tokens reported as cached.
cached_input_tokens: i64,
/// Output tokens as reported.
output_tokens: i64,
/// Reasoning output tokens as reported.
reasoning_output_tokens: i64,
/// Reported total, or the sum of raw input, output and reasoning tokens
/// when the event carries no total.
total_tokens: i64,
}
/// Extracts token counters from the payload of a `token_count` event.
///
/// Reads `info.total_token_usage`, falling back to `info.last_token_usage`
/// when the former is absent; returns `None` when neither exists. Missing or
/// non-integer counters default to `0`. The returned `input_tokens` excludes
/// cached input tokens, and `total_tokens` falls back to the sum of raw input,
/// output and reasoning tokens when the event does not report a total.
fn parse_codex_token_usage(payload: &Value) -> Option<CodexTokenUsage> {
    let info = payload.get("info")?;
    let usage = match info.get("total_token_usage") {
        Some(cumulative) => cumulative,
        None => info.get("last_token_usage")?,
    };

    let counter = |key: &str| usage.get(key).and_then(Value::as_i64);

    let raw_input = counter("input_tokens").unwrap_or(0);
    let cached = counter("cached_input_tokens").unwrap_or(0);
    let output = counter("output_tokens").unwrap_or(0);
    let reasoning = counter("reasoning_output_tokens").unwrap_or(0);
    let summed_total = raw_input + output + reasoning;

    Some(CodexTokenUsage {
        // Cached tokens are reported separately, so remove them from the input.
        input_tokens: raw_input.saturating_sub(cached),
        cached_input_tokens: cached,
        output_tokens: output,
        reasoning_output_tokens: reasoning,
        total_tokens: counter("total_tokens").unwrap_or(summed_total),
    })
}
/// Records the code changes implied by one tool call and returns how many
/// change rows were written.
///
/// Two tools are recognised by the call's `name`:
///
/// * `exec_command` — when its `cmd` is a heredoc file write understood by
///   `parse_exec_cmd_write`, one change row is written;
/// * `apply_patch` — one change row is written per file found in the patch by
///   `parse_unified_diff`. The patch text is the `patch` field of the decoded
///   arguments, or the raw arguments string when that field is unavailable.
///
/// Any other tool, or a call without `arguments`, writes nothing.
///
/// # Errors
///
/// Propagates errors from the database layer.
fn parse_tool_call(
    value: &Value,
    db: &Connection,
    session_id: &str,
    timestamp: Option<&str>,
    file_cache: &HashMap<String, String>,
) -> Result<usize> {
    let tool = value.get("name").and_then(Value::as_str).unwrap_or("");
    if tool != "exec_command" && tool != "apply_patch" {
        return Ok(0);
    }

    // `arguments` is either a JSON-encoded string or an inline JSON value.
    let Some(raw_arguments) = value.get("arguments") else {
        return Ok(0);
    };
    let arguments = match raw_arguments {
        Value::String(text) => text.clone(),
        other => other.to_string(),
    };
    let decoded = serde_json::from_str::<Value>(&arguments).ok();

    if tool == "exec_command" {
        let change = decoded
            .as_ref()
            .and_then(|obj| obj.get("cmd"))
            .and_then(Value::as_str)
            .and_then(|cmd| parse_exec_cmd_write(cmd, file_cache));
        return match change {
            Some((file_path, lines_added, lines_removed)) => {
                db::ingest_accepted_code_change(
                    db,
                    "codex",
                    session_id,
                    &file_path,
                    lines_added,
                    lines_removed,
                    timestamp,
                )?;
                Ok(1)
            }
            None => Ok(0),
        };
    }

    // apply_patch: prefer the decoded `patch` field, else treat the whole
    // arguments string as the patch text.
    let patch = decoded
        .as_ref()
        .and_then(|obj| obj.get("patch"))
        .and_then(Value::as_str)
        .unwrap_or(&arguments);

    let changes = parse_unified_diff(patch);
    for (file_path, added, removed) in &changes {
        db::ingest_accepted_code_change(
            db, "codex", session_id, file_path, *added, *removed, timestamp,
        )?;
    }
    Ok(changes.len())
}
/// Recognises a shell command that writes a file through `cat` and a heredoc,
/// and returns `(file_path, lines_added, lines_removed)` for it.
///
/// Accepted shapes (first line of the command):
///
/// * `cat > path <<EOF` / `cat >> path <<'EOF'` — redirect before the heredoc;
/// * `cat <<'EOF' > path` / `cat <<EOF >> path` — redirect after the delimiter;
/// * `<<-EOF`, where leading tabs are allowed before the terminator line.
///
/// The heredoc body runs from the second line up to the first line that equals
/// the delimiter; when no terminator is found the whole remainder is the body.
///
/// Line counts:
///
/// * overwrite (`>`) of a file present in `file_cache`: the result of
///   `diff_line_counts(cached, body)`;
/// * append (`>>`), or a file not in `file_cache`: every body line counts as
///   added and nothing as removed.
///
/// Returns `None` for anything that is not such a write (no `cat` prefix, no
/// heredoc on the first line, no redirect target, or an empty delimiter).
fn parse_exec_cmd_write(
    cmd: &str,
    file_cache: &HashMap<String, String>,
) -> Option<(String, i64, i64)> {
    let after_cat = cmd.trim().strip_prefix("cat ")?.trim_start();

    // The first line holds the redirect and the heredoc operator; everything
    // after the first newline is the heredoc body plus its terminator.
    let (header, body) = after_cat.split_once('\n')?;
    let (before_heredoc, after_heredoc) = header.split_once("<<")?;

    // `<<-` strips leading tabs from body lines, including the terminator.
    let (after_heredoc, strip_tabs) = match after_heredoc.strip_prefix('-') {
        Some(rest) => (rest, true),
        None => (after_heredoc, false),
    };

    // The delimiter word ends at whitespace or at a redirect operator.
    let after_heredoc = after_heredoc.trim_start();
    let word_end = after_heredoc
        .find(|c: char| c.is_whitespace() || c == '>')
        .unwrap_or(after_heredoc.len());
    let (word, trailing) = after_heredoc.split_at(word_end);
    let marker = word.trim_matches('\'').trim_matches('"');
    if marker.is_empty() {
        return None;
    }

    // The redirect may sit before the heredoc operator or after the delimiter.
    let before_heredoc = before_heredoc.trim();
    let redirect = if before_heredoc.is_empty() {
        trailing.trim()
    } else {
        before_heredoc
    };
    let (file_path, appending) = if let Some(path) = redirect.strip_prefix(">>") {
        (path.trim(), true)
    } else if let Some(path) = redirect.strip_prefix('>') {
        (path.trim(), false)
    } else {
        return None;
    };
    if file_path.is_empty() {
        return None;
    }

    // The body ends at the first line that is exactly the delimiter. Matching
    // whole lines (instead of a prefix) also handles an empty heredoc, whose
    // body starts directly with the terminator.
    let mut content_end = body.len();
    let mut offset = 0;
    for line in body.split_inclusive('\n') {
        let text = line.trim_end_matches(|c| c == '\n' || c == '\r');
        let text = if strip_tabs {
            text.trim_start_matches('\t')
        } else {
            text
        };
        if text == marker {
            content_end = offset;
            break;
        }
        offset += line.len();
    }
    let content = &body[..content_end];
    // Drop the newline that separates the last body line from the terminator.
    let content = content.strip_suffix('\n').unwrap_or(content);

    let (lines_added, lines_removed) = match file_cache.get(file_path) {
        // Only an overwrite replaces the previously seen content.
        Some(before) if !appending => diff_line_counts(before, content),
        // Appends and unseen files can only add lines.
        _ => (content.lines().count() as i64, 0),
    };
    Some((file_path.to_string(), lines_added, lines_removed))
}
/// Summarises a unified diff as one `(file_path, lines_added, lines_removed)`
/// entry per file, in the order the files appear.
///
/// A file section starts at its `+++ ` header. The path is taken from that
/// header with a leading `b/` removed, except for deleted files
/// (`+++ /dev/null`), which are reported under the path of the immediately
/// preceding `--- ` header (leading `a/` removed). A `\t<timestamp>` suffix on
/// either header is ignored.
///
/// Within a section, every line starting with `+` counts as added and every
/// line starting with `-` as removed; lines starting with `+++` or `---` are
/// treated as header-like and never counted. Changes that appear before the
/// first `+++ ` header belong to no file and are dropped.
fn parse_unified_diff(diff: &str) -> Vec<(String, i64, i64)> {
    /// Extracts the path from the text after `--- ` / `+++ `: drops a
    /// tab-separated timestamp, then the given git prefix, then whitespace.
    fn header_path<'a>(rest: &'a str, git_prefix: &str) -> &'a str {
        let path = rest.split('\t').next().unwrap_or(rest);
        path.strip_prefix(git_prefix).unwrap_or(path).trim()
    }

    let mut results = Vec::new();
    let mut current: Option<(String, i64, i64)> = None;
    // Path from a `--- ` header; only valid for the line that follows it.
    let mut pending_old: Option<&str> = None;

    for line in diff.lines() {
        let old_path = pending_old.take();

        if let Some(rest) = line.strip_prefix("+++ ") {
            results.extend(current.take());
            let new_path = header_path(rest, "b/");
            let path = match old_path {
                // Deleted file: the new side is /dev/null, so name the old one.
                Some(old) if new_path == "/dev/null" => old,
                _ => new_path,
            };
            current = Some((path.to_string(), 0, 0));
        } else if let Some(rest) = line.strip_prefix("--- ") {
            pending_old = Some(header_path(rest, "a/"));
        } else if line.starts_with("+++") || line.starts_with("---") {
            // Header-like line: neither an addition nor a removal.
        } else if let Some((_, added, removed)) = current.as_mut() {
            if line.starts_with('+') {
                *added += 1;
            } else if line.starts_with('-') {
                *removed += 1;
            }
        }
    }

    results.extend(current);
    results
}
// Unit tests for the pure parsing helpers of this module.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// When both usage objects are present, `total_token_usage` must be used and
// `last_token_usage` ignored; `input_tokens` is reported net of cached input
// (30000 - 20000 = 10000).
#[test]
fn parses_token_count_payload() {
let payload = json!({
"type": "token_count",
"info": {
"total_token_usage": {
"input_tokens": 30000,
"cached_input_tokens": 20000,
"output_tokens": 900,
"reasoning_output_tokens": 120,
"total_tokens": 30900
},
"last_token_usage": {
"input_tokens": 18749,
"cached_input_tokens": 12672,
"output_tokens": 379,
"reasoning_output_tokens": 12,
"total_tokens": 19140
}
}
});
let usage = parse_codex_token_usage(&payload).expect("usage should parse");
assert_eq!(
usage,
CodexTokenUsage {
input_tokens: 10000,
cached_input_tokens: 20000,
output_tokens: 900,
reasoning_output_tokens: 120,
total_tokens: 30900,
}
);
}
}