paceflow 0.2.4

Local-first CLI that turns AI coding session history and git metadata into engineering analytics.
Documentation
use anyhow::Result;
use rusqlite::{Connection, OptionalExtension, params};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use crate::change_intel::types::{
    ChangeOpCandidate, LineHashCount, LineSide, ParseError, SessionInfo,
};
use crate::db;

/// Size and modification time of a session source file as last recorded in the
/// `ingest_cursors` table.
///
/// Read by [`get_ingest_cursor`] and written by [`upsert_ingest_cursor`].
#[derive(Clone, Debug)]
pub struct IngestCursor {
    /// Modification time stored at the last ingest (units are chosen by the caller).
    pub file_mtime: i64,
    /// File size stored at the last ingest.
    pub file_size: i64,
}

/// Records `session` in the session metadata table.
///
/// Only the provider, session id, working directory and source file are passed
/// through; the two other optional arguments of `db::upsert_metadata_session`
/// are always `None` here.
pub fn upsert_change_session(conn: &Connection, session: &SessionInfo) -> Result<()> {
    let cwd = session.session_cwd.as_deref();
    db::upsert_metadata_session(
        conn,
        &session.provider,
        &session.session_id,
        cwd,
        None,
        None,
        Some(&session.source_file),
    )
    .map(|_| ())?;
    Ok(())
}

/// Upserts the tool-write fact for `op` and returns the row id reported by
/// `db::upsert_fact_tool_write`.
///
/// Metadata for the location is registered first:
/// - file metadata when both `repo_root` and `rel_path` are known;
/// - repository metadata when only `repo_root` is known;
/// - nothing when `repo_root` is absent.
pub fn upsert_change_op(conn: &Connection, op: &ChangeOpCandidate) -> Result<i64> {
    match (op.repo_root.as_deref(), op.rel_path.as_deref()) {
        (Some(root), Some(path)) => {
            db::upsert_metadata_file(conn, root, path)?;
        }
        (Some(root), None) => {
            db::upsert_metadata_repository(conn, root)?;
        }
        (None, _) => {}
    }
    db::upsert_fact_tool_write(conn, op)
}

/// Replaces the stored line hashes of the tool-write row `op_id` with `line_hashes`.
///
/// Thin wrapper over `db::replace_fact_session_code_change_line_hashes`.
/// NOTE(review): the exact replace semantics (delete-then-insert, how duplicate
/// `(side, line_hash)` entries are stored) live in the db module — confirm there.
pub fn replace_line_hashes(
    conn: &Connection,
    op_id: i64,
    line_hashes: &[LineHashCount],
) -> Result<()> {
    db::replace_fact_session_code_change_line_hashes(conn, op_id, line_hashes)
}

/// Per-repository bookkeeping row from `commit_assoc_repo_state`.
///
/// [`get_repo_assoc_state`] returns version `0` and no fingerprint for a
/// repository that has no row yet.
#[derive(Clone, Debug)]
pub struct RepoAssocState {
    /// Counter incremented by [`bump_repo_session_facts_version`].
    pub session_facts_version: i64,
    /// Fingerprint last stored by [`upsert_repo_branch_fingerprint`], if any.
    pub task_branch_fingerprint: Option<String>,
}

/// Counters returned by [`reconcile_source_tool_writes`] for one source file.
#[derive(Clone, Debug, Default)]
pub struct SourceReconcileSummary {
    /// Number of distinct incoming ops (by session id, call id and op index) upserted.
    pub ops_upserted: usize,
    /// Number of repositories whose session-facts version was bumped.
    pub repos_marked_dirty: usize,
}

/// Identity of a single tool-write op within one provider's source file.
///
/// The derived ordering compares `session_id`, then `call_id`, then `op_index`,
/// so the field order below is significant.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct OpKey {
    session_id: String,
    call_id: String,
    op_index: i32,
}

/// The parts of a tool write that matter for change detection.
///
/// `id` is the database row id for snapshots loaded from storage; snapshots
/// built from freshly parsed ops use `-1` because they are never read back.
#[derive(Clone, Debug)]
struct ToolWriteSnapshot {
    id: i64,
    repo_root: Option<String>,
    rel_path: Option<String>,
    line_hashes: Vec<LineHashCount>,
}

/// Reconciles the stored tool-write facts of one `(provider, source_file)` with
/// the freshly parsed `ops`.
///
/// The function does the following:
/// - Upserts each distinct op, keyed by session id, call id and op index; when a
///   key repeats, the first occurrence wins.
/// - Replaces the op's line hashes.
/// - Deletes stored rows that no longer appear in `ops`.
/// - For every repository that gained dirty line hashes (from changed, new or
///   deleted writes), bumps its session-facts version and records those hashes.
///
/// All writes run inside an SQL `SAVEPOINT`, which is released on success and
/// rolled back on error. Change detection compares `ops` against what is
/// already stored. Without the savepoint, a run that failed after upserting
/// some ops but before recording the dirty hashes would leave those ops looking
/// "unchanged" on retry, and their dirty hashes would be lost.
///
/// # Errors
/// Returns the first database error. Writes made before it are rolled back.
pub fn reconcile_source_tool_writes(
    conn: &Connection,
    provider: &str,
    source_file: &str,
    ops: &[ChangeOpCandidate],
) -> Result<SourceReconcileSummary> {
    // SAVEPOINT (unlike BEGIN) nests inside a caller-managed transaction; when
    // none is active, releasing it commits.
    conn.execute_batch("SAVEPOINT reconcile_source_tool_writes")?;
    match apply_source_reconcile(conn, provider, source_file, ops) {
        Ok(summary) => {
            conn.execute_batch("RELEASE SAVEPOINT reconcile_source_tool_writes")?;
            Ok(summary)
        }
        Err(err) => {
            // Best effort: a failing rollback must not mask the original error.
            let _ = conn.execute_batch(
                "ROLLBACK TO SAVEPOINT reconcile_source_tool_writes;
                 RELEASE SAVEPOINT reconcile_source_tool_writes;",
            );
            Err(err)
        }
    }
}

/// Performs the reconcile writes; `reconcile_source_tool_writes` supplies atomicity.
fn apply_source_reconcile(
    conn: &Connection,
    provider: &str,
    source_file: &str,
    ops: &[ChangeOpCandidate],
) -> Result<SourceReconcileSummary> {
    // Rows currently stored for this source, keyed the same way as incoming ops.
    let mut existing_by_key: BTreeMap<OpKey, ToolWriteSnapshot> =
        db::list_fact_tool_writes_by_source(conn, provider, source_file)?
            .into_iter()
            .map(|row| {
                let key = OpKey {
                    session_id: row.session_id,
                    call_id: row.call_id,
                    op_index: row.op_index,
                };
                let snapshot = ToolWriteSnapshot {
                    id: row.id,
                    repo_root: row.repo_root,
                    rel_path: row.rel_path,
                    line_hashes: row.line_hashes,
                };
                (key, snapshot)
            })
            .collect();

    let mut incoming_keys = BTreeSet::new();
    let mut dirty_by_repo: HashMap<String, BTreeSet<(String, String)>> = HashMap::new();
    let mut summary = SourceReconcileSummary::default();

    for op in ops {
        let key = OpKey {
            session_id: op.session_id.clone(),
            call_id: op.call_id.clone(),
            op_index: op.op_index,
        };
        // Repeated keys within one source: keep the first occurrence only.
        if !incoming_keys.insert(key.clone()) {
            continue;
        }

        let previous = existing_by_key.remove(&key);
        let new_snapshot = ToolWriteSnapshot {
            // Never read back from the database; only used for comparison.
            id: -1,
            repo_root: op.repo_root.clone(),
            rel_path: op.rel_path.clone(),
            line_hashes: normalized_line_hashes(&op.line_hashes),
        };

        let op_id = upsert_change_op(conn, op)?;
        replace_line_hashes(conn, op_id, &op.line_hashes)?;
        summary.ops_upserted += 1;

        if tool_write_changed(previous.as_ref(), &new_snapshot) {
            // Mark both the old and the new hashes dirty for their repositories.
            record_dirty_snapshot(previous.as_ref(), &mut dirty_by_repo);
            record_dirty_snapshot(Some(&new_snapshot), &mut dirty_by_repo);
        }
    }

    // Whatever is left was not produced by this parse of the source: delete it.
    for stale in existing_by_key.into_values() {
        record_dirty_snapshot(Some(&stale), &mut dirty_by_repo);
        db::delete_fact_session_code_change_by_id(conn, stale.id)?;
    }

    for (repo_root, hashes) in dirty_by_repo {
        // Repositories touched only by writes without line hashes are not bumped.
        if hashes.is_empty() {
            continue;
        }
        bump_repo_session_facts_version(conn, &repo_root)?;
        insert_dirty_hashes(conn, &repo_root, &hashes)?;
        summary.repos_marked_dirty += 1;
    }

    Ok(summary)
}

/// Appends one row to `change_parse_errors` describing a parser failure.
pub fn insert_parse_error(conn: &Connection, err: &ParseError) -> Result<()> {
    const INSERT_SQL: &str = "INSERT INTO change_parse_errors (
            provider, session_id, source_file, call_id, timestamp, parser_name, error
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";

    let values = params![
        err.provider,
        err.session_id,
        err.source_file,
        err.call_id,
        err.timestamp,
        err.parser_name,
        err.error,
    ];
    conn.execute(INSERT_SQL, values)
        .map(|_| ())
        .map_err(Into::into)
}

/// Returns the cursor stored for (`provider`, `source_file`), or `None` when no
/// row exists for that pair.
pub fn get_ingest_cursor(
    conn: &Connection,
    provider: &str,
    source_file: &str,
) -> Result<Option<IngestCursor>> {
    let sql = "SELECT file_mtime, file_size FROM ingest_cursors
             WHERE provider = ?1 AND source_file = ?2";
    let stored: Option<(i64, i64)> = conn
        .query_row(sql, params![provider, source_file], |r| {
            Ok((r.get(0)?, r.get(1)?))
        })
        .optional()?;
    Ok(stored.map(|(file_mtime, file_size)| IngestCursor {
        file_mtime,
        file_size,
    }))
}

/// Inserts or updates the cursor for (`provider`, `source_file`).
///
/// On conflict the stored mtime and size are overwritten and `last_ingested_at`
/// is set to `datetime('now')`. On first insert, `last_ingested_at` is not set by
/// this statement, so it takes whatever default the schema defines.
pub fn upsert_ingest_cursor(
    conn: &Connection,
    provider: &str,
    source_file: &str,
    file_mtime: i64,
    file_size: i64,
) -> Result<()> {
    let sql =
        "INSERT INTO ingest_cursors (provider, source_file, file_mtime, file_size)
         VALUES (?1, ?2, ?3, ?4)
         ON CONFLICT(provider, source_file) DO UPDATE SET
            file_mtime = excluded.file_mtime,
            file_size = excluded.file_size,
            last_ingested_at = datetime('now')";
    conn.execute(sql, params![provider, source_file, file_mtime, file_size])
        .map(|_| ())
        .map_err(Into::into)
}

/// Loads the association state of `repo_root`.
///
/// A repository without a `commit_assoc_repo_state` row is reported as version
/// `0` with no branch fingerprint.
pub fn get_repo_assoc_state(conn: &Connection, repo_root: &str) -> Result<RepoAssocState> {
    let sql = "SELECT session_facts_version, task_branch_fingerprint
             FROM commit_assoc_repo_state
             WHERE repo_root = ?1";
    let stored: Option<(i64, Option<String>)> = conn
        .query_row(sql, params![repo_root], |row| Ok((row.get(0)?, row.get(1)?)))
        .optional()?;

    let (session_facts_version, task_branch_fingerprint) = stored.unwrap_or((0, None));
    Ok(RepoAssocState {
        session_facts_version,
        task_branch_fingerprint,
    })
}

/// Increments the session-facts version of `repo_root` and returns the new value.
///
/// A missing row is created with version `1`. On conflict the version is
/// incremented and `updated_at` is set to `datetime('now')`. The value is then
/// re-read with a separate `SELECT`.
pub fn bump_repo_session_facts_version(conn: &Connection, repo_root: &str) -> Result<i64> {
    const BUMP_SQL: &str = "INSERT INTO commit_assoc_repo_state (repo_root, session_facts_version)
         VALUES (?1, 1)
         ON CONFLICT(repo_root) DO UPDATE SET
            session_facts_version = commit_assoc_repo_state.session_facts_version + 1,
            updated_at = datetime('now')";
    const READ_SQL: &str = "SELECT session_facts_version
         FROM commit_assoc_repo_state
         WHERE repo_root = ?1";

    conn.execute(BUMP_SQL, params![repo_root])?;
    let version: i64 = conn.query_row(READ_SQL, params![repo_root], |row| row.get(0))?;
    Ok(version)
}

/// Stores `fingerprint` (possibly `None`) as the task-branch fingerprint of `repo_root`.
///
/// A missing row is created with session-facts version `0`. An existing row
/// keeps its version; only the fingerprint and `updated_at` change.
pub fn upsert_repo_branch_fingerprint(
    conn: &Connection,
    repo_root: &str,
    fingerprint: Option<&str>,
) -> Result<()> {
    let sql =
        "INSERT INTO commit_assoc_repo_state (repo_root, session_facts_version, task_branch_fingerprint)
         VALUES (?1, 0, ?2)
         ON CONFLICT(repo_root) DO UPDATE SET
            task_branch_fingerprint = excluded.task_branch_fingerprint,
            updated_at = datetime('now')";
    conn.execute(sql, params![repo_root, fingerprint])
        .map(|_| ())
        .map_err(Into::into)
}

/// Records each `(side, line_hash)` in `hashes` as dirty for `repo_root`.
///
/// Pairs already present are left untouched (`ON CONFLICT ... DO NOTHING`), so
/// the call is safe to repeat with overlapping sets.
///
/// The insert statement is prepared once, through the connection's statement
/// cache, and reused for every pair. `Connection::execute` would otherwise
/// re-parse the SQL for each hash.
///
/// # Errors
/// Returns the first preparation or insert error.
pub fn insert_dirty_hashes(
    conn: &Connection,
    repo_root: &str,
    hashes: &BTreeSet<(String, String)>,
) -> Result<()> {
    if hashes.is_empty() {
        return Ok(());
    }

    let mut stmt = conn.prepare_cached(
        "INSERT INTO commit_assoc_dirty_hash (repo_root, side, line_hash)
             VALUES (?1, ?2, ?3)
             ON CONFLICT(repo_root, side, line_hash) DO NOTHING",
    )?;
    for (side, line_hash) in hashes {
        stmt.execute(params![repo_root, side, line_hash])?;
    }
    Ok(())
}

/// Returns every dirty `(side, line_hash)` pair recorded for `repo_root`,
/// sorted by side and then by hash.
pub fn list_dirty_hashes(conn: &Connection, repo_root: &str) -> Result<Vec<(String, String)>> {
    let mut stmt = conn.prepare(
        "SELECT side, line_hash
         FROM commit_assoc_dirty_hash
         WHERE repo_root = ?1
         ORDER BY side, line_hash",
    )?;
    // Collecting into a Result stops at the first row error, as a manual loop would.
    let pairs = stmt
        .query_map(params![repo_root], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::<Result<Vec<(String, String)>, _>>()?;
    Ok(pairs)
}

/// Removes every dirty-hash row recorded for `repo_root`.
pub fn clear_dirty_hashes(conn: &Connection, repo_root: &str) -> Result<()> {
    let sql = "DELETE FROM commit_assoc_dirty_hash WHERE repo_root = ?1";
    conn.execute(sql, params![repo_root])
        .map(|_| ())
        .map_err(Into::into)
}

/// Reports whether `next` differs from the previously stored write.
///
/// With no previous row, only writes attributed to a repository count as a
/// change. Otherwise, a change in repository, relative path, or the normalized
/// (summed and sorted) line-hash counts counts as a change.
fn tool_write_changed(previous: Option<&ToolWriteSnapshot>, next: &ToolWriteSnapshot) -> bool {
    match previous {
        None => next.repo_root.is_some(),
        Some(prev) => {
            let same_location =
                prev.repo_root == next.repo_root && prev.rel_path == next.rel_path;
            // Normalization only runs when the location is unchanged.
            !same_location
                || normalized_line_hashes(&prev.line_hashes)
                    != normalized_line_hashes(&next.line_hashes)
        }
    }
}

/// Adds the normalized `(side, line_hash)` pairs of `snapshot` to the dirty set
/// of its repository.
///
/// Does nothing for `None` or for a snapshot without `repo_root`. Otherwise the
/// repository entry is created even when the snapshot has no line hashes.
fn record_dirty_snapshot(
    snapshot: Option<&ToolWriteSnapshot>,
    dirty_by_repo: &mut HashMap<String, BTreeSet<(String, String)>>,
) {
    let Some(snap) = snapshot else { return };
    let Some(repo_root) = &snap.repo_root else { return };

    let dirty = dirty_by_repo.entry(repo_root.clone()).or_default();
    dirty.extend(
        normalized_line_hashes(&snap.line_hashes)
            .into_iter()
            .map(|hashed| (hashed.side.as_str().to_string(), hashed.line_hash)),
    );
}

/// Merges entries that share `(side, line_hash)` by summing their counts.
///
/// The result is sorted by `(side, line_hash)`, so two inputs with the same
/// totals compare equal regardless of their original order or splitting.
fn normalized_line_hashes(line_hashes: &[LineHashCount]) -> Vec<LineHashCount> {
    let totals = line_hashes.iter().fold(
        BTreeMap::<(LineSide, String), i64>::new(),
        |mut acc, item| {
            *acc.entry((item.side, item.line_hash.clone())).or_default() += item.count;
            acc
        },
    );

    let mut merged = Vec::with_capacity(totals.len());
    for ((side, line_hash), count) in totals {
        merged.push(LineHashCount {
            side,
            line_hash,
            count,
        });
    }
    merged
}