netsky 0.2.0

netsky CLI: the viable system launcher and subcommand dispatcher
Documentation
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use chrono::{DateTime, Utc};
use clap::Subcommand;
use indicatif::{ProgressBar, ProgressStyle};
use netsky_db::{Db, TokenUsageRecord};
use serde_json::{Value, json};

/// Rows per `record_token_usage_batch` flush. Picked to keep each turso
/// transaction small enough to commit in well under a second on a hot
/// laptop SSD while amortising the per-batch connection setup cost.
///
/// `ingest_claude_sessions` writes queued records in chunks of exactly this
/// many rows; whatever smaller remainder is left after the last file is
/// written in one final flush.
const BATCH_ROWS: usize = 500;

// Ingest subcommands dispatched by `run`.
//
// NOTE: clap's derive turns the `///` comments on the variant and its fields
// into the generated CLI help text, so rewording them changes user-visible
// output. Maintainer-only notes in this enum use plain `//` comments.
#[derive(Subcommand, Debug)]
pub enum IngestCommand {
    /// Ingest token usage from Claude Code JSONL session logs.
    ClaudeSessions {
        /// Override the Claude projects root. Defaults to ~/.claude/projects.
        // When absent, `run` resolves the root via `default_claude_projects_root`.
        #[arg(long, value_name = "DIR")]
        root: Option<PathBuf>,
        /// Print a JSON envelope instead of operator text.
        // Selects between the pretty-printed JSON envelope and the one-line
        // summary that `run` prints on success.
        #[arg(long)]
        json: bool,
    },
}

/// Summary of one `ingest_claude_sessions` run.
///
/// All fields keep their `Default` (zero) values when the projects root does
/// not exist, because ingestion returns before doing any work in that case.
#[derive(Debug, Default)]
pub struct IngestStats {
    /// Number of `.jsonl` files discovered under the root.
    pub files: usize,
    /// Number of token-usage records parsed from those files and queued for
    /// insertion. This counts parsed records, not rows the database kept.
    pub turns: usize,
    /// Wall-clock time from the start of file discovery to the final flush.
    pub elapsed: Duration,
}

pub fn run(cmd: IngestCommand) -> netsky_core::Result<()> {
    match cmd {
        IngestCommand::ClaudeSessions { root, json } => {
            let root = root.map_or_else(default_claude_projects_root, Ok)?;
            let db = super::db_diag::with_lock_retry(Db::open)
                .map_err(super::db_diag::wrap_open_retry_error)?;
            super::db_diag::with_lock_retry(|| db.migrate())
                .map_err(super::db_diag::wrap_open_retry_error)?;
            let stats = super::db_diag::with_lock_retry(|| ingest_claude_sessions(&db, &root))
                .map_err(super::db_diag::wrap_retry_error)?;
            let secs = stats.elapsed.as_secs_f64().max(0.001);
            let rows_per_sec = stats.turns as f64 / secs;
            if json {
                let envelope = json!({
                    "command": "ingest claude-sessions",
                    "status": "green",
                    "summary": format!(
                        "ingested {} turns from {} files in {:.2}s",
                        stats.turns, stats.files, secs
                    ),
                    "generated_at": chrono::Utc::now().to_rfc3339(),
                    "data": {
                        "turns": stats.turns,
                        "files": stats.files,
                        "root": root.display().to_string(),
                        "elapsed_secs": secs,
                        "rows_per_sec": rows_per_sec,
                    },
                });
                println!(
                    "{}",
                    serde_json::to_string_pretty(&envelope).unwrap_or_default()
                );
            } else {
                println!(
                    "ingested {turns} claude session turns from {files} files under {root} \
                     in {secs:.2}s ({rps:.0} rows/s)",
                    turns = stats.turns,
                    files = stats.files,
                    root = root.display(),
                    secs = secs,
                    rps = rows_per_sec,
                );
            }
            Ok(())
        }
    }
}

/// Ingests token usage from every `.jsonl` file under `root` into `db`.
///
/// Files are processed in sorted path order. Records parsed from a file are
/// queued and written in batches of exactly `BATCH_ROWS`; the remainder left
/// after the last file is written in one final, smaller batch. A byte-based
/// progress bar is advanced once per file.
///
/// A missing `root` is not an error: the function returns default (all-zero)
/// stats without touching the database.
///
/// # Errors
///
/// Returns an error if the directory walk fails, if a file cannot be read or
/// contains a line that is not valid JSON, or if a batch insert fails. Batches
/// flushed before the failure are not undone by this function.
pub fn ingest_claude_sessions(db: &Db, root: &Path) -> netsky_core::Result<IngestStats> {
    let mut stats = IngestStats::default();
    if !root.exists() {
        return Ok(stats);
    }
    let started = Instant::now();
    let files = collect_jsonl(root)?;
    stats.files = files.len();

    // Stat each file exactly once. Session logs may still be growing, so
    // re-reading the size per file could make the increments disagree with the
    // total the bar was created with. Unreadable metadata counts as 0 bytes.
    let sizes: Vec<u64> = files
        .iter()
        .map(|path| fs::metadata(path).map_or(0, |m| m.len()))
        .collect();
    let total_bytes: u64 = sizes.iter().sum();

    let pb = make_progress_bar(total_bytes);
    let mut pending: Vec<OwnedTokenRecord> = Vec::with_capacity(BATCH_ROWS);

    for (path, &bytes) in files.iter().zip(&sizes) {
        stats.turns += read_file_into(path, &mut pending)?;

        // Flush every complete batch straight from the queue, then drop the
        // flushed prefix in a single shift. This avoids an allocation and a
        // tail memmove per batch when one file yields many records.
        let full = pending.len() - pending.len() % BATCH_ROWS;
        for chunk in pending[..full].chunks_exact(BATCH_ROWS) {
            flush(db, chunk)?;
        }
        pending.drain(..full);

        pb.inc(bytes);
        pb.set_message(format!("{} turns", stats.turns));
    }

    // Whatever is left is smaller than one batch.
    if !pending.is_empty() {
        flush(db, &pending)?;
        pending.clear();
    }
    stats.elapsed = started.elapsed();
    pb.finish_with_message(format!(
        "{} turns in {:.1}s",
        stats.turns,
        stats.elapsed.as_secs_f64()
    ));
    Ok(stats)
}

/// Builds the byte-based progress bar shown while ingesting.
///
/// `total_bytes` is the bar's length. If the custom template is rejected the
/// library's default bar style is used instead; in both cases the bar is drawn
/// with `=>-` and redrawn on a steady 120 ms tick.
fn make_progress_bar(total_bytes: u64) -> ProgressBar {
    const TEMPLATE: &str = "{spinner:.green} [{elapsed_precise}] [{bar:32.cyan/blue}] \
         {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta}) {msg}";

    let base = match ProgressStyle::with_template(TEMPLATE) {
        Ok(custom) => custom,
        Err(_) => ProgressStyle::default_bar(),
    };
    let style = base.progress_chars("=>-");

    let bar = ProgressBar::new(total_bytes);
    bar.set_style(style);
    bar.enable_steady_tick(Duration::from_millis(120));
    bar
}

/// Returns every `.jsonl` file found anywhere beneath `root`, sorted by path.
///
/// # Errors
///
/// Propagates any error from the recursive directory walk.
fn collect_jsonl(root: &Path) -> netsky_core::Result<Vec<PathBuf>> {
    let mut found = Vec::new();
    walk_jsonl(root, &mut found).map(|()| {
        // Sorting makes the ingestion order independent of directory order.
        found.sort();
        found
    })
}

/// Recursively appends to `out` every path under `dir` whose extension is
/// exactly `jsonl`.
///
/// The directory listing is read in full before any subdirectory is entered.
///
/// # Errors
///
/// Fails if `dir` (or any directory below it) cannot be listed, or if reading
/// any individual entry fails.
fn walk_jsonl(dir: &Path, out: &mut Vec<PathBuf>) -> netsky_core::Result<()> {
    let mut listing = Vec::new();
    for item in fs::read_dir(dir)? {
        listing.push(item?);
    }

    for child in listing.into_iter().map(|item| item.path()) {
        if child.is_dir() {
            walk_jsonl(&child, out)?;
            continue;
        }
        let is_jsonl = matches!(child.extension(), Some(ext) if ext == "jsonl");
        if is_jsonl {
            out.push(child);
        }
    }
    Ok(())
}

/// Writes one batch of queued records to the database.
///
/// An empty batch is a no-op and never reaches the database.
///
/// # Errors
///
/// Wraps a failed batch insert in an error that includes the batch size.
fn flush(db: &Db, batch: &[OwnedTokenRecord]) -> netsky_core::Result<()> {
    if !batch.is_empty() {
        // Lend each owned record out as the borrowed row type the DB expects.
        let mut rows: Vec<TokenUsageRecord<'_>> = Vec::with_capacity(batch.len());
        rows.extend(batch.iter().map(|owned| owned.as_db_record()));
        db.record_token_usage_batch(rows).map_err(|e| {
            netsky_core::anyhow!("record token usage batch ({}): {e}", batch.len())
        })?;
    }
    Ok(())
}

/// Resolves the default Claude projects directory, `<home>/.claude/projects`.
///
/// # Errors
///
/// Fails when the home directory cannot be determined.
fn default_claude_projects_root() -> netsky_core::Result<PathBuf> {
    let mut projects =
        dirs::home_dir().ok_or_else(|| netsky_core::anyhow!("home dir missing"))?;
    projects.push(".claude");
    projects.push("projects");
    Ok(projects)
}

/// Parses one JSONL session log and appends every usable token record to
/// `pending`.
///
/// Whitespace-only lines are skipped. Lines that parse as JSON but carry no
/// usage data are ignored. Returns how many records this file contributed.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, or if any non-blank line is not
/// valid JSON; the error names the file and the 1-based line number. Records
/// appended before the failure remain in `pending`.
fn read_file_into(path: &Path, pending: &mut Vec<OwnedTokenRecord>) -> netsky_core::Result<usize> {
    let reader = BufReader::new(fs::File::open(path)?);
    let queued_before = pending.len();

    // Number lines from 1; blank lines still consume a number.
    for (line_no, raw) in (1..).zip(reader.lines()) {
        let raw = raw?;
        if raw.trim().is_empty() {
            continue;
        }
        let parsed = serde_json::from_str::<Value>(&raw).map_err(|e| {
            netsky_core::anyhow!("parse {}:{} as JSONL: {e}", path.display(), line_no)
        })?;
        // `extend` on an Option pushes the record when there is one.
        pending.extend(token_record(&parsed, path, line_no));
    }

    Ok(pending.len() - queued_before)
}

/// Owned form of one token-usage row parsed from a session log.
///
/// Records are queued in this form between parsing and flushing;
/// `as_db_record` lends one out as a borrowed `TokenUsageRecord`.
struct OwnedTokenRecord {
    /// Timestamp of the turn, as produced by `timestamp`.
    ts_utc: DateTime<Utc>,
    /// Top-level `session_id` or `sessionId` string of the log line, if any.
    session_id: Option<String>,
    /// Top-level `agent` or `agent_id` string of the log line, if any.
    agent: Option<String>,
    /// Top-level `model` string, or else `/message/model`, if any.
    model: Option<String>,
    /// `input_tokens` from the usage object, when present as an integer.
    input_tokens: Option<i64>,
    /// `output_tokens` from the usage object, when present as an integer.
    output_tokens: Option<i64>,
    /// Sum of the cache-related counters in the usage object; `None` when
    /// that sum is not positive.
    cached_input_tokens: Option<i64>,
    /// Serialized JSON recording the ingest source, file path and line number.
    detail_json: String,
}

impl OwnedTokenRecord {
    /// Borrows this record as the row type accepted by the database layer.
    ///
    /// The runtime is always reported as `"claude"` and no cost is attached.
    fn as_db_record(&self) -> TokenUsageRecord<'_> {
        let Self {
            ts_utc,
            session_id,
            agent,
            model,
            input_tokens,
            output_tokens,
            cached_input_tokens,
            detail_json,
        } = self;

        TokenUsageRecord {
            ts_utc: *ts_utc,
            session_id: session_id.as_deref(),
            agent: agent.as_deref(),
            runtime: Some("claude"),
            model: model.as_deref(),
            input_tokens: *input_tokens,
            output_tokens: *output_tokens,
            cached_input_tokens: *cached_input_tokens,
            cost_usd_micros: None,
            detail_json: Some(detail_json),
        }
    }
}

fn token_record(value: &Value, path: &Path, line: usize) -> Option<OwnedTokenRecord> {
    let usage = value
        .get("usage")
        .or_else(|| value.pointer("/message/usage"))?;
    let input_tokens = int_field(usage, "input_tokens");
    let output_tokens = int_field(usage, "output_tokens");
    let cached_input_tokens = cached_tokens(usage);
    if input_tokens.is_none() && output_tokens.is_none() && cached_input_tokens.is_none() {
        return None;
    }
    let detail_json = serde_json::json!({
        "source": "claude-sessions",
        "path": path.display().to_string(),
        "line": line,
    })
    .to_string();
    Some(OwnedTokenRecord {
        ts_utc: timestamp(value),
        session_id: string_field(value, "session_id").or_else(|| string_field(value, "sessionId")),
        agent: string_field(value, "agent").or_else(|| string_field(value, "agent_id")),
        model: string_field(value, "model").or_else(|| {
            value
                .pointer("/message/model")?
                .as_str()
                .map(str::to_string)
        }),
        input_tokens,
        output_tokens,
        cached_input_tokens,
        detail_json,
    })
}

fn int_field(value: &Value, key: &str) -> Option<i64> {
    value.get(key)?.as_i64()
}

fn string_field(value: &Value, key: &str) -> Option<String> {
    value.get(key)?.as_str().map(str::to_string)
}

/// Adds up every cache-related counter present in `usage`.
///
/// Returns `None` unless the combined total is strictly positive, so a usage
/// object with no cache counters (or only zeros) reports no cached tokens.
fn cached_tokens(usage: &Value) -> Option<i64> {
    const CACHE_KEYS: [&str; 3] = [
        "cached_input_tokens",
        "cache_creation_input_tokens",
        "cache_read_input_tokens",
    ];

    let mut total: i64 = 0;
    for key in CACHE_KEYS {
        if let Some(count) = int_field(usage, key) {
            total += count;
        }
    }

    if total > 0 { Some(total) } else { None }
}

/// Picks the timestamp for a log line, converted to UTC.
///
/// The keys `ts_utc`, `timestamp` and `created_at` are tried in that order,
/// and the first one holding a string that parses as RFC 3339 wins. A key
/// that is missing, not a string, or not valid RFC 3339 is skipped, so a
/// malformed earlier key no longer hides a valid later one.
///
/// Falls back to the current time when no key yields a parseable timestamp.
fn timestamp(value: &Value) -> DateTime<Utc> {
    ["ts_utc", "timestamp", "created_at"]
        .into_iter()
        .find_map(|key| {
            let raw = value.get(key)?.as_str()?;
            // Parse inside `find_map` so a bad value moves on to the next key.
            DateTime::parse_from_rfc3339(raw).ok()
        })
        .map_or_else(Utc::now, |ts| ts.with_timezone(&Utc))
}

// End-to-end tests for `ingest_claude_sessions`, each run against a fresh
// database file and a throwaway projects tree inside a temp directory.
//
// NOTE(review): results are checked with substring matches on the text that
// `db.query` returns, so a short needle such as "2" also matches inside "20"
// or "300". These assertions are weaker than exact comparisons.
#[cfg(test)]
mod tests {
    use super::*;

    // Two usage lines, one with usage nested under `message` and one with it
    // at the top level, must both be recorded and their counters summed.
    #[test]
    fn claude_sessions_ingest_records_two_usage_turns() {
        let dir = tempfile::tempdir().expect("tempdir");
        let projects = dir.path().join(".claude").join("projects").join("repo");
        fs::create_dir_all(&projects).expect("projects");
        fs::write(
            projects.join("session.jsonl"),
            concat!(
                r#"{"timestamp":"2026-04-17T10:00:00Z","sessionId":"s1","agent":"agent7","message":{"model":"claude-sonnet-4","usage":{"input_tokens":100,"output_tokens":20,"cache_read_input_tokens":5}}}"#,
                "\n",
                r#"{"timestamp":"2026-04-17T10:01:00Z","sessionId":"s1","agent":"agent7","usage":{"input_tokens":200,"output_tokens":40,"cached_input_tokens":10}}"#,
                "\n",
            ),
        )
        .expect("write jsonl");

        let db = Db::open_path(dir.path().join("meta.db")).expect("db");
        db.migrate().expect("migrate");
        let stats = ingest_claude_sessions(&db, &dir.path().join(".claude").join("projects"))
            .expect("ingest");

        assert_eq!(stats.files, 1);
        assert_eq!(stats.turns, 2);
        let output = db
            .query(
                "SELECT COUNT(*) AS c, SUM(input_tokens) AS input, \
                 SUM(output_tokens) AS output, SUM(cached_input_tokens) AS cached \
                 FROM token_usage",
            )
            .expect("query");
        // Expected: 2 rows, 100+200 input, 20+40 output, 5+10 cached.
        assert!(output.contains("2"), "{output}");
        assert!(output.contains("300"), "{output}");
        assert!(output.contains("60"), "{output}");
        assert!(output.contains("15"), "{output}");
    }

    // A single file larger than one batch must have every row written.
    #[test]
    fn claude_sessions_ingest_batches_many_rows() {
        let dir = tempfile::tempdir().expect("tempdir");
        let projects = dir.path().join(".claude").join("projects").join("repo");
        fs::create_dir_all(&projects).expect("projects");
        // Two full batches plus a 17-row remainder, so both the per-file batch
        // flush and the final partial flush are exercised.
        let total_rows = BATCH_ROWS * 2 + 17;
        let mut buf = String::new();
        for i in 0..total_rows {
            buf.push_str(&format!(
                r#"{{"timestamp":"2026-04-17T10:00:00Z","sessionId":"s{i}","usage":{{"input_tokens":{i},"output_tokens":1}}}}"#,
            ));
            buf.push('\n');
        }
        fs::write(projects.join("session.jsonl"), buf).expect("write");

        let db = Db::open_path(dir.path().join("meta.db")).expect("db");
        db.migrate().expect("migrate");
        let stats = ingest_claude_sessions(&db, &dir.path().join(".claude").join("projects"))
            .expect("ingest");
        assert_eq!(stats.turns, total_rows);
        let out = db
            .query("SELECT COUNT(*) AS c, MAX(id) AS hi FROM token_usage")
            .expect("query");
        assert!(out.contains(&total_rows.to_string()), "{out}");
    }

    // Ingesting the same file twice must leave the row count unchanged.
    // NOTE(review): nothing in this file deduplicates records, so this
    // presumably relies on `record_token_usage_batch` or the schema. Confirm
    // against netsky_db.
    #[test]
    fn claude_sessions_ingest_is_idempotent_on_replay() {
        let dir = tempfile::tempdir().expect("tempdir");
        let projects = dir.path().join(".claude").join("projects").join("repo");
        fs::create_dir_all(&projects).expect("projects");
        fs::write(
            projects.join("session.jsonl"),
            concat!(
                r#"{"timestamp":"2026-04-17T10:00:00Z","sessionId":"s1","agent":"agent7","message":{"model":"claude-sonnet-4","usage":{"input_tokens":100,"output_tokens":20}}}"#,
                "\n",
                r#"{"timestamp":"2026-04-17T10:01:00Z","sessionId":"s1","agent":"agent7","usage":{"input_tokens":200,"output_tokens":40,"cached_input_tokens":10}}"#,
                "\n",
            ),
        )
        .expect("write jsonl");

        let db = Db::open_path(dir.path().join("meta.db")).expect("db");
        db.migrate().expect("migrate");

        let first = ingest_claude_sessions(&db, &dir.path().join(".claude").join("projects"))
            .expect("first ingest");
        let after_first = db
            .query("SELECT COUNT(*) AS c FROM token_usage")
            .expect("count after first ingest");

        let second = ingest_claude_sessions(&db, &dir.path().join(".claude").join("projects"))
            .expect("second ingest");
        let after_second = db
            .query("SELECT COUNT(*) AS c FROM token_usage")
            .expect("count after second ingest");

        // `turns` counts parsed records, so the replay still reports 2 even
        // though the stored row count must not change.
        assert_eq!(first.turns, 2);
        assert_eq!(second.turns, 2);
        assert!(after_first.contains('2'), "{after_first}");
        assert_eq!(after_first, after_second);
    }
}