use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use clap::Subcommand;
use indicatif::{ProgressBar, ProgressStyle};
use netsky_db::{Db, TokenUsageRecord};
use serde_json::{Value, json};
/// Number of buffered token-usage rows written per `flush` call; the final
/// flush of an ingest may carry fewer.
const BATCH_ROWS: usize = 500;
// Subcommands of `ingest`. The `///` comments on the variant and its fields
// are rendered by clap as `--help` text.
#[derive(Subcommand, Debug)]
pub enum IngestCommand {
    /// Ingest token usage from Claude session JSONL files
    ClaudeSessions {
        /// Directory scanned recursively for `*.jsonl` files (default: ~/.claude/projects)
        #[arg(long, value_name = "DIR")]
        root: Option<PathBuf>,
        /// Print a JSON envelope instead of a one-line text summary
        #[arg(long)]
        json: bool,
    },
}
/// Counters reported by [`ingest_claude_sessions`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct IngestStats {
    /// Number of `*.jsonl` files found under the root, including files that
    /// yielded no usage rows.
    pub files: usize,
    /// Number of lines that produced a token-usage row.
    pub turns: usize,
    /// Wall-clock duration of the ingest; stays zero when the root is missing.
    pub elapsed: Duration,
}
pub fn run(cmd: IngestCommand) -> netsky_core::Result<()> {
match cmd {
IngestCommand::ClaudeSessions { root, json } => {
let root = root.map_or_else(default_claude_projects_root, Ok)?;
let db = super::db_diag::with_lock_retry(Db::open)
.map_err(super::db_diag::wrap_open_retry_error)?;
super::db_diag::with_lock_retry(|| db.migrate())
.map_err(super::db_diag::wrap_open_retry_error)?;
let stats = super::db_diag::with_lock_retry(|| ingest_claude_sessions(&db, &root))
.map_err(super::db_diag::wrap_retry_error)?;
let secs = stats.elapsed.as_secs_f64().max(0.001);
let rows_per_sec = stats.turns as f64 / secs;
if json {
let envelope = json!({
"command": "ingest claude-sessions",
"status": "green",
"summary": format!(
"ingested {} turns from {} files in {:.2}s",
stats.turns, stats.files, secs
),
"generated_at": chrono::Utc::now().to_rfc3339(),
"data": {
"turns": stats.turns,
"files": stats.files,
"root": root.display().to_string(),
"elapsed_secs": secs,
"rows_per_sec": rows_per_sec,
},
});
println!(
"{}",
serde_json::to_string_pretty(&envelope).unwrap_or_default()
);
} else {
println!(
"ingested {turns} claude session turns from {files} files under {root} \
in {secs:.2}s ({rps:.0} rows/s)",
turns = stats.turns,
files = stats.files,
root = root.display(),
secs = secs,
rps = rows_per_sec,
);
}
Ok(())
}
}
}
pub fn ingest_claude_sessions(db: &Db, root: &Path) -> netsky_core::Result<IngestStats> {
let mut stats = IngestStats::default();
if !root.exists() {
return Ok(stats);
}
let started = Instant::now();
let files = collect_jsonl(root)?;
stats.files = files.len();
let total_bytes: u64 = files
.iter()
.filter_map(|path| fs::metadata(path).ok().map(|m| m.len()))
.sum();
let pb = make_progress_bar(total_bytes);
let mut pending: Vec<OwnedTokenRecord> = Vec::with_capacity(BATCH_ROWS);
for path in &files {
let bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
let turns = read_file_into(path, &mut pending)?;
stats.turns += turns;
while pending.len() >= BATCH_ROWS {
let chunk: Vec<_> = pending.drain(..BATCH_ROWS).collect();
flush(db, &chunk)?;
}
pb.inc(bytes);
pb.set_message(format!("{} turns", stats.turns));
}
if !pending.is_empty() {
flush(db, &pending)?;
pending.clear();
}
stats.elapsed = started.elapsed();
pb.finish_with_message(format!(
"{} turns in {:.1}s",
stats.turns,
stats.elapsed.as_secs_f64()
));
Ok(stats)
}
/// Creates a byte-denominated progress bar of length `total_bytes`.
///
/// The bar shows a spinner, elapsed time, byte throughput, ETA and a free-form
/// message, and ticks every 120 ms even when no progress is reported.
fn make_progress_bar(total_bytes: u64) -> ProgressBar {
    const TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] [{bar:32.cyan/blue}] \
         {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta}) {msg}";

    // An unusable template falls back to indicatif's default bar instead of
    // failing the ingest.
    let base = match ProgressStyle::with_template(TEMPLATE) {
        Ok(custom) => custom,
        Err(_) => ProgressStyle::default_bar(),
    };
    let style = base.progress_chars("=>-");

    let bar = ProgressBar::new(total_bytes);
    bar.set_style(style);
    bar.enable_steady_tick(Duration::from_millis(120));
    bar
}
/// Returns every `*.jsonl` file found recursively under `root`, sorted by path
/// so callers process files in a deterministic order.
fn collect_jsonl(root: &Path) -> netsky_core::Result<Vec<PathBuf>> {
    let mut found: Vec<PathBuf> = Vec::new();
    walk_jsonl(root, &mut found)?;
    found.sort();
    Ok(found)
}
fn walk_jsonl(dir: &Path, out: &mut Vec<PathBuf>) -> netsky_core::Result<()> {
let entries = fs::read_dir(dir)?.collect::<Result<Vec<_>, _>>()?;
for entry in entries {
let path = entry.path();
if path.is_dir() {
walk_jsonl(&path, out)?;
} else if path.extension().is_some_and(|ext| ext == "jsonl") {
out.push(path);
}
}
Ok(())
}
/// Writes one batch of buffered rows to `db`.
///
/// An empty batch is a no-op. A database error is wrapped with the batch size.
fn flush(db: &Db, batch: &[OwnedTokenRecord]) -> netsky_core::Result<()> {
    let rows = batch.len();
    if rows == 0 {
        return Ok(());
    }
    let mut records: Vec<TokenUsageRecord<'_>> = Vec::with_capacity(rows);
    records.extend(batch.iter().map(|owned| owned.as_db_record()));
    db.record_token_usage_batch(records)
        .map_err(|e| netsky_core::anyhow!("record token usage batch ({rows}): {e}"))?;
    Ok(())
}
/// Default ingest root: `<home>/.claude/projects`.
///
/// # Errors
/// Fails when the home directory cannot be determined.
fn default_claude_projects_root() -> netsky_core::Result<PathBuf> {
    let projects = dirs::home_dir()
        .map(|home| home.join(".claude").join("projects"))
        .ok_or_else(|| netsky_core::anyhow!("home dir missing"))?;
    Ok(projects)
}
/// Parses one JSONL session file and appends a row to `pending` for every
/// line that carries usage counters (see `token_record`).
///
/// Whitespace-only lines are skipped. Returns the number of rows appended by
/// this call.
///
/// # Errors
/// Opening the file, reading a line, or parsing a non-blank line as JSON
/// aborts with an error naming the file. Per-line failures also give the
/// 1-based line number.
fn read_file_into(path: &Path, pending: &mut Vec<OwnedTokenRecord>) -> netsky_core::Result<usize> {
    let file = fs::File::open(path)
        .map_err(|e| netsky_core::anyhow!("open {}: {e}", path.display()))?;
    let mut turns = 0;
    for (idx, line) in BufReader::new(file).lines().enumerate() {
        let line_no = idx + 1;
        // `lines()` also reports invalid UTF-8 as an I/O error. Attach the
        // location so it is as actionable as a JSON parse failure.
        let line = line
            .map_err(|e| netsky_core::anyhow!("read {}:{line_no}: {e}", path.display()))?;
        if line.trim().is_empty() {
            continue;
        }
        let value: Value = serde_json::from_str(&line).map_err(|e| {
            netsky_core::anyhow!("parse {}:{} as JSONL: {e}", path.display(), line_no)
        })?;
        if let Some(record) = token_record(&value, path, line_no) {
            pending.push(record);
            turns += 1;
        }
    }
    Ok(turns)
}
/// Owned copy of one token-usage row, buffered until the next batch write.
///
/// `flush` borrows these through [`OwnedTokenRecord::as_db_record`] to build
/// the `TokenUsageRecord` values passed to the database.
struct OwnedTokenRecord {
    /// Event time as derived by `timestamp`.
    ts_utc: DateTime<Utc>,
    session_id: Option<String>,
    agent: Option<String>,
    model: Option<String>,
    input_tokens: Option<i64>,
    output_tokens: Option<i64>,
    /// Sum of the cache-related counters as computed by `cached_tokens`.
    cached_input_tokens: Option<i64>,
    /// Serialized provenance object (`source`, `path`, `line`).
    detail_json: String,
}

impl OwnedTokenRecord {
    /// Borrows this record as a database row. The runtime is always
    /// `"claude"`, and no cost is recorded.
    fn as_db_record(&self) -> TokenUsageRecord<'_> {
        let Self {
            ts_utc,
            session_id,
            agent,
            model,
            input_tokens,
            output_tokens,
            cached_input_tokens,
            detail_json,
        } = self;
        TokenUsageRecord {
            ts_utc: *ts_utc,
            session_id: session_id.as_deref(),
            agent: agent.as_deref(),
            runtime: Some("claude"),
            model: model.as_deref(),
            input_tokens: *input_tokens,
            output_tokens: *output_tokens,
            cached_input_tokens: *cached_input_tokens,
            cost_usd_micros: None,
            detail_json: Some(detail_json),
        }
    }
}
fn token_record(value: &Value, path: &Path, line: usize) -> Option<OwnedTokenRecord> {
let usage = value
.get("usage")
.or_else(|| value.pointer("/message/usage"))?;
let input_tokens = int_field(usage, "input_tokens");
let output_tokens = int_field(usage, "output_tokens");
let cached_input_tokens = cached_tokens(usage);
if input_tokens.is_none() && output_tokens.is_none() && cached_input_tokens.is_none() {
return None;
}
let detail_json = serde_json::json!({
"source": "claude-sessions",
"path": path.display().to_string(),
"line": line,
})
.to_string();
Some(OwnedTokenRecord {
ts_utc: timestamp(value),
session_id: string_field(value, "session_id").or_else(|| string_field(value, "sessionId")),
agent: string_field(value, "agent").or_else(|| string_field(value, "agent_id")),
model: string_field(value, "model").or_else(|| {
value
.pointer("/message/model")?
.as_str()
.map(str::to_string)
}),
input_tokens,
output_tokens,
cached_input_tokens,
detail_json,
})
}
fn int_field(value: &Value, key: &str) -> Option<i64> {
value.get(key)?.as_i64()
}
fn string_field(value: &Value, key: &str) -> Option<String> {
value.get(key)?.as_str().map(str::to_string)
}
/// Totals the cache-related counters in a usage object.
///
/// Adds whichever of `cached_input_tokens`, `cache_creation_input_tokens` and
/// `cache_read_input_tokens` are present as integers. Returns `None` unless
/// the total is positive, so an explicit zero is indistinguishable from
/// "absent".
fn cached_tokens(usage: &Value) -> Option<i64> {
    const CACHE_KEYS: [&str; 3] = [
        "cached_input_tokens",
        "cache_creation_input_tokens",
        "cache_read_input_tokens",
    ];
    let mut total: i64 = 0;
    for key in CACHE_KEYS {
        if let Some(count) = int_field(usage, key) {
            total += count;
        }
    }
    if total > 0 {
        Some(total)
    } else {
        None
    }
}
/// Best-effort event time for a session line.
///
/// Checks `ts_utc`, `timestamp`, then `created_at`, and uses the first one
/// that is a string parseable as RFC 3339, converted to UTC. Non-string and
/// unparseable candidates are both skipped. Falls back to the current time
/// when no candidate qualifies, which makes such rows' timestamps depend on
/// when ingestion ran.
fn timestamp(value: &Value) -> DateTime<Utc> {
    ["ts_utc", "timestamp", "created_at"]
        .into_iter()
        .filter_map(|key| value.get(key)?.as_str())
        // Keep searching past a malformed string rather than stopping at it.
        .find_map(|raw| DateTime::parse_from_rfc3339(raw).ok())
        .map_or_else(Utc::now, |ts| ts.with_timezone(&Utc))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `<tmp>/.claude/projects`: the root handed to the ingester.
    fn projects_root(tmp: &Path) -> PathBuf {
        tmp.join(".claude").join("projects")
    }

    /// Creates `<tmp>/.claude/projects/repo` and returns it.
    fn make_repo_dir(tmp: &Path) -> PathBuf {
        let repo = projects_root(tmp).join("repo");
        fs::create_dir_all(&repo).expect("projects");
        repo
    }

    /// Opens a fresh `meta.db` under `tmp` with the schema applied.
    fn open_migrated_db(tmp: &Path) -> Db {
        let db = Db::open_path(tmp.join("meta.db")).expect("db");
        db.migrate().expect("migrate");
        db
    }

    #[test]
    fn claude_sessions_ingest_records_two_usage_turns() {
        let dir = tempfile::tempdir().expect("tempdir");
        let repo = make_repo_dir(dir.path());
        fs::write(
            repo.join("session.jsonl"),
            concat!(
                r#"{"timestamp":"2026-04-17T10:00:00Z","sessionId":"s1","agent":"agent7","message":{"model":"claude-sonnet-4","usage":{"input_tokens":100,"output_tokens":20,"cache_read_input_tokens":5}}}"#,
                "\n",
                r#"{"timestamp":"2026-04-17T10:01:00Z","sessionId":"s1","agent":"agent7","usage":{"input_tokens":200,"output_tokens":40,"cached_input_tokens":10}}"#,
                "\n",
            ),
        )
        .expect("write jsonl");
        let db = open_migrated_db(dir.path());

        let stats = ingest_claude_sessions(&db, &projects_root(dir.path())).expect("ingest");
        assert_eq!(stats.files, 1);
        assert_eq!(stats.turns, 2);

        let output = db
            .query(
                "SELECT COUNT(*) AS c, SUM(input_tokens) AS input, \
                 SUM(output_tokens) AS output, SUM(cached_input_tokens) AS cached \
                 FROM token_usage",
            )
            .expect("query");
        for needle in ["2", "300", "60", "15"] {
            assert!(output.contains(needle), "{output}");
        }
    }

    #[test]
    fn claude_sessions_ingest_batches_many_rows() {
        let dir = tempfile::tempdir().expect("tempdir");
        let repo = make_repo_dir(dir.path());
        // Enough rows for two full batches plus a partial one.
        let total_rows = BATCH_ROWS * 2 + 17;
        let buf: String = (0..total_rows)
            .map(|i| {
                format!(
                    r#"{{"timestamp":"2026-04-17T10:00:00Z","sessionId":"s{i}","usage":{{"input_tokens":{i},"output_tokens":1}}}}"#,
                ) + "\n"
            })
            .collect();
        fs::write(repo.join("session.jsonl"), buf).expect("write");
        let db = open_migrated_db(dir.path());

        let stats = ingest_claude_sessions(&db, &projects_root(dir.path())).expect("ingest");
        assert_eq!(stats.turns, total_rows);

        let out = db
            .query("SELECT COUNT(*) AS c, MAX(id) AS hi FROM token_usage")
            .expect("query");
        assert!(out.contains(&total_rows.to_string()), "{out}");
    }

    #[test]
    fn claude_sessions_ingest_is_idempotent_on_replay() {
        let dir = tempfile::tempdir().expect("tempdir");
        let repo = make_repo_dir(dir.path());
        fs::write(
            repo.join("session.jsonl"),
            concat!(
                r#"{"timestamp":"2026-04-17T10:00:00Z","sessionId":"s1","agent":"agent7","message":{"model":"claude-sonnet-4","usage":{"input_tokens":100,"output_tokens":20}}}"#,
                "\n",
                r#"{"timestamp":"2026-04-17T10:01:00Z","sessionId":"s1","agent":"agent7","usage":{"input_tokens":200,"output_tokens":40,"cached_input_tokens":10}}"#,
                "\n",
            ),
        )
        .expect("write jsonl");
        let db = open_migrated_db(dir.path());
        let root = projects_root(dir.path());
        let count_rows =
            |label: &str| db.query("SELECT COUNT(*) AS c FROM token_usage").expect(label);

        let first = ingest_claude_sessions(&db, &root).expect("first ingest");
        let after_first = count_rows("count after first ingest");
        let second = ingest_claude_sessions(&db, &root).expect("second ingest");
        let after_second = count_rows("count after second ingest");

        assert_eq!(first.turns, 2);
        assert_eq!(second.turns, 2);
        assert!(after_first.contains('2'), "{after_first}");
        assert_eq!(after_first, after_second);
    }
}