// paceflow 0.2.4
//
// Local-first CLI that turns AI coding session history and git metadata into engineering analytics.
// Documentation
pub(crate) mod shared;

use anyhow::Result;
use rusqlite::Connection;
use std::path::{Path, PathBuf};

use crate::db;
use crate::ingest_progress::IngestProgressObserver;

/// Returns the session file paths produced by `shared::plan_session_files`.
///
/// This is a thin public wrapper: it forwards to the shared implementation
/// and returns its result (paths or error) unchanged. How the files are
/// discovered is decided entirely by the shared module.
pub fn plan_session_files() -> Result<Vec<PathBuf>> {
    shared::plan_session_files()
}

/// Ingests every file in `session_files` into `db` and returns the total
/// number of rows written across all of them.
///
/// Each file is handed to `ingest_session`. A file that fails is reported on
/// stderr as a warning and skipped; a failure never aborts the batch. When
/// `verbose` is set, a per-file status line is also written to stderr. The
/// optional `progress` observer is advanced once per file, whether or not
/// that file was ingested successfully.
pub fn ingest_planned_sessions(
    db: &Connection,
    session_files: &[PathBuf],
    verbose: bool,
    mut progress: Option<&mut dyn IngestProgressObserver>,
) -> Result<usize> {
    let mut row_count: usize = 0;

    for file in session_files.iter() {
        if verbose {
            // No trailing newline: the outcome is appended to the same line.
            eprint!("  {:?} ... ", file);
        }

        let outcome = ingest_session(file, db);
        match outcome {
            Err(error) => eprintln!("Warning: skipping {:?}: {}", file, error),
            Ok(written) => {
                if verbose {
                    if written == 0 {
                        eprintln!("skipped (already ingested or empty)");
                    } else {
                        eprintln!("wrote {} rows", written);
                    }
                }
                // Adding zero for skipped files leaves the total untouched.
                row_count += written;
            }
        }

        if let Some(observer) = &mut progress {
            observer.advance(&file.to_string_lossy());
        }
    }

    Ok(row_count)
}

/// Ingests one Claude session file into `db` and returns the number of rows
/// written.
///
/// The file is parsed with `shared::parse_session_file`. A session with a
/// blank session id, or with no visible messages, structured writes, or usage
/// events, is skipped and reported as `0`.
///
/// Session metadata is upserted on every non-skipped call. What else is
/// written depends on what the database already holds:
///
/// * session and usage both present: nothing further is written; returns `0`.
/// * session present, usage missing: only the usage events are written and
///   counted.
/// * session absent: usage events, messages with at least one word, and
///   accepted code changes are written; the count additionally includes one
///   row for the session itself.
///
/// # Errors
///
/// Returns an error if parsing fails or any database call fails. This
/// function does not open a transaction itself, so rows written before the
/// failing call are not undone here.
fn ingest_session(path: &Path, db: &Connection) -> Result<usize> {
    const PROVIDER: &str = "claude";

    let parsed = shared::parse_session_file(path)?;
    if parsed.session_id.trim().is_empty() {
        return Ok(0);
    }
    if parsed.visible_messages.is_empty()
        && parsed.structured_writes.is_empty()
        && parsed.usage_events.is_empty()
    {
        return Ok(0);
    }

    // Both existence checks must run before the upsert below so they reflect
    // the state of the database prior to this call.
    let already_exists = db::session_exists(db, &parsed.session_id)?;
    let usage_already_exists = db::session_usage_exists(db, PROVIDER, &parsed.session_id)?;

    // Fall back to the start timestamp when the session has no end timestamp.
    let session_end = parsed.ended_at.as_deref().or(parsed.started_at.as_deref());

    db::upsert_metadata_session_with_model(
        db,
        PROVIDER,
        &parsed.session_id,
        parsed.session_cwd.as_deref(),
        parsed.started_at.as_deref(),
        session_end,
        Some(&parsed.source_file),
        Some(PROVIDER),
        parsed.model_name.as_deref(),
    )?;

    if already_exists && usage_already_exists {
        return Ok(0);
    }

    // A session seen for the first time counts as one written row; a session
    // that already existed does not.
    let mut written = if already_exists { 0usize } else { 1usize };

    for usage in &parsed.usage_events {
        db::ingest_session_usage(
            db,
            PROVIDER,
            &parsed.session_id,
            session_end,
            parsed.model_name.as_deref(),
            usage.input_tokens,
            usage.cache_read_input_tokens,
            usage.cache_creation_input_tokens,
            usage.output_tokens,
            0,
            usage.total_tokens,
            None,
            "estimated_from_tokens",
        )?;
        written += 1;
    }

    if already_exists {
        // Messages and code changes are only written for sessions that were
        // not in the database before this call.
        return Ok(written);
    }

    for message in parsed.visible_messages {
        // A whitespace-separated word count cannot realistically exceed
        // i64::MAX, so the cast is lossless in practice.
        let words = message.text.split_whitespace().count() as i64;
        if words == 0 {
            continue;
        }
        db::ingest_session_message(
            db,
            PROVIDER,
            &parsed.session_id,
            message.role.as_str(),
            &message.text,
            words,
            message.timestamp.as_deref(),
        )?;
        written += 1;
    }

    for change in parsed.structured_writes {
        db::ingest_accepted_code_change(
            db,
            PROVIDER,
            &parsed.session_id,
            &change.abs_path,
            change.added_lines,
            change.removed_lines,
            change.timestamp.as_deref(),
        )?;
        written += 1;
    }

    Ok(written)
}

#[cfg(test)]
mod tests {
    use super::ingest_session;
    use crate::db::init_app_schema;
    use anyhow::Result;
    use rusqlite::Connection;
    use serde_json::{json, Value};
    use std::fs;
    use std::path::Path;
    use tempfile::tempdir;

    /// Writes `records` to `path` as JSON Lines: one serialized record per
    /// line, joined with `\n` and without a trailing newline.
    fn write_jsonl(path: &Path, records: &[Value]) -> Result<()> {
        let lines: Vec<String> = records.iter().map(|record| record.to_string()).collect();
        fs::write(path, lines.join("\n"))?;
        Ok(())
    }

    /// Opens an in-memory SQLite database with the application schema applied.
    fn fresh_db() -> Result<Connection> {
        let conn = Connection::open_in_memory()?;
        init_app_schema(&conn)?;
        Ok(conn)
    }

    /// Records flagged `isMeta` and user records that only carry a
    /// `tool_result` must not be stored as messages; only the two visible
    /// messages are.
    #[test]
    fn session_ingest_filters_meta_and_tool_result_noise() -> Result<()> {
        let dir = tempdir()?;
        let session_path = dir.path().join("claude-session.jsonl");
        let records = [
            json!({
                "type": "user",
                "timestamp": "2026-04-22T09:00:00Z",
                "sessionId": "claude-meta",
                "cwd": "/tmp/repo",
                "message": {"role": "user", "content": "please inspect this"}
            }),
            json!({
                "type": "assistant",
                "timestamp": "2026-04-22T09:00:01Z",
                "sessionId": "claude-meta",
                "cwd": "/tmp/repo",
                "message": {
                    "model": "claude-opus-4-6",
                    "role": "assistant",
                    "content": [{"type": "text", "text": "Looking now."}]
                }
            }),
            json!({
                "type": "user",
                "timestamp": "2026-04-22T09:00:02Z",
                "sessionId": "claude-meta",
                "cwd": "/tmp/repo",
                "isMeta": true,
                "message": {"role": "user", "content": "<local-command-caveat>ignore</local-command-caveat>"}
            }),
            json!({
                "type": "user",
                "timestamp": "2026-04-22T09:00:03Z",
                "sessionId": "claude-meta",
                "cwd": "/tmp/repo",
                "message": {
                    "role": "user",
                    "content": [{"type": "tool_result", "tool_use_id": "toolu_read", "content": "ok"}]
                }
            }),
        ];
        write_jsonl(&session_path, &records)?;

        let conn = fresh_db()?;
        // One row for the session itself plus the two visible messages.
        let written = ingest_session(&session_path, &conn)?;
        assert_eq!(written, 3);

        let stored: Vec<(String, String)> = {
            let mut stmt = conn.prepare(
                "SELECT role, content
                 FROM fact_session_message
                 WHERE provider='claude' AND session_id='claude-meta'
                 ORDER BY message_index",
            )?;
            let mapped = stmt.query_map([], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })?;
            mapped.collect::<std::result::Result<Vec<_>, _>>()?
        };
        let expected = vec![
            (String::from("user"), String::from("please inspect this")),
            (String::from("assistant"), String::from("Looking now.")),
        ];
        assert_eq!(stored, expected);
        Ok(())
    }

    /// The model recorded for a session is the last real model seen in the
    /// file, not the `<synthetic>` placeholder attached to an API-error
    /// record.
    #[test]
    fn session_ingest_uses_latest_real_model_not_synthetic_placeholder() -> Result<()> {
        let dir = tempdir()?;
        let session_path = dir.path().join("claude-models.jsonl");
        let records = [
            json!({
                "type": "user",
                "timestamp": "2026-04-22T09:00:00Z",
                "sessionId": "claude-models",
                "cwd": "/tmp/repo",
                "message": {"role": "user", "content": "please update"}
            }),
            json!({
                "type": "assistant",
                "timestamp": "2026-04-22T09:00:01Z",
                "sessionId": "claude-models",
                "cwd": "/tmp/repo",
                "message": {
                    "model": "<synthetic>",
                    "role": "assistant",
                    "content": [{"type": "text", "text": "API error"}]
                },
                "isApiErrorMessage": true
            }),
            json!({
                "type": "assistant",
                "timestamp": "2026-04-22T09:00:02Z",
                "sessionId": "claude-models",
                "cwd": "/tmp/repo",
                "message": {
                    "model": "claude-opus-4-6",
                    "role": "assistant",
                    "content": [{"type": "text", "text": "Recovered."}]
                }
            }),
            json!({
                "type": "assistant",
                "timestamp": "2026-04-22T09:00:03Z",
                "sessionId": "claude-models",
                "cwd": "/tmp/repo",
                "message": {
                    "model": "claude-sonnet-4-5-20250929",
                    "role": "assistant",
                    "content": [{"type": "text", "text": "Switching models."}]
                }
            }),
        ];
        write_jsonl(&session_path, &records)?;

        let conn = fresh_db()?;
        ingest_session(&session_path, &conn)?;

        let stored_model: Option<String> = conn.query_row(
            "SELECT m.model_name
             FROM metadata_sessions s
             LEFT JOIN metadata_models m ON m.id = s.model_id
             WHERE s.provider='claude' AND s.session_id='claude-models'",
            [],
            |row| row.get(0),
        )?;
        let expected_model = Some("claude/claude-sonnet-4-5-20250929");
        assert_eq!(stored_model.as_deref(), expected_model);
        Ok(())
    }
}