kaizen-cli 0.1.43

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). SQLite, redact before any sync you enable.
Documentation
use super::*;

impl Store {
    pub(super) fn append_outbox_row(
        &self,
        owner_id: &str,
        kind: &str,
        payload: &str,
    ) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
             VALUES (?1, ?2, ?3, 0)",
            params![owner_id, kind, payload],
        )?;
        Ok(())
    }

    pub fn list_outbox_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, kind, payload FROM sync_outbox WHERE sent = 0 ORDER BY id ASC LIMIT ?1",
        )?;
        let rows = stmt.query_map(params![limit as i64], read_outbox_row)?;
        rows.collect::<rusqlite::Result<_>>().map_err(Into::into)
    }

    pub fn mark_outbox_sent(&self, ids: &[i64]) -> Result<()> {
        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
        ids.iter().try_for_each(|id| {
            tx.execute("UPDATE sync_outbox SET sent = 1 WHERE id = ?1", [id])
                .map(|_| ())
        })?;
        tx.commit()?;
        Ok(())
    }

    pub fn replace_outbox_rows(
        &self,
        owner_id: &str,
        kind: &str,
        payloads: &[String],
    ) -> Result<()> {
        let tx = rusqlite::Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
        tx.execute(
            "DELETE FROM sync_outbox WHERE session_id = ?1 AND kind = ?2 AND sent = 0",
            params![owner_id, kind],
        )?;
        insert_outbox_payloads(&tx, owner_id, kind, payloads)?;
        tx.commit()?;
        Ok(())
    }

    pub fn outbox_pending_count(&self) -> Result<u64> {
        let c: i64 =
            self.conn
                .query_row("SELECT COUNT(*) FROM sync_outbox WHERE sent = 0", [], |r| {
                    r.get(0)
                })?;
        Ok(c as u64)
    }

    pub fn set_sync_state_ok(&self) -> Result<()> {
        let now = now_ms().to_string();
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('last_success_ms', ?1)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![now],
        )?;
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', '0')
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            [],
        )?;
        self.conn
            .execute("DELETE FROM sync_state WHERE k = 'last_error'", [])?;
        Ok(())
    }

    pub fn set_sync_state_error(&self, msg: &str) -> Result<()> {
        let prev: i64 = self
            .conn
            .query_row(
                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
                [],
                |r| {
                    let s: String = r.get(0)?;
                    Ok(s.parse::<i64>().unwrap_or(0))
                },
            )
            .optional()?
            .unwrap_or(0);
        let next = prev.saturating_add(1);
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('last_error', ?1)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![msg],
        )?;
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES ('consecutive_failures', ?1)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![next.to_string()],
        )?;
        Ok(())
    }

    pub fn sync_status(&self) -> Result<SyncStatusSnapshot> {
        let pending_outbox = self.outbox_pending_count()?;
        let last_success_ms = self
            .conn
            .query_row(
                "SELECT v FROM sync_state WHERE k = 'last_success_ms'",
                [],
                |r| r.get::<_, String>(0),
            )
            .optional()?
            .and_then(|s| s.parse().ok());
        let last_error = self
            .conn
            .query_row("SELECT v FROM sync_state WHERE k = 'last_error'", [], |r| {
                r.get::<_, String>(0)
            })
            .optional()?;
        let consecutive_failures = self
            .conn
            .query_row(
                "SELECT v FROM sync_state WHERE k = 'consecutive_failures'",
                [],
                |r| r.get::<_, String>(0),
            )
            .optional()?
            .and_then(|s| s.parse().ok())
            .unwrap_or(0);
        Ok(SyncStatusSnapshot {
            pending_outbox,
            last_success_ms,
            last_error,
            consecutive_failures,
        })
    }

    pub fn sync_state_get_u64(&self, key: &str) -> Result<Option<u64>> {
        let row: Option<String> = self
            .conn
            .query_row("SELECT v FROM sync_state WHERE k = ?1", params![key], |r| {
                r.get::<_, String>(0)
            })
            .optional()?;
        Ok(row.and_then(|s| s.parse().ok()))
    }

    pub fn sync_state_set_u64(&self, key: &str, v: u64) -> Result<()> {
        self.conn.execute(
            "INSERT INTO sync_state (k, v) VALUES (?1, ?2)
             ON CONFLICT(k) DO UPDATE SET v = excluded.v",
            params![key, v.to_string()],
        )?;
        Ok(())
    }
}

fn read_outbox_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(i64, String, String)> {
    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
}

fn insert_outbox_payloads(
    tx: &rusqlite::Transaction<'_>,
    owner_id: &str,
    kind: &str,
    payloads: &[String],
) -> rusqlite::Result<()> {
    payloads.iter().try_for_each(|payload| {
        tx.execute(
            "INSERT INTO sync_outbox (session_id, kind, payload, sent)
             VALUES (?1, ?2, ?3, 0)",
            params![owner_id, kind, payload],
        )
        .map(|_| ())
    })
}