kaizen-cli 0.1.43

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). SQLite, redact before any sync you enable.
Documentation
use super::*;
use crate::store::outbox_redb::{LegacyOutbox, LegacyOutboxRow};
use anyhow::bail;
use rusqlite::Transaction;

const MARKER_KEY: &str = "outbox_redb_migration_v1_digest";
const REPLACEMENT_KINDS: [&str; 3] = ["tool_spans", "repo_snapshots", "workspace_facts"];

#[derive(Clone, Debug)]
struct QueueRow {
    owner_id: String,
    kind: String,
    payload: String,
}

pub(super) fn migrate(conn: &Connection, root: &Path) -> Result<()> {
    let source = root.join("hot").join("outbox.redb");
    if !source.try_exists()? {
        return Ok(());
    }
    let legacy = LegacyOutbox::open(&source)?;
    let rows = legacy.pending_rows()?;
    migrate_rows(conn, &rows, &digest(&rows))?;
    drop(legacy);
    rename_source(&source)
}

fn migrate_rows(conn: &Connection, legacy: &[LegacyOutboxRow], digest: &str) -> Result<()> {
    let tx = Transaction::new_unchecked(conn, TransactionBehavior::Immediate)?;
    if marker_matches(&tx, digest)? {
        tx.commit()?;
        return Ok(());
    }
    if legacy.is_empty() {
        return commit_marker(tx, digest);
    }
    let merged = reconcile(legacy, sqlite_pending(&tx)?);
    replace_pending(&tx, &merged)?;
    commit_marker(tx, digest)
}

fn commit_marker(tx: Transaction<'_>, digest: &str) -> Result<()> {
    write_marker(&tx, digest)?;
    tx.commit()?;
    Ok(())
}

fn sqlite_pending(tx: &Transaction<'_>) -> Result<Vec<QueueRow>> {
    let mut stmt = tx.prepare(
        "SELECT session_id, kind, payload FROM sync_outbox
         WHERE sent = 0 ORDER BY id",
    )?;
    let rows = stmt.query_map([], read_queue_row)?;
    rows.collect::<rusqlite::Result<_>>().map_err(Into::into)
}

fn read_queue_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<QueueRow> {
    Ok(QueueRow {
        owner_id: row.get(0)?,
        kind: row.get(1)?,
        payload: row.get(2)?,
    })
}

fn reconcile(legacy: &[LegacyOutboxRow], sqlite: Vec<QueueRow>) -> Vec<QueueRow> {
    let replacements = replacement_keys(legacy);
    let mut exact = exact_counts(legacy);
    let unmatched = sqlite
        .into_iter()
        .filter(|row| keep_sqlite(row, &replacements, &mut exact));
    legacy.iter().map(QueueRow::from).chain(unmatched).collect()
}

fn replacement_keys(rows: &[LegacyOutboxRow]) -> HashSet<(String, String)> {
    rows.iter()
        .filter(|row| is_replacement(&row.kind))
        .map(|row| (row.owner_id.clone(), row.kind.clone()))
        .collect()
}

fn exact_counts(rows: &[LegacyOutboxRow]) -> HashMap<(String, String, String), usize> {
    rows.iter()
        .filter(|row| !is_replacement(&row.kind))
        .fold(HashMap::new(), |mut counts, row| {
            *counts.entry(exact_legacy(row)).or_default() += 1;
            counts
        })
}

fn keep_sqlite(
    row: &QueueRow,
    replacements: &HashSet<(String, String)>,
    exact: &mut HashMap<(String, String, String), usize>,
) -> bool {
    if replacements.contains(&(row.owner_id.clone(), row.kind.clone())) {
        return false;
    }
    consume_exact(exact, exact_sqlite(row))
}

fn consume_exact(
    counts: &mut HashMap<(String, String, String), usize>,
    key: (String, String, String),
) -> bool {
    let Some(count) = counts.get_mut(&key) else {
        return true;
    };
    if *count == 0 {
        return true;
    }
    *count -= 1;
    false
}

fn replace_pending(tx: &Transaction<'_>, rows: &[QueueRow]) -> Result<()> {
    tx.execute("DELETE FROM sync_outbox WHERE sent = 0", [])?;
    rows.iter().try_for_each(|row| {
        tx.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
             VALUES (?1, ?2, ?3, 0)",
            params![row.owner_id, row.kind, row.payload],
        )
        .map(|_| ())
    })?;
    Ok(())
}

fn marker_matches(tx: &Transaction<'_>, digest: &str) -> Result<bool> {
    let marker = tx
        .query_row(
            "SELECT v FROM sync_state WHERE k = ?1",
            [MARKER_KEY],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    Ok(marker.as_deref() == Some(digest))
}

fn write_marker(tx: &Transaction<'_>, digest: &str) -> Result<()> {
    tx.execute(
        "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
         ON CONFLICT(k) DO UPDATE SET v = excluded.v",
        params![MARKER_KEY, digest],
    )?;
    Ok(())
}

fn digest(rows: &[LegacyOutboxRow]) -> String {
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"kaizen-outbox-migration-v1");
    rows.iter().for_each(|row| hash_row(&mut hasher, row));
    hasher.finalize().to_hex().to_string()
}

fn hash_row(hasher: &mut blake3::Hasher, row: &LegacyOutboxRow) {
    hasher.update(&row.id.to_le_bytes());
    hash_field(hasher, row.owner_id.as_bytes());
    hash_field(hasher, row.kind.as_bytes());
    hash_field(hasher, row.payload.as_bytes());
}

fn hash_field(hasher: &mut blake3::Hasher, value: &[u8]) {
    hasher.update(&(value.len() as u64).to_le_bytes());
    hasher.update(value);
}

fn exact_legacy(row: &LegacyOutboxRow) -> (String, String, String) {
    (row.owner_id.clone(), row.kind.clone(), row.payload.clone())
}

fn exact_sqlite(row: &QueueRow) -> (String, String, String) {
    (row.owner_id.clone(), row.kind.clone(), row.payload.clone())
}

fn is_replacement(kind: &str) -> bool {
    REPLACEMENT_KINDS.contains(&kind)
}

fn rename_source(source: &Path) -> Result<()> {
    let target = source.with_file_name("outbox.redb.migrated-v1");
    if target.try_exists()? {
        bail!(
            "legacy outbox migration target exists: {}",
            target.display()
        );
    }
    std::fs::rename(source, &target)
        .with_context(|| format!("archive legacy outbox: {}", source.display()))?;
    Ok(())
}

impl From<&LegacyOutboxRow> for QueueRow {
    fn from(row: &LegacyOutboxRow) -> Self {
        Self {
            owner_id: row.owner_id.clone(),
            kind: row.kind.clone(),
            payload: row.payload.clone(),
        }
    }
}