use crate::{CliError, CliResult};
use clap::{Args, Subcommand};
use rand::{rngs::OsRng, Rng};
use rusqlite::{
params, Connection, OpenFlags, OptionalExtension, Transaction, TransactionBehavior,
};
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::time::Duration;
/// Schema version this binary migrates databases up to; `migrate` compares it
/// against SQLite's `user_version` pragma.
const CURRENT_SCHEMA_VERSION: i64 = 5;
/// SQLite GLOB pattern for the `YYYY-MM-DDTHH:MM:SSZ` timestamp shape. Each `?`
/// matches any single character, so this checks layout only, not that the
/// fields are digits. Used by the timestamp triggers and the legacy-row scan.
const CANONICAL_TS_GLOB: &str = "????-??-??T??:??:??Z";
/// Busy timeout in milliseconds, applied by `configure_read_connection`.
const BUSY_TIMEOUT_MS: u64 = 5000;
// Top-level arguments of the `db` command group: the database path plus the
// subcommand to run against it.
// Plain `//` comments are deliberate here: clap's derive would turn `///`
// doc comments into user-visible help text.
#[derive(Debug, Args)]
pub struct DbArgs {
// Location of the SQLite database file.
#[arg(
long,
default_value = ".zynk/zynk.db",
help = "SQLite database path for live zynk state"
)]
pub db: PathBuf,
// Which `db` subcommand to execute; dispatched by `run`.
#[command(subcommand)]
pub command: DbCommand,
}
// Subcommands of the `db` group. `//` comments are used so clap's help output
// is not changed (clap derives help text from `///`).
#[derive(Debug, Subcommand)]
pub enum DbCommand {
// Create or migrate the database, then print its path.
Init,
// Audit-record operations (boxed payload).
Audit(Box<DbAuditCommand>),
// Import operations (boxed payload).
Import(Box<DbImportCommand>),
// Export operations (boxed payload).
Export(Box<DbExportCommand>),
// Handed to `crate::db_dashboard::serve` by `run`.
Serve(crate::db_dashboard::DbServeArgs),
}
// Wrapper for the `audit` subcommand group; holds the nested subcommand.
// (`//` rather than `///` so clap's generated help is unchanged.)
#[derive(Debug, Args)]
pub struct DbAuditCommand {
#[command(subcommand)]
pub command: DbAuditSubcommand,
}
// Operations available under `audit`. Only appending is offered here.
// (`//` rather than `///` so clap's generated help is unchanged.)
#[derive(Debug, Subcommand)]
pub enum DbAuditSubcommand {
// Append one audit record; handled by `run_audit_command`.
Append(DbAuditAppendArgs),
}
// Flags describing one audit record to append. `run_audit_command` passes this
// struct to `append_audit_record` (defined outside this section).
// Most field names mirror columns of the `audit_records` table created in
// `apply_v1`. `//` comments are used so clap's help output is unchanged.
#[derive(Debug, Args)]
pub struct DbAuditAppendArgs {
#[arg(long)]
pub session_id: String,
// Optional explicit id; NOTE(review): presumably generated when omitted —
// confirm in `append_audit_record`.
#[arg(long)]
pub audit_id: Option<String>,
// NOTE(review): the v4/v5 insert trigger on `audit_records` rejects
// timestamps that do not match CANONICAL_TS_GLOB; whether this value is
// normalised before insert is not visible here.
#[arg(long)]
pub timestamp: String,
#[arg(long)]
pub source_agent_id: Option<String>,
#[arg(long)]
pub target_agent_id: Option<String>,
#[arg(long)]
pub source_address: String,
#[arg(long)]
pub target_address: String,
#[arg(long)]
pub transport: String,
#[arg(long)]
pub transport_thread_id: Option<String>,
#[arg(long)]
pub workspace_id: String,
#[arg(long)]
pub mid: String,
// Exposed on the command line as `--type`.
#[arg(long = "type")]
pub record_type: String,
#[arg(long)]
pub mode: Option<String>,
#[arg(long)]
pub r#ref: Option<String>,
#[arg(long)]
pub re: Option<String>,
// Accepted values match the `command_origin` CHECK constraint in `apply_v1`.
#[arg(long, value_parser = ["agent", "operator", "helper-tool", "unknown"])]
pub command_origin: String,
// Payload given inline ...
#[arg(long)]
pub payload: Option<String>,
// ... or via a file. NOTE(review): how the two interact when both or
// neither are given is decided in `append_audit_record` — not visible here.
#[arg(long)]
pub payload_file: Option<PathBuf>,
// NOTE(review): no value_parser, although the table CHECK only allows
// 'hash-only', 'excerpt', 'full' — confirm validation happens downstream.
#[arg(long, default_value = "hash-only")]
pub payload_redaction_policy: String,
// NOTE(review): presumably the excerpt length used with the 'excerpt'
// policy — confirm against `append_audit_record`.
#[arg(long, default_value_t = 12)]
pub excerpt_chars: usize,
// NOTE(review): no value_parser, although the table CHECK only allows
// 'drafted', 'sent', 'observed', 'failed', 'unknown'.
#[arg(long)]
pub delivery_status: String,
#[arg(long)]
pub observed_by: String,
// Accepted values match the `verified_by` CHECK constraint in `apply_v1`.
#[arg(long, value_parser = ["transport", "agent", "operator", "helper-tool"])]
pub verified_by: String,
}
// Wrapper for the `import` subcommand group; holds the nested subcommand.
// (`//` rather than `///` so clap's generated help is unchanged.)
#[derive(Debug, Args)]
pub struct DbImportCommand {
#[command(subcommand)]
pub command: DbImportSubcommand,
}
// Import sources supported by `import`.
// (`//` rather than `///` so clap's generated help is unchanged.)
#[derive(Debug, Subcommand)]
pub enum DbImportSubcommand {
// Import from a runtime outputs tree; handled by `run_import_command`.
Outputs(DbImportOutputsArgs),
}
// Wrapper for the `export` subcommand group; holds the nested subcommand.
// (`//` rather than `///` so clap's generated help is unchanged.)
#[derive(Debug, Args)]
pub struct DbExportCommand {
#[command(subcommand)]
pub command: DbExportSubcommand,
}
// Export targets supported by `export`.
// (`//` rather than `///` so clap's generated help is unchanged.)
#[derive(Debug, Subcommand)]
pub enum DbExportSubcommand {
// Export one session to a runtime outputs tree; handled by `run_export_command`.
Outputs(DbExportOutputsArgs),
}
// Arguments for exporting one session's `status.md` and `audit.md`
// (see `export_outputs`). `//` comments keep clap's help text unchanged.
#[derive(Debug, Args)]
#[command(
after_help = "v0.2 exports status.md and audit.md only; summary.md is not exported because the DB schema does not store summary body content."
)]
pub struct DbExportOutputsArgs {
// Outputs root; files are written under `<root>/sessions/<session-id>`.
#[arg(
long,
default_value = "outputs",
help = "runtime outputs root; writes <root>/sessions/<session-id>"
)]
pub root: PathBuf,
// Session to export; `load_export_status` fails if it is not in the database.
#[arg(long)]
pub session_id: String,
}
// Arguments for importing sessions from a runtime outputs tree; consumed by
// `import_outputs` (defined outside this section).
// `//` comments keep clap's help text unchanged.
#[derive(Debug, Args)]
#[command(
after_help = "Legacy DB recovery: if an upgraded database warns about non-canonical \
timestamps, rebuild it from artifacts with: rm .zynk/zynk.db && zynk db init && \
zynk db import outputs --root outputs"
)]
pub struct DbImportOutputsArgs {
// Outputs root; sessions are read from `<root>/sessions/*`.
#[arg(
long,
default_value = "outputs",
help = "runtime outputs root; reads <root>/sessions/*"
)]
pub root: PathBuf,
// NOTE(review): presumably restricts the import to one session — confirm
// in `import_outputs`.
#[arg(long)]
pub session_id: Option<String>,
// NOTE(review): purpose not visible in this section — confirm in `import_outputs`.
#[arg(long)]
pub timestamp: Option<String>,
}
pub fn run(args: DbArgs) -> CliResult<()> {
match args.command {
DbCommand::Init => {
initialize(&args.db)?;
println!("{}", display_path(&args.db)?.display());
Ok(())
}
DbCommand::Audit(command) => run_audit_command(&args.db, *command),
DbCommand::Import(command) => run_import_command(&args.db, *command),
DbCommand::Export(command) => run_export_command(&args.db, *command),
DbCommand::Serve(serve_args) => crate::db_dashboard::serve(&args.db, serve_args),
}
}
/// Runs an `audit` subcommand against the database at `path`.
///
/// For `append`, the record is written via `append_audit_record` and the
/// resulting audit id is printed on its own line.
///
/// # Errors
/// Propagates any failure from `append_audit_record`.
fn run_audit_command(path: &Path, command: DbAuditCommand) -> CliResult<()> {
    // `Append` is currently the only variant, so the pattern is irrefutable.
    let DbAuditSubcommand::Append(append_args) = command.command;
    let audit_id = append_audit_record(path, &append_args)?;
    println!("{audit_id}");
    Ok(())
}
/// Runs an `import` subcommand against the database at `path`.
///
/// For `outputs`, delegates to `import_outputs` and prints how many sessions
/// were seen and how many warnings were raised.
///
/// # Errors
/// Propagates any failure from `import_outputs`.
fn run_import_command(path: &Path, command: DbImportCommand) -> CliResult<()> {
    // `Outputs` is currently the only variant, so the pattern is irrefutable.
    let DbImportSubcommand::Outputs(outputs_args) = command.command;
    let report = import_outputs(path, &outputs_args)?;
    println!(
        "sessions imported: {}; warnings: {}",
        report.sessions_seen, report.warning_count
    );
    Ok(())
}
/// Runs an `export` subcommand against the database at `path`.
///
/// For `outputs`, delegates to `export_outputs` and prints the session id
/// together with the number of files written and left unchanged.
///
/// # Errors
/// Propagates any failure from `export_outputs`.
fn run_export_command(path: &Path, command: DbExportCommand) -> CliResult<()> {
    // `Outputs` is currently the only variant, so the pattern is irrefutable.
    let DbExportSubcommand::Outputs(outputs_args) = command.command;
    let report = export_outputs(path, &outputs_args)?;
    println!(
        "session exported: {}; written: {}; unchanged: {}",
        outputs_args.session_id, report.written, report.unchanged
    );
    Ok(())
}
fn display_path(path: &Path) -> CliResult<PathBuf> {
if path.is_absolute() {
return Ok(path.to_path_buf());
}
std::env::current_dir()
.map(|cwd| cwd.join(path))
.map_err(|error| CliError::failure(format!("failed to read current directory: {error}")))
}
/// Creates (or migrates) the database at `path`, discarding the connection.
///
/// # Errors
/// Propagates any failure from `open_database`.
fn initialize(path: &Path) -> CliResult<()> {
    // Opening is enough: `open_database` configures and migrates as a side effect.
    let _connection = open_database(path)?;
    Ok(())
}
/// Opens the database at `path` for read/write use.
///
/// Creates the parent directory when the path has a non-empty one, opens the
/// file, applies the connection settings (`configure_connection`) and brings
/// the schema up to date (`migrate`).
///
/// # Errors
/// Fails if the directory cannot be created, the file cannot be opened, or
/// configuration/migration fails.
pub(crate) fn open_database(path: &Path) -> CliResult<Connection> {
    match path.parent() {
        // A bare file name has an empty parent; nothing to create then.
        Some(dir) if !dir.as_os_str().is_empty() => {
            if let Err(error) = fs::create_dir_all(dir) {
                return Err(CliError::failure(format!(
                    "failed to create {}: {error}",
                    dir.display()
                )));
            }
        }
        _ => {}
    }

    let mut connection = match Connection::open(path) {
        Ok(opened) => opened,
        Err(error) => {
            return Err(CliError::failure(format!(
                "failed to open {}: {error}",
                path.display()
            )));
        }
    };
    configure_connection(&connection)?;
    migrate(&mut connection)?;
    Ok(connection)
}
fn configure_connection(connection: &Connection) -> CliResult<()> {
configure_read_connection(connection)?;
connection
.pragma_update(None, "journal_mode", "WAL")
.map_err(|error| CliError::failure(format!("failed to enable SQLite WAL mode: {error}")))?;
Ok(())
}
/// Opens the database at `path` with `SQLITE_OPEN_READ_ONLY`.
///
/// Unlike `open_database`, this neither creates directories nor runs
/// migrations; it only applies `configure_read_connection`.
///
/// # Errors
/// Fails if the file cannot be opened or the connection cannot be configured.
pub(crate) fn open_read_database(path: &Path) -> CliResult<Connection> {
    let connection = match Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY) {
        Ok(opened) => opened,
        Err(error) => {
            return Err(CliError::failure(format!(
                "failed to open {}: {error}",
                path.display()
            )));
        }
    };
    configure_read_connection(&connection)?;
    Ok(connection)
}
/// Applies the settings shared by every connection: the busy timeout
/// (`BUSY_TIMEOUT_MS`) and `foreign_keys = ON`.
///
/// # Errors
/// Fails if either setting cannot be applied.
fn configure_read_connection(connection: &Connection) -> CliResult<()> {
    let timeout = Duration::from_millis(BUSY_TIMEOUT_MS);
    if let Err(error) = connection.busy_timeout(timeout) {
        return Err(CliError::failure(format!(
            "failed to set SQLite busy timeout: {error}"
        )));
    }
    connection
        .pragma_update(None, "foreign_keys", "ON")
        .map_err(|error| {
            CliError::failure(format!("failed to enable SQLite foreign keys: {error}"))
        })
}
fn migrate(connection: &mut Connection) -> CliResult<()> {
let version: i64 = connection
.pragma_query_value(None, "user_version", |row| row.get(0))
.map_err(|error| {
CliError::failure(format!("failed to read SQLite schema version: {error}"))
})?;
if version > CURRENT_SCHEMA_VERSION {
return Err(CliError::failure(format!(
"database schema version {version} is newer than this zynk binary supports ({CURRENT_SCHEMA_VERSION}); upgrade zynk"
)));
}
if version == CURRENT_SCHEMA_VERSION {
return Ok(());
}
let transaction = connection
.transaction()
.map_err(|error| CliError::failure(format!("failed to start SQLite migration: {error}")))?;
create_schema_migrations(&transaction)?;
if version < 1 {
apply_v1(&transaction)?;
set_user_version(&transaction, 1)?;
}
if version < 2 {
apply_v2(&transaction)?;
set_user_version(&transaction, 2)?;
}
if version < 3 {
apply_v3(&transaction)?;
set_user_version(&transaction, 3)?;
}
if version < 4 {
apply_v4(&transaction)?;
set_user_version(&transaction, 4)?;
}
if version < 5 {
apply_v5(&transaction)?;
set_user_version(&transaction, 5)?;
}
transaction.commit().map_err(|error| {
CliError::failure(format!("failed to commit SQLite migration: {error}"))
})?;
if version < 4 {
warn_if_noncanonical_timestamps(connection)?;
}
Ok(())
}
/// Prints a warning on stderr when existing rows carry timestamps that do not
/// match `CANONICAL_TS_GLOB`. Rows are only counted, never modified.
///
/// # Errors
/// Propagates any failure from `noncanonical_timestamp_count`.
fn warn_if_noncanonical_timestamps(connection: &Connection) -> CliResult<()> {
    let count = noncanonical_timestamp_count(connection)?;
    if count <= 0 {
        return Ok(());
    }
    eprintln!(
        "warning: {count} pre-existing row(s) carry non-canonical timestamps and will sort \
         incorrectly; this upgrade does not rewrite them. Recover with: \
         rm .zynk/zynk.db && zynk db init && zynk db import outputs --root outputs"
    );
    Ok(())
}
/// Counts rows whose timestamp columns do not match `CANONICAL_TS_GLOB`.
///
/// The total is the sum over `audit_records.timestamp`,
/// `status_events.timestamp`, `messages.timestamp`, and `sessions` rows where
/// `created_at`, `updated_at` or a non-NULL `completed_at` is off-pattern.
/// A session row counts once even if several of its columns are off-pattern.
///
/// # Errors
/// Fails if the query cannot be executed.
fn noncanonical_timestamp_count(connection: &Connection) -> CliResult<i64> {
// The pattern is a compile-time constant, so interpolating it into the SQL
// text does not expose the query to external input.
let glob = CANONICAL_TS_GLOB;
let sql = format!(
"SELECT
(SELECT COUNT(*) FROM audit_records WHERE timestamp NOT GLOB '{glob}')
+ (SELECT COUNT(*) FROM status_events WHERE timestamp NOT GLOB '{glob}')
+ (SELECT COUNT(*) FROM messages WHERE timestamp NOT GLOB '{glob}')
+ (SELECT COUNT(*) FROM sessions
WHERE created_at NOT GLOB '{glob}'
OR updated_at NOT GLOB '{glob}'
OR (completed_at IS NOT NULL AND completed_at NOT GLOB '{glob}'))"
);
connection
.query_row(&sql, [], |row| row.get(0))
.map_err(|error| CliError::failure(format!("failed to scan timestamps: {error}")))
}
fn set_user_version(connection: &Connection, version: i64) -> CliResult<()> {
connection
.pragma_update(None, "user_version", version)
.map_err(|error| {
CliError::failure(format!("failed to set SQLite schema version: {error}"))
})?;
Ok(())
}
/// Ensures the `schema_migrations` bookkeeping table exists.
///
/// Each migration step inserts its `(version, name)` row here; `applied_at`
/// defaults to the current UTC time in `YYYY-MM-DDTHH:MM:SSZ` form.
/// Uses `IF NOT EXISTS`, so calling this repeatedly is harmless.
///
/// # Errors
/// Fails if the statement cannot be executed.
fn create_schema_migrations(connection: &Connection) -> CliResult<()> {
connection
.execute_batch(
"CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);",
)
.map_err(|error| {
CliError::failure(format!("failed to create schema_migrations: {error}"))
})?;
Ok(())
}
/// Migration 1 (`initial-live-state`): creates the base tables.
///
/// Tables created: `projects`, `agents`, `sessions`, `agent_addresses`,
/// `session_agents`, `audit_records`, `messages`, `status_events`,
/// `artifacts` and `imports`. Enumerated text columns are restricted with
/// `CHECK` constraints. The step records itself in `schema_migrations`.
///
/// The `CREATE TABLE` statements have no `IF NOT EXISTS`, so the batch fails
/// if any of these tables already exists.
///
/// # Errors
/// Fails if the batch cannot be executed.
fn apply_v1(connection: &Connection) -> CliResult<()> {
connection
.execute_batch(
"CREATE TABLE projects (
project_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
root_path TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE agents (
agent_id TEXT PRIMARY KEY,
display_name TEXT NOT NULL,
agent_kind TEXT NOT NULL,
current_transport TEXT,
current_address TEXT,
-- D4: nullable by design. An agent with no active session
-- legitimately has no current status; session_agents.agent_status
-- (below) is NOT NULL because an agent in a session always has one.
current_agent_status TEXT CHECK (
current_agent_status IS NULL
OR current_agent_status IN ('idle', 'working', 'blocked', 'done', 'unknown')
),
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE sessions (
session_id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(project_id),
title TEXT NOT NULL,
phase TEXT NOT NULL,
mode TEXT NOT NULL,
workflow_status TEXT NOT NULL CHECK (
workflow_status IN ('idle', 'working', 'blocked', 'waiting-for-operator', 'done')
),
lead_agent_id TEXT REFERENCES agents(agent_id),
artifact_ref TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
completed_at TEXT
);
CREATE TABLE agent_addresses (
agent_id TEXT NOT NULL REFERENCES agents(agent_id),
transport TEXT NOT NULL,
workspace_id TEXT,
address TEXT NOT NULL,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
PRIMARY KEY (agent_id, transport, address, first_seen_at)
);
CREATE TABLE session_agents (
session_id TEXT NOT NULL REFERENCES sessions(session_id),
agent_id TEXT NOT NULL REFERENCES agents(agent_id),
role TEXT NOT NULL,
-- D4: NOT NULL — an agent within a session always has a status
-- (the Herdr agent_status domain, distinct from sessions.workflow_status).
agent_status TEXT NOT NULL CHECK (
agent_status IN ('idle', 'working', 'blocked', 'done', 'unknown')
),
last_seen_at TEXT NOT NULL,
PRIMARY KEY (session_id, agent_id)
);
CREATE TABLE audit_records (
audit_id TEXT PRIMARY KEY,
previous_audit_id TEXT REFERENCES audit_records(audit_id),
session_id TEXT NOT NULL REFERENCES sessions(session_id),
source_agent_id TEXT,
target_agent_id TEXT,
source_address TEXT NOT NULL,
target_address TEXT NOT NULL,
transport TEXT NOT NULL,
workspace_id TEXT NOT NULL,
mid TEXT NOT NULL,
record_type TEXT NOT NULL,
command_origin TEXT NOT NULL CHECK (
command_origin IN ('agent', 'operator', 'helper-tool', 'unknown')
),
mode TEXT,
ref TEXT,
re TEXT,
payload_hash TEXT NOT NULL,
payload_redaction_policy TEXT NOT NULL CHECK (
payload_redaction_policy IN ('hash-only', 'excerpt', 'full')
),
content_size INTEGER NOT NULL,
delivery_status TEXT NOT NULL CHECK (
delivery_status IN ('drafted', 'sent', 'observed', 'failed', 'unknown')
),
observed_by TEXT NOT NULL,
verified_by TEXT NOT NULL CHECK (
verified_by IN ('transport', 'agent', 'operator', 'helper-tool')
),
timestamp TEXT NOT NULL
);
CREATE TABLE messages (
message_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(session_id),
mid TEXT NOT NULL,
source_agent_id TEXT REFERENCES agents(agent_id),
target_agent_id TEXT REFERENCES agents(agent_id),
message_type TEXT NOT NULL,
mode TEXT,
ref TEXT,
transport TEXT NOT NULL,
transport_thread_id TEXT,
payload_redaction_policy TEXT NOT NULL CHECK (
payload_redaction_policy IN ('hash-only', 'excerpt', 'full')
),
payload_excerpt TEXT,
payload_hash TEXT NOT NULL,
latest_delivery_status TEXT NOT NULL CHECK (
latest_delivery_status IN ('drafted', 'sent', 'observed', 'failed', 'unknown')
),
latest_verified_by TEXT NOT NULL CHECK (
latest_verified_by IN ('transport', 'agent', 'operator', 'helper-tool')
),
latest_audit_id TEXT REFERENCES audit_records(audit_id),
timestamp TEXT NOT NULL,
UNIQUE (session_id, mid)
);
CREATE TABLE status_events (
status_event_id INTEGER PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(session_id),
timestamp TEXT NOT NULL,
phase TEXT NOT NULL,
mode TEXT NOT NULL,
workflow_status TEXT NOT NULL CHECK (
workflow_status IN ('idle', 'working', 'blocked', 'waiting-for-operator', 'done')
),
completed_since_last_update TEXT NOT NULL,
in_progress TEXT NOT NULL,
next_action TEXT NOT NULL,
blockers TEXT NOT NULL,
asks_for_zevs TEXT NOT NULL,
risk_or_residual_uncertainty TEXT NOT NULL,
expected_wait TEXT NOT NULL
);
CREATE TABLE artifacts (
artifact_id INTEGER PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(session_id),
kind TEXT NOT NULL,
path TEXT NOT NULL,
title TEXT,
content_hash TEXT,
updated_at TEXT NOT NULL,
UNIQUE (session_id, path)
);
CREATE TABLE imports (
import_id INTEGER PRIMARY KEY,
source_kind TEXT NOT NULL CHECK (source_kind IN ('outputs', 'agent-loop')),
source_path TEXT NOT NULL,
source_hash TEXT NOT NULL,
imported_at TEXT NOT NULL,
result TEXT NOT NULL,
warning_summary TEXT NOT NULL
);
INSERT OR IGNORE INTO schema_migrations (version, name) VALUES (1, 'initial-live-state');",
)
.map_err(|error| {
CliError::failure(format!("failed to apply SQLite migration 1: {error}"))
})?;
Ok(())
}
/// Migration 2 (`audit-chain-constraints`): tightens the audit chain rules.
///
/// - Adds `audit_records.command_origin` when the column is missing.
///   NOTE(review): `apply_v1` as written here already creates that column, so
///   the guard presumably covers databases built by an earlier v1 — confirm.
/// - Adds a partial unique index so each `(session_id, previous_audit_id)`
///   pair has at most one child record.
/// - Makes `audit_records` append-only by aborting every UPDATE and DELETE.
/// - Rejects `sent` delivery status combined with `agent` verification on
///   inserts into `audit_records` and on inserts/updates of `messages`.
///
/// # Errors
/// Fails if the column check, the `ALTER TABLE`, or the batch fails.
fn apply_v2(connection: &Connection) -> CliResult<()> {
// `table_has_column` is defined outside this section.
if !table_has_column(connection, "audit_records", "command_origin")? {
connection
.execute_batch(
"ALTER TABLE audit_records
ADD COLUMN command_origin TEXT NOT NULL DEFAULT 'unknown' CHECK (
command_origin IN ('agent', 'operator', 'helper-tool', 'unknown')
);",
)
.map_err(|error| {
CliError::failure(format!(
"failed to add audit_records.command_origin: {error}"
))
})?;
}
connection
.execute_batch(
"CREATE UNIQUE INDEX IF NOT EXISTS audit_records_one_child_per_previous
ON audit_records(session_id, previous_audit_id)
WHERE previous_audit_id IS NOT NULL;
CREATE TRIGGER IF NOT EXISTS audit_records_no_sent_agent_insert
BEFORE INSERT ON audit_records
WHEN NEW.delivery_status = 'sent' AND NEW.verified_by = 'agent'
BEGIN
SELECT RAISE(ABORT, 'delivery_status=sent requires transport, helper-tool, or operator verification');
END;
CREATE TRIGGER IF NOT EXISTS audit_records_no_update
BEFORE UPDATE ON audit_records
BEGIN
SELECT RAISE(ABORT, 'audit_records are append-only');
END;
CREATE TRIGGER IF NOT EXISTS audit_records_no_delete
BEFORE DELETE ON audit_records
BEGIN
SELECT RAISE(ABORT, 'audit_records are append-only');
END;
CREATE TRIGGER IF NOT EXISTS messages_no_sent_agent_insert
BEFORE INSERT ON messages
WHEN NEW.latest_delivery_status = 'sent' AND NEW.latest_verified_by = 'agent'
BEGIN
SELECT RAISE(ABORT, 'delivery_status=sent requires transport, helper-tool, or operator verification');
END;
CREATE TRIGGER IF NOT EXISTS messages_no_sent_agent_update
BEFORE UPDATE ON messages
WHEN NEW.latest_delivery_status = 'sent' AND NEW.latest_verified_by = 'agent'
BEGIN
SELECT RAISE(ABORT, 'delivery_status=sent requires transport, helper-tool, or operator verification');
END;
INSERT OR IGNORE INTO schema_migrations (version, name) VALUES (2, 'audit-chain-constraints');",
)
.map_err(|error| {
CliError::failure(format!("failed to apply SQLite migration 2: {error}"))
})?;
Ok(())
}
/// Migration 3 (`status-event-text`): adds `status_events.event_text`.
///
/// The column is `TEXT NOT NULL DEFAULT ''`, so existing rows get an empty
/// string. The column is only added when missing, and the step records itself
/// in `schema_migrations`.
///
/// # Errors
/// Fails if the column check, the `ALTER TABLE`, or the insert fails.
fn apply_v3(connection: &Connection) -> CliResult<()> {
// `table_has_column` is defined outside this section.
if !table_has_column(connection, "status_events", "event_text")? {
connection
.execute_batch(
"ALTER TABLE status_events
ADD COLUMN event_text TEXT NOT NULL DEFAULT '';",
)
.map_err(|error| {
CliError::failure(format!("failed to add status_events.event_text: {error}"))
})?;
}
connection
.execute(
"INSERT OR IGNORE INTO schema_migrations (version, name)
VALUES (3, 'status-event-text')",
[],
)
.map_err(|error| {
CliError::failure(format!("failed to apply SQLite migration 3: {error}"))
})?;
Ok(())
}
/// Migration 4 (`timestamp-canonical-and-content-hash`).
///
/// 1. Adds `status_events.content_hash` (default `''`) when missing and
///    backfills it for rows whose hash is still empty.
/// 2. Installs triggers that abort writes whose timestamps do not match
///    `CANONICAL_TS_GLOB`: inserts into `audit_records` and `status_events`,
///    and inserts/updates of `messages` and `sessions`.
///
/// Existing rows are not rewritten by the triggers.
///
/// # Errors
/// Fails if the column check, the `ALTER TABLE`, the backfill, the trigger
/// batch, or the bookkeeping insert fails.
fn apply_v4(connection: &Connection) -> CliResult<()> {
// `table_has_column` is defined outside this section.
if !table_has_column(connection, "status_events", "content_hash")? {
connection
.execute_batch(
"ALTER TABLE status_events
ADD COLUMN content_hash TEXT NOT NULL DEFAULT '';",
)
.map_err(|error| {
CliError::failure(format!("failed to add status_events.content_hash: {error}"))
})?;
}
backfill_status_content_hash(connection)?;
// The pattern is a compile-time constant, so interpolating it into the SQL
// text does not expose the statements to external input.
let glob = CANONICAL_TS_GLOB;
connection
.execute_batch(&format!(
"CREATE TRIGGER IF NOT EXISTS audit_records_ts_canonical_insert
BEFORE INSERT ON audit_records
WHEN NEW.timestamp NOT GLOB '{glob}'
BEGIN SELECT RAISE(ABORT, 'audit_records.timestamp must be RFC3339 UTC seconds (Z)'); END;
CREATE TRIGGER IF NOT EXISTS status_events_ts_canonical_insert
BEFORE INSERT ON status_events
WHEN NEW.timestamp NOT GLOB '{glob}'
BEGIN SELECT RAISE(ABORT, 'status_events.timestamp must be RFC3339 UTC seconds (Z)'); END;
CREATE TRIGGER IF NOT EXISTS messages_ts_canonical_insert
BEFORE INSERT ON messages
WHEN NEW.timestamp NOT GLOB '{glob}'
BEGIN SELECT RAISE(ABORT, 'messages.timestamp must be RFC3339 UTC seconds (Z)'); END;
CREATE TRIGGER IF NOT EXISTS messages_ts_canonical_update
BEFORE UPDATE ON messages
WHEN NEW.timestamp NOT GLOB '{glob}'
BEGIN SELECT RAISE(ABORT, 'messages.timestamp must be RFC3339 UTC seconds (Z)'); END;
CREATE TRIGGER IF NOT EXISTS sessions_ts_canonical_insert
BEFORE INSERT ON sessions
WHEN NEW.created_at NOT GLOB '{glob}'
OR NEW.updated_at NOT GLOB '{glob}'
OR (NEW.completed_at IS NOT NULL AND NEW.completed_at NOT GLOB '{glob}')
BEGIN SELECT RAISE(ABORT, 'sessions timestamps must be RFC3339 UTC seconds (Z)'); END;
CREATE TRIGGER IF NOT EXISTS sessions_ts_canonical_update
BEFORE UPDATE ON sessions
WHEN NEW.created_at NOT GLOB '{glob}'
OR NEW.updated_at NOT GLOB '{glob}'
OR (NEW.completed_at IS NOT NULL AND NEW.completed_at NOT GLOB '{glob}')
BEGIN SELECT RAISE(ABORT, 'sessions timestamps must be RFC3339 UTC seconds (Z)'); END;"
))
.map_err(|error| {
CliError::failure(format!("failed to create v4 timestamp triggers: {error}"))
})?;
connection
.execute(
"INSERT OR IGNORE INTO schema_migrations (version, name)
VALUES (4, 'timestamp-canonical-and-content-hash')",
[],
)
.map_err(|error| {
CliError::failure(format!("failed to apply SQLite migration 4: {error}"))
})?;
Ok(())
}
/// Migration 5 (`audit-due-and-db-canonical-write-path`).
///
/// Adds the nullable `audit_records.due` column when missing, then replaces
/// the v4 insert trigger on `audit_records` with one that also rejects a
/// non-NULL `due` that does not match `CANONICAL_TS_GLOB`.
///
/// # Errors
/// Fails if the column check, the `ALTER TABLE`, the trigger replacement, or
/// the bookkeeping insert fails.
fn apply_v5(connection: &Connection) -> CliResult<()> {
// `table_has_column` is defined outside this section.
if !table_has_column(connection, "audit_records", "due")? {
connection
.execute_batch("ALTER TABLE audit_records ADD COLUMN due TEXT;")
.map_err(|error| {
CliError::failure(format!("failed to add audit_records.due: {error}"))
})?;
}
let glob = CANONICAL_TS_GLOB;
// Drop-and-recreate: the trigger body itself changes, so `IF NOT EXISTS`
// would leave the old v4 definition in place.
connection
.execute_batch(&format!(
"DROP TRIGGER IF EXISTS audit_records_ts_canonical_insert;
CREATE TRIGGER audit_records_ts_canonical_insert
BEFORE INSERT ON audit_records
WHEN NEW.timestamp NOT GLOB '{glob}'
OR (NEW.due IS NOT NULL AND NEW.due NOT GLOB '{glob}')
BEGIN SELECT RAISE(ABORT, 'audit_records.timestamp and due must be RFC3339 UTC seconds (Z)'); END;"
))
.map_err(|error| {
CliError::failure(format!("failed to update v5 due trigger: {error}"))
})?;
connection
.execute(
"INSERT OR IGNORE INTO schema_migrations (version, name)
VALUES (5, 'audit-due-and-db-canonical-write-path')",
[],
)
.map_err(|error| {
CliError::failure(format!("failed to apply SQLite migration 5: {error}"))
})?;
Ok(())
}
fn backfill_status_content_hash(connection: &Connection) -> CliResult<()> {
let mut select = connection
.prepare(
"SELECT status_event_id, phase, mode, workflow_status,
completed_since_last_update, in_progress, next_action, blockers,
asks_for_zevs, risk_or_residual_uncertainty, expected_wait, event_text
FROM status_events
WHERE content_hash = ''",
)
.map_err(|error| {
CliError::failure(format!(
"failed to read status_events for backfill: {error}"
))
})?;
let rows = select
.query_map([], |row| {
let id: i64 = row.get(0)?;
let fields = (1..=11)
.map(|i| row.get::<_, String>(i))
.collect::<Result<Vec<_>, _>>()?;
Ok((id, fields))
})
.map_err(|error| {
CliError::failure(format!(
"failed to read status_events for backfill: {error}"
))
})?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| {
CliError::failure(format!(
"failed to read status_events for backfill: {error}"
))
})?;
for (id, fields) in rows {
let refs: Vec<&str> = fields.iter().map(String::as_str).collect();
let hash = status_content_hash(&refs);
connection
.execute(
"UPDATE status_events SET content_hash = ?1 WHERE status_event_id = ?2",
params![hash, id],
)
.map_err(|error| {
CliError::failure(format!("failed to backfill content_hash: {error}"))
})?;
}
Ok(())
}
/// Computes the content hash of a status event.
///
/// The fields are joined with `\n`, hashed with SHA-256, and returned as
/// `sha256:<lowercase hex digest>`.
fn status_content_hash(fields: &[&str]) -> String {
    let joined = fields.join("\n");
    let digest = Sha256::digest(joined.as_bytes());
    format!("sha256:{:x}", digest)
}
/// Totals reported after an outputs import; printed by `run_import_command`.
#[derive(Default)]
struct ImportSummary {
/// Reported to the user as "sessions imported".
sessions_seen: usize,
/// Reported to the user as "warnings".
warning_count: usize,
}
/// Snapshot of a session's status.
///
/// In this part of the file it is built from the database by
/// `load_export_status` and rendered to markdown by `render_export_status`.
/// NOTE(review): the name and `pub(crate)` visibility suggest it is also
/// produced when parsing `status.md` during import — confirm elsewhere.
#[derive(Debug)]
pub(crate) struct ImportedStatus {
pub(crate) session_id: String,
/// Rendered as `last_update`.
pub(crate) last_update: String,
pub(crate) lead_agent: String,
pub(crate) phase: String,
pub(crate) mode: String,
pub(crate) artifact_ref: String,
/// Rendered as the top-level `status:` line.
pub(crate) workflow_status: String,
pub(crate) completed_since_last_update: String,
pub(crate) in_progress: String,
pub(crate) next_action: String,
pub(crate) blockers: String,
pub(crate) asks_for_zevs: String,
pub(crate) risk_or_residual_uncertainty: String,
pub(crate) expected_wait: String,
/// Left empty by `load_export_status`; export takes its event lines from
/// `load_export_status_events` instead.
pub(crate) rolling_events: Vec<String>,
}
/// One audit record in import form.
///
/// Field names mirror the columns of the `audit_records` table (including the
/// `due` column added in migration 5). Unlike `ExportedAuditRecord`, the agent
/// ids stay optional here.
/// NOTE(review): the code that builds and inserts these records is outside
/// this section.
#[derive(Debug, Clone)]
pub(crate) struct ImportedAuditRecord {
pub(crate) audit_id: String,
/// Id of the preceding record in the chain, if any.
pub(crate) previous_audit_id: Option<String>,
pub(crate) timestamp: String,
pub(crate) source_agent_id: Option<String>,
pub(crate) source_address: String,
pub(crate) target_agent_id: Option<String>,
pub(crate) target_address: String,
pub(crate) transport: String,
pub(crate) workspace_id: String,
pub(crate) session_id: String,
pub(crate) mid: String,
pub(crate) record_type: String,
pub(crate) command_origin: String,
pub(crate) mode: Option<String>,
pub(crate) r#ref: Option<String>,
pub(crate) re: Option<String>,
pub(crate) payload_hash: String,
pub(crate) payload_redaction_policy: String,
pub(crate) content_size: i64,
pub(crate) delivery_status: String,
pub(crate) observed_by: String,
pub(crate) verified_by: String,
pub(crate) due: Option<String>,
}
/// Outcome of trying to insert an imported audit record.
///
/// NOTE(review): the code producing these values is outside this section; the
/// variant descriptions below are inferred from the names — confirm.
enum ImportedAuditInsert {
/// Presumably: the record was newly inserted.
Inserted,
/// Presumably: an equivalent record was already stored.
Existing,
/// Presumably: the record was stored as a chain root after linking it to
/// its predecessor failed; `original_error` carries that failure's text.
RebasedToRoot { original_error: String },
}
/// File counts produced by `export_outputs`; printed by `run_export_command`.
#[derive(Default)]
struct ExportSummary {
/// Files whose content differed (or did not exist) and were written.
written: usize,
/// Files that already held the exact content and were left alone.
unchanged: usize,
}
/// One `audit_records` row as loaded for export by `load_export_audit_records`.
///
/// `source_agent` and `target_agent` are non-optional because the loading
/// query substitutes `'unknown'` for NULL agent ids.
#[derive(Debug, Clone)]
struct ExportedAuditRecord {
audit_id: String,
/// Id of the preceding record in the chain; `None` marks a chain root.
previous_audit_id: Option<String>,
timestamp: String,
/// `source_agent_id`, or `'unknown'` when NULL.
source_agent: String,
source_address: String,
/// `target_agent_id`, or `'unknown'` when NULL.
target_agent: String,
target_address: String,
transport: String,
workspace_id: String,
session_id: String,
mid: String,
/// Rendered under the key `type`.
record_type: String,
command_origin: String,
// The four optional fields below are rendered only when present.
mode: Option<String>,
r#ref: Option<String>,
re: Option<String>,
payload_hash: String,
payload_redaction_policy: String,
content_size: i64,
delivery_status: String,
observed_by: String,
verified_by: String,
due: Option<String>,
}
/// Exports one session from the database to markdown files.
///
/// Opens the database read-only, loads the session's status, recent status
/// events and audit records, renders `status.md` and `audit.md`, and writes
/// each under `<root>/sessions/<session-id>/` only when its content changed.
/// Both documents are rendered before either file is written.
///
/// # Errors
/// Fails if the database cannot be opened, the session does not exist, any
/// query fails, or a file cannot be read or written.
fn export_outputs(path: &Path, args: &DbExportOutputsArgs) -> CliResult<ExportSummary> {
    let session_id = args.session_id.as_str();
    let connection = open_read_database(path)?;

    let status = load_export_status(&connection, session_id)?;
    let events = load_export_status_events(&connection, session_id)?;
    let audit_records = load_export_audit_records(&connection, session_id)?;

    let status_markdown = render_export_status(&status, &events);
    let audit_markdown = render_export_audit(session_id, &audit_records);

    let session_dir = args.root.join("sessions").join(session_id);
    let mut summary = ExportSummary::default();
    for (file_name, content) in [("status.md", status_markdown), ("audit.md", audit_markdown)] {
        let target = session_dir.join(file_name);
        let changed = write_if_changed(&target, &content)?;
        if changed {
            summary.written += 1;
        } else {
            summary.unchanged += 1;
        }
    }
    Ok(summary)
}
/// Loads the current status of `session_id` for export.
///
/// The session row is LEFT JOINed with its newest `status_events` row
/// (ordered by `timestamp DESC, status_event_id DESC`). Values from the event
/// win; when there is no event, `phase`, `mode`, `workflow_status` and the
/// timestamp fall back to the session row and the free-text fields fall back
/// to `'unknown'`. NULL `lead_agent_id` / `artifact_ref` also become `'unknown'`.
///
/// `rolling_events` is always returned empty.
///
/// # Errors
/// Fails with `session not found: <id>` when no session row matches, or with
/// a load error when the query itself fails.
fn load_export_status(connection: &Connection, session_id: &str) -> CliResult<ImportedStatus> {
connection
.query_row(
"SELECT s.session_id,
COALESCE(se.timestamp, s.updated_at),
COALESCE(s.lead_agent_id, 'unknown'),
COALESCE(se.phase, s.phase),
COALESCE(se.mode, s.mode),
COALESCE(s.artifact_ref, 'unknown'),
COALESCE(se.workflow_status, s.workflow_status),
COALESCE(se.completed_since_last_update, 'unknown'),
COALESCE(se.in_progress, 'unknown'),
COALESCE(se.next_action, 'unknown'),
COALESCE(se.blockers, 'unknown'),
COALESCE(se.asks_for_zevs, 'unknown'),
COALESCE(se.risk_or_residual_uncertainty, 'unknown'),
COALESCE(se.expected_wait, 'unknown')
FROM sessions AS s
LEFT JOIN status_events AS se
ON se.status_event_id = (
SELECT status_event_id
FROM status_events
WHERE session_id = s.session_id
ORDER BY timestamp DESC, status_event_id DESC
LIMIT 1
)
WHERE s.session_id = ?1",
[session_id],
// Column indices below follow the SELECT list order exactly.
|row| {
Ok(ImportedStatus {
session_id: row.get(0)?,
last_update: row.get(1)?,
lead_agent: row.get(2)?,
phase: row.get(3)?,
mode: row.get(4)?,
artifact_ref: row.get(5)?,
workflow_status: row.get(6)?,
completed_since_last_update: row.get(7)?,
in_progress: row.get(8)?,
next_action: row.get(9)?,
blockers: row.get(10)?,
asks_for_zevs: row.get(11)?,
risk_or_residual_uncertainty: row.get(12)?,
expected_wait: row.get(13)?,
rolling_events: Vec::new(),
})
},
)
// `optional()` turns "no rows" into `None` so it can get its own message.
.optional()
.map_err(|error| CliError::failure(format!("failed to load session for export: {error}")))?
.ok_or_else(|| CliError::failure(format!("session not found: {session_id}")))
}
/// Loads up to ten rolling-event lines for `session_id`, newest event first.
///
/// At most ten `status_events` rows are read. A row with non-blank
/// `event_text` contributes each of its non-empty, trimmed lines; a row with
/// blank `event_text` contributes one synthesized
/// `<timestamp> - status=<workflow_status>; next=<next_action>` line. The
/// flattened list is then cut to ten entries.
///
/// # Errors
/// Fails if the query cannot be prepared, executed, or read.
fn load_export_status_events(connection: &Connection, session_id: &str) -> CliResult<Vec<String>> {
    let mut statement = connection
        .prepare(
            "SELECT timestamp, workflow_status, next_action, event_text
FROM status_events
WHERE session_id = ?1
ORDER BY timestamp DESC, status_event_id DESC
LIMIT 10",
        )
        .map_err(|error| CliError::failure(format!("failed to load status events: {error}")))?;

    // Read every row first so that a read error surfaces before any rendering.
    let rows = statement
        .query_map([session_id], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
                row.get::<_, String>(3)?,
            ))
        })
        .map_err(|error| CliError::failure(format!("failed to load status events: {error}")))?
        .collect::<Result<Vec<_>, _>>()
        .map_err(|error| CliError::failure(format!("failed to read status events: {error}")))?;

    let mut lines: Vec<String> = Vec::new();
    for (timestamp, workflow_status, next_action, event_text) in rows {
        if event_text.trim().is_empty() {
            lines.push(format!(
                "{timestamp} - status={workflow_status}; next={next_action}"
            ));
        } else {
            lines.extend(
                event_text
                    .lines()
                    .map(str::trim)
                    .filter(|line| !line.is_empty())
                    .map(str::to_string),
            );
        }
    }
    lines.truncate(10);
    Ok(lines)
}
/// Loads every audit record of `session_id` for export.
///
/// NULL agent ids are replaced with `'unknown'` in the query. Rows come back
/// ordered by `timestamp, audit_id` and are then re-ordered into chain order
/// by `order_export_audit_records`.
///
/// # Errors
/// Fails if the query cannot be prepared, executed, or read.
fn load_export_audit_records(
connection: &Connection,
session_id: &str,
) -> CliResult<Vec<ExportedAuditRecord>> {
let mut statement = connection
.prepare(
"SELECT audit_id, previous_audit_id, timestamp,
COALESCE(source_agent_id, 'unknown'), source_address,
COALESCE(target_agent_id, 'unknown'), target_address,
transport, workspace_id, session_id, mid, record_type,
command_origin, mode, ref, re, payload_hash,
payload_redaction_policy, content_size, delivery_status,
observed_by, verified_by, due
FROM audit_records
WHERE session_id = ?1
ORDER BY timestamp, audit_id",
)
.map_err(|error| CliError::failure(format!("failed to load audit records: {error}")))?;
let records = statement
// Column indices below follow the SELECT list order exactly.
.query_map([session_id], |row| {
Ok(ExportedAuditRecord {
audit_id: row.get(0)?,
previous_audit_id: row.get(1)?,
timestamp: row.get(2)?,
source_agent: row.get(3)?,
source_address: row.get(4)?,
target_agent: row.get(5)?,
target_address: row.get(6)?,
transport: row.get(7)?,
workspace_id: row.get(8)?,
session_id: row.get(9)?,
mid: row.get(10)?,
record_type: row.get(11)?,
command_origin: row.get(12)?,
mode: row.get(13)?,
r#ref: row.get(14)?,
re: row.get(15)?,
payload_hash: row.get(16)?,
payload_redaction_policy: row.get(17)?,
content_size: row.get(18)?,
delivery_status: row.get(19)?,
observed_by: row.get(20)?,
verified_by: row.get(21)?,
due: row.get(22)?,
})
})
.map_err(|error| CliError::failure(format!("failed to load audit records: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read audit records: {error}")))?;
Ok(order_export_audit_records(records))
}
/// Arranges audit records in chain order.
///
/// Records without a `previous_audit_id` are chain heads. Heads are sorted by
/// `(timestamp, audit_id)`, and each head is followed by its chain: the
/// record pointing at it, then the record pointing at that one, and so on.
/// Where several records point at the same predecessor, the earliest one
/// continues the chain. Everything that could not be placed (extra siblings,
/// their descendants, records whose predecessor is absent) is appended at the
/// end, sorted by `(timestamp, audit_id)`. No record is dropped.
fn order_export_audit_records(records: Vec<ExportedAuditRecord>) -> Vec<ExportedAuditRecord> {
    let mut chain_heads: Vec<ExportedAuditRecord> = Vec::new();
    let mut successors: HashMap<String, Vec<ExportedAuditRecord>> = HashMap::new();
    for record in records {
        match record.previous_audit_id.clone() {
            Some(parent_id) => successors.entry(parent_id).or_default().push(record),
            None => chain_heads.push(record),
        }
    }
    sort_audit_records(&mut chain_heads);

    let mut ordered: Vec<ExportedAuditRecord> = Vec::new();
    let mut unplaced: Vec<ExportedAuditRecord> = Vec::new();
    for head in chain_heads {
        let mut cursor = head.audit_id.clone();
        ordered.push(head);
        // Follow the chain for as long as something points at the cursor.
        while let Some(mut candidates) = successors.remove(&cursor) {
            sort_audit_records(&mut candidates);
            let mut candidates = candidates.into_iter();
            match candidates.next() {
                Some(next) => {
                    // Later siblings cannot be part of this chain.
                    unplaced.extend(candidates);
                    cursor = next.audit_id.clone();
                    ordered.push(next);
                }
                None => break,
            }
        }
    }

    // Whatever is still keyed by a predecessor was never reached from a head.
    unplaced.extend(successors.into_values().flatten());
    sort_audit_records(&mut unplaced);
    ordered.extend(unplaced);
    ordered
}
/// Sorts records in place by `timestamp`, breaking ties by `audit_id`.
/// Both keys are compared as plain strings; the sort is stable.
fn sort_audit_records(records: &mut [ExportedAuditRecord]) {
    records.sort_by(|left, right| {
        let left_key = (left.timestamp.as_str(), left.audit_id.as_str());
        let right_key = (right.timestamp.as_str(), right.audit_id.as_str());
        left_key.cmp(&right_key)
    });
}
fn render_export_status(status: &ImportedStatus, events: &[String]) -> String {
let event_lines = if events.is_empty() {
"No events recorded.".to_string()
} else {
events
.iter()
.enumerate()
.map(|(index, event)| format!("{}. {event}", index + 1))
.collect::<Vec<_>>()
.join("\n")
};
format!(
"# Session Status: {session_id}\n\n\
session_id: {session_id}\n\
last_update: {timestamp}\n\
lead_agent: {lead_agent}\n\
status: {status}\n\n\
## Current State\n\n\
- phase: {phase}\n\
- mode: {mode}\n\
- artifact_ref: {artifact_ref}\n\
- completed_since_last_update: {completed}\n\
- in_progress: {in_progress}\n\
- next_action: {next_action}\n\
- blockers: {blockers}\n\
- asks_for_Zevs: {asks_for_zevs}\n\
- risk_or_residual_uncertainty: {risk}\n\
- expected_wait: {expected_wait}\n\n\
## Rolling Events\n\n\
{event_lines}\n",
session_id = status.session_id,
timestamp = status.last_update,
lead_agent = status.lead_agent,
status = status.workflow_status,
phase = status.phase,
mode = status.mode,
artifact_ref = status.artifact_ref,
completed = status.completed_since_last_update,
in_progress = status.in_progress,
next_action = status.next_action,
blockers = status.blockers,
asks_for_zevs = status.asks_for_zevs,
risk = status.risk_or_residual_uncertainty,
expected_wait = status.expected_wait,
)
}
fn render_export_audit(session_id: &str, records: &[ExportedAuditRecord]) -> String {
let record_text = records
.iter()
.map(render_export_audit_record)
.collect::<Vec<_>>()
.join("\n");
format!(
"# Audit Trail: {session_id}\n\nsession_id: {session_id}\n\n## Records\n\n{record_text}\n"
)
}
/// Renders one audit record as a fenced `text` block of `key=value` lines.
///
/// The nineteen always-present fields come first in a fixed order (a missing
/// `previous_audit_id` is written as `none`), followed by `mode`, `ref`, `re`
/// and `due` — each only when it has a value.
fn render_export_audit_record(record: &ExportedAuditRecord) -> String {
    let previous_audit_id = record.previous_audit_id.as_deref().unwrap_or("none");
    let content_size = record.content_size.to_string();

    let always_present = [
        ("audit_id", record.audit_id.as_str()),
        ("previous_audit_id", previous_audit_id),
        ("timestamp", record.timestamp.as_str()),
        ("source_agent", record.source_agent.as_str()),
        ("source_address", record.source_address.as_str()),
        ("target_agent", record.target_agent.as_str()),
        ("target_address", record.target_address.as_str()),
        ("transport", record.transport.as_str()),
        ("workspace_id", record.workspace_id.as_str()),
        ("session_id", record.session_id.as_str()),
        ("mid", record.mid.as_str()),
        ("type", record.record_type.as_str()),
        ("command_origin", record.command_origin.as_str()),
        ("payload_hash", record.payload_hash.as_str()),
        (
            "payload_redaction_policy",
            record.payload_redaction_policy.as_str(),
        ),
        ("content_size", content_size.as_str()),
        ("delivery_status", record.delivery_status.as_str()),
        ("observed_by", record.observed_by.as_str()),
        ("verified_by", record.verified_by.as_str()),
    ];
    let when_set = [
        ("mode", record.mode.as_deref()),
        ("ref", record.r#ref.as_deref()),
        ("re", record.re.as_deref()),
        ("due", record.due.as_deref()),
    ];

    let mut lines: Vec<String> = always_present
        .into_iter()
        .map(|(key, value)| format!("{key}={value}"))
        .collect();
    lines.extend(
        when_set
            .into_iter()
            .filter_map(|(key, value)| value.map(|value| format!("{key}={value}"))),
    );

    let fields = lines.join("\n");
    format!("```text\n{fields}\n```")
}
fn write_if_changed(path: &Path, content: &str) -> CliResult<bool> {
if path.exists() {
let existing = fs::read_to_string(path).map_err(|error| {
CliError::failure(format!("failed to read {}: {error}", path.display()))
})?;
if existing == content {
return Ok(false);
}
}
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).map_err(|error| {
CliError::failure(format!("failed to create {}: {error}", parent.display()))
})?;
}
fs::write(path, content).map_err(|error| {
CliError::failure(format!("failed to write {}: {error}", path.display()))
})?;
Ok(true)
}
/// Imports every session found under `root` into the database at `db_path`,
/// with no session filter and no explicit timestamp, discarding the summary.
pub(crate) fn import_outputs_root(db_path: &Path, root: &Path) -> CliResult<()> {
    let args = DbImportOutputsArgs {
        root: root.to_path_buf(),
        session_id: None,
        timestamp: None,
    };
    import_outputs(db_path, &args)?;
    Ok(())
}
/// Imports the session directories under `<args.root>/sessions` into the SQLite
/// database at `path`.
///
/// * Returns an empty summary without opening the database when the `sessions`
///   directory does not exist.
/// * The import timestamp is `args.timestamp` (validated by
///   `normalize_arg_timestamp`) or the current UTC time when none is given.
/// * When `args.session_id` is set, only the directory with that exact name is
///   imported.
/// * Each session is imported and committed in its own immediate transaction,
///   together with one `imports` row recording the source hash and warnings.
///
/// # Errors
///
/// Returns a `CliError` when the directory cannot be read, a transaction cannot
/// be started or committed, or a per-session import step fails.
fn import_outputs(path: &Path, args: &DbImportOutputsArgs) -> CliResult<ImportSummary> {
    let sessions_dir = args.root.join("sessions");
    if !sessions_dir.exists() {
        return Ok(ImportSummary::default());
    }
    let timestamp = match &args.timestamp {
        Some(value) => normalize_arg_timestamp(value)?,
        None => crate::timestamp::now_utc_seconds(),
    };
    let mut connection = open_database(path)?;
    let mut summary = ImportSummary::default();
    let entries = fs::read_dir(&sessions_dir).map_err(|error| {
        CliError::failure(format!(
            "failed to read {}: {error}",
            sessions_dir.display()
        ))
    })?;
    for entry in entries {
        let entry = entry
            .map_err(|error| CliError::failure(format!("failed to read session entry: {error}")))?;
        let session_dir = entry.path();
        if !session_dir.is_dir() {
            continue;
        }
        // Directory names that are not valid UTF-8 cannot be used as session ids.
        let Some(session_id) = session_dir
            .file_name()
            .and_then(|name| name.to_str())
            .map(str::to_string)
        else {
            continue;
        };
        if args
            .session_id
            .as_ref()
            .is_some_and(|requested| requested != &session_id)
        {
            continue;
        }
        summary.sessions_seen += 1;
        let transaction = connection
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|error| {
                CliError::failure(format!(
                    "failed to start SQLite import transaction: {error}"
                ))
            })?;
        let warnings = import_outputs_session(&transaction, &args.root, &session_dir, &timestamp)?;
        summary.warning_count += warnings.len();
        insert_import_row(
            &transaction,
            &session_dir,
            &source_hash_for_session_dir(&session_dir)?,
            &timestamp,
            &warnings,
        )?;
        transaction.commit().map_err(|error| {
            CliError::failure(format!(
                "failed to commit outputs import transaction: {error}"
            ))
        })?;
    }
    Ok(summary)
}
/// Imports one session directory (`status.md` and `audit.md`) inside `transaction`.
///
/// Returns the list of human-readable warnings collected along the way (skipped
/// or rebased records, an already-existing session). Hard failures — unreadable
/// files, an invalid status timestamp, failing session/agent/message statements —
/// are returned as errors instead.
///
/// `root` is used to derive and register the project; `import_timestamp` is used
/// for rows that have no timestamp of their own (project, lead agent, and the
/// placeholder session created when `status.md` is absent).
fn import_outputs_session(
transaction: &Transaction<'_>,
root: &Path,
session_dir: &Path,
import_timestamp: &str,
) -> CliResult<Vec<String>> {
let mut warnings = Vec::new();
// The directory name is the session id unless status.md supplies one.
let fallback_session_id = session_dir
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("unknown")
.to_string();
let status_path = session_dir.join("status.md");
let mut status = if status_path.exists() {
Some(parse_status_artifact(&status_path, &fallback_session_id)?)
} else {
None
};
// A status timestamp that cannot be canonicalized aborts the session import.
if let Some(status) = status.as_mut() {
status.last_update =
crate::timestamp::canonicalize(&status.last_update).ok_or_else(|| {
CliError::failure(format!(
"invalid status timestamp in {}: {}",
status_path.display(),
status.last_update
))
})?;
}
let session_id = status
.as_ref()
.map(|status| status.session_id.clone())
.unwrap_or(fallback_session_id);
let project_id = import_project_id(root);
ensure_project(transaction, &project_id, root, import_timestamp)?;
if let Some(status) = &status {
if status.lead_agent != "unknown" {
ensure_agent(transaction, &status.lead_agent, import_timestamp)?;
}
// INSERT OR IGNORE: an existing session row is left untouched and reported as a warning.
let inserted = transaction
.execute(
"INSERT OR IGNORE INTO sessions (
session_id, project_id, title, phase, mode, workflow_status,
lead_agent_id, artifact_ref, created_at, updated_at
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)",
params![
status.session_id,
project_id,
status.session_id,
status.phase,
status.mode,
status.workflow_status,
nullable_unknown(&status.lead_agent),
status.artifact_ref,
status.last_update,
],
)
.map_err(|error| CliError::failure(format!("failed to import session: {error}")))?;
if inserted == 0 {
warnings.push(format!("skipped existing session {}", status.session_id));
}
insert_status_event_if_missing(transaction, status)?;
} else {
// No status.md: make sure a placeholder session row exists, stamped with the import time.
transaction
.execute(
"INSERT OR IGNORE INTO sessions (
session_id, project_id, title, phase, mode, workflow_status,
created_at, updated_at
)
VALUES (?1, ?2, ?1, 'import', 'validate', 'idle', ?3, ?3)",
params![session_id, project_id, import_timestamp],
)
.map_err(|error| CliError::failure(format!("failed to import session: {error}")))?;
}
let audit_path = session_dir.join("audit.md");
if audit_path.exists() {
// Message ids first inserted during this call; passed to
// insert_or_update_imported_message for every imported record.
let mut new_messages = HashSet::new();
for mut record in parse_audit_artifact(&audit_path, &session_id)? {
// Records without a verifier are skipped with a warning.
if record.verified_by.is_empty() {
warnings.push(format!(
"skipped audit_id {}: missing verified_by",
record.audit_id
));
continue;
}
// Timestamps are stored in canonical form; unparseable ones skip the record.
match crate::timestamp::canonicalize(&record.timestamp) {
Some(canonical) => record.timestamp = canonical,
None => {
warnings.push(format!(
"skipped audit_id {}: invalid timestamp {}",
record.audit_id, record.timestamp
));
continue;
}
}
// `due` is optional, but when present it must canonicalize too.
if let Some(due) = record.due.clone() {
match crate::timestamp::canonicalize(&due) {
Some(canonical) => record.due = Some(canonical),
None => {
warnings.push(format!(
"skipped audit_id {}: invalid due {due}",
record.audit_id
));
continue;
}
}
}
match insert_imported_audit_record(transaction, &record) {
Ok(ImportedAuditInsert::Inserted) => {
ensure_record_agents(transaction, &record)?;
insert_or_update_imported_message(transaction, &record, &mut new_messages)?;
}
Ok(ImportedAuditInsert::Existing) => {
warnings.push(format!("skipped existing audit_id {}", record.audit_id))
}
// The record was stored without its previous_audit_id link; keep the original error visible.
Ok(ImportedAuditInsert::RebasedToRoot { original_error }) => {
warnings.push(format!(
"rebased audit_id {} to chain root after insert failed: {}",
record.audit_id, original_error
));
ensure_record_agents(transaction, &record)?;
insert_or_update_imported_message(transaction, &record, &mut new_messages)?;
}
// A failed insert is downgraded to a warning so the remaining records still import.
Err(error) => {
warnings.push(format!("skipped audit_id {}: {error}", record.audit_id))
}
}
}
}
Ok(warnings)
}
/// Reads a `status.md` file and builds an `ImportedStatus` from its
/// `key: value` lines and its "Rolling Events" section.
///
/// Missing or empty keys fall back to fixed defaults; `session_id` falls back to
/// `fallback_session_id`. Fails only when the file cannot be read.
fn parse_status_artifact(path: &Path, fallback_session_id: &str) -> CliResult<ImportedStatus> {
    let content = match fs::read_to_string(path) {
        Ok(text) => text,
        Err(error) => {
            return Err(CliError::failure(format!(
                "failed to read {}: {error}",
                path.display()
            )))
        }
    };
    let values = parse_markdown_key_values(&content);
    let field = |key: &str, fallback: &str| value_or(&values, key, fallback);
    Ok(ImportedStatus {
        session_id: field("session_id", fallback_session_id),
        last_update: field("last_update", "1970-01-01T00:00:00Z"),
        lead_agent: field("lead_agent", "unknown"),
        phase: field("phase", "import"),
        mode: field("mode", "validate"),
        artifact_ref: field("artifact_ref", "unknown"),
        workflow_status: field("status", "idle"),
        completed_since_last_update: field("completed_since_last_update", "unknown"),
        in_progress: field("in_progress", "unknown"),
        next_action: field("next_action", "unknown"),
        blockers: field("blockers", "unknown"),
        asks_for_zevs: field("asks_for_Zevs", "unknown"),
        risk_or_residual_uncertainty: field("risk_or_residual_uncertainty", "unknown"),
        expected_wait: field("expected_wait", "unknown"),
        rolling_events: parse_rolling_events(&content),
    })
}
/// Reads an `audit.md` file and converts each fenced `key=value` block into an
/// `ImportedAuditRecord`.
///
/// Blocks without a non-empty `audit_id` are ignored. Missing keys fall back to
/// fixed defaults (`session_id` falls back to `fallback_session_id`); an
/// unparseable `content_size` becomes `0`. Fails only when the file cannot be read.
fn parse_audit_artifact(
    path: &Path,
    fallback_session_id: &str,
) -> CliResult<Vec<ImportedAuditRecord>> {
    let content = match fs::read_to_string(path) {
        Ok(text) => text,
        Err(error) => {
            return Err(CliError::failure(format!(
                "failed to read {}: {error}",
                path.display()
            )))
        }
    };
    let mut records = Vec::new();
    for values in parse_fenced_key_value_blocks(&content) {
        let field = |key: &str, fallback: &str| value_or(&values, key, fallback);
        let audit_id = field("audit_id", "");
        if audit_id.is_empty() {
            continue;
        }
        let record = ImportedAuditRecord {
            audit_id,
            previous_audit_id: optional_audit_id(&field("previous_audit_id", "none")),
            timestamp: field("timestamp", "1970-01-01T00:00:00Z"),
            source_agent_id: nullable_unknown(&field("source_agent", "unknown")),
            source_address: field("source_address", "unknown"),
            target_agent_id: nullable_unknown(&field("target_agent", "unknown")),
            target_address: field("target_address", "unknown"),
            transport: field("transport", "unknown"),
            workspace_id: field("workspace_id", "unknown"),
            session_id: field("session_id", fallback_session_id),
            mid: field("mid", "unknown"),
            record_type: field("type", "unknown"),
            command_origin: field("command_origin", "unknown"),
            mode: optional_text(&field("mode", "")),
            r#ref: optional_text(&field("ref", "")),
            re: optional_text(&field("re", "")),
            payload_hash: field("payload_hash", "sha256:unknown"),
            payload_redaction_policy: field("payload_redaction_policy", "hash-only"),
            content_size: field("content_size", "0").parse().unwrap_or(0),
            delivery_status: field("delivery_status", "unknown"),
            observed_by: field("observed_by", "unknown"),
            verified_by: field("verified_by", ""),
            due: optional_text(&field("due", "")),
        };
        records.push(record);
    }
    Ok(records)
}
/// Collects `key: value` pairs from Markdown text.
///
/// Each line is trimmed, an optional leading `"- "` bullet is dropped, and the
/// remainder is split at the first `": "`. Lines without that separator are
/// ignored; a later occurrence of a key replaces an earlier one.
fn parse_markdown_key_values(content: &str) -> BTreeMap<String, String> {
    content
        .lines()
        .filter_map(|raw| {
            let trimmed = raw.trim();
            // A ": " can never straddle the "- " bullet, so stripping the bullet
            // first finds the same separator a fallback on the full line would.
            let body = trimmed.strip_prefix("- ").unwrap_or(trimmed);
            body.split_once(": ")
        })
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
/// Extracts the numbered entries that follow the first "## Rolling Events"
/// heading.
///
/// A line counts as an entry when, after trimming, the text before its first
/// `". "` parses as an unsigned integer; the text after it is the entry. Returns
/// an empty list when the heading is absent.
fn parse_rolling_events(content: &str) -> Vec<String> {
    let events_text = match content.split_once("## Rolling Events") {
        Some((_, tail)) => tail,
        None => return Vec::new(),
    };
    let mut events = Vec::new();
    for raw in events_text.lines() {
        if let Some((prefix, entry)) = raw.trim().split_once(". ") {
            if prefix.parse::<usize>().is_ok() {
                events.push(entry.to_string());
            }
        }
    }
    events
}
/// Parses every ```` ```text ```` fenced block in `content` into a map of its
/// `key=value` lines (split at the first `=`).
///
/// Lines outside a fence and lines without `=` are ignored. A block that is
/// never closed by a ```` ``` ```` line is not returned.
fn parse_fenced_key_value_blocks(content: &str) -> Vec<BTreeMap<String, String>> {
    let mut blocks = Vec::new();
    let mut open_block: Option<BTreeMap<String, String>> = None;
    for raw in content.lines() {
        match raw.trim() {
            // An opening fence always starts a fresh block.
            "```text" => open_block = Some(BTreeMap::new()),
            // A closing fence emits the open block, if there is one.
            "```" => blocks.extend(open_block.take()),
            line => {
                if let (Some(values), Some((key, value))) =
                    (open_block.as_mut(), line.split_once('='))
                {
                    values.insert(key.to_string(), value.to_string());
                }
            }
        }
    }
    blocks
}
fn ensure_project(
transaction: &Transaction<'_>,
project_id: &str,
root: &Path,
timestamp: &str,
) -> CliResult<()> {
transaction
.execute(
"INSERT OR IGNORE INTO projects (project_id, name, root_path, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?4)",
params![
project_id,
root.file_name()
.and_then(|name| name.to_str())
.unwrap_or("outputs"),
root.display().to_string(),
timestamp,
],
)
.map_err(|error| CliError::failure(format!("failed to import project: {error}")))?;
Ok(())
}
fn ensure_agent(transaction: &Transaction<'_>, agent_id: &str, timestamp: &str) -> CliResult<()> {
transaction
.execute(
"INSERT OR IGNORE INTO agents (
agent_id, display_name, agent_kind, created_at, updated_at
)
VALUES (?1, ?1, 'imported', ?2, ?2)",
params![agent_id, timestamp],
)
.map_err(|error| CliError::failure(format!("failed to import agent: {error}")))?;
Ok(())
}
/// Registers the source and then the target agent of `record` (whichever are
/// set) via `ensure_agent`, stamped with the record's timestamp.
fn ensure_record_agents(
    transaction: &Transaction<'_>,
    record: &ImportedAuditRecord,
) -> CliResult<()> {
    let participants = [&record.source_agent_id, &record.target_agent_id];
    for agent_id in participants.into_iter().flatten() {
        ensure_agent(transaction, agent_id, &record.timestamp)?;
    }
    Ok(())
}
/// Inserts a `status_events` row for `status` unless an equivalent one exists.
///
/// A row is considered equivalent when it has the same `session_id`, the same
/// `timestamp` (`status.last_update`) and the same `content_hash`. The hash is
/// produced by `status_content_hash` from the state fields listed below plus
/// the rolling events joined with newlines.
///
/// Returns an error when the existence check or the insert fails.
fn insert_status_event_if_missing(
transaction: &Transaction<'_>,
status: &ImportedStatus,
) -> CliResult<()> {
// Rolling events are stored as a single newline-separated text column.
let event_text = status.rolling_events.join("\n");
let content_hash = status_content_hash(&[
&status.phase,
&status.mode,
&status.workflow_status,
&status.completed_since_last_update,
&status.in_progress,
&status.next_action,
&status.blockers,
&status.asks_for_zevs,
&status.risk_or_residual_uncertainty,
&status.expected_wait,
&event_text,
]);
// Look for an identical snapshot before inserting.
let exists: Option<i64> = transaction
.query_row(
"SELECT status_event_id
FROM status_events
WHERE session_id = ?1 AND timestamp = ?2 AND content_hash = ?3
LIMIT 1",
params![status.session_id, status.last_update, content_hash],
|row| row.get(0),
)
.optional()
.map_err(|error| CliError::failure(format!("failed to read status events: {error}")))?;
if exists.is_some() {
return Ok(());
}
transaction
.execute(
"INSERT INTO status_events (
session_id, timestamp, phase, mode, workflow_status,
completed_since_last_update, in_progress, next_action, blockers,
asks_for_zevs, risk_or_residual_uncertainty, expected_wait, event_text,
content_hash
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
params![
status.session_id,
status.last_update,
status.phase,
status.mode,
status.workflow_status,
status.completed_since_last_update,
status.in_progress,
status.next_action,
status.blockers,
status.asks_for_zevs,
status.risk_or_residual_uncertainty,
status.expected_wait,
event_text,
content_hash,
],
)
.map_err(|error| CliError::failure(format!("failed to import status event: {error}")))?;
Ok(())
}
/// Stores an imported audit record and reports how it was stored.
///
/// * `Existing` — a row with this `audit_id` is already present; nothing is written.
/// * `Inserted` — the record was inserted with its declared `previous_audit_id`.
/// * `RebasedToRoot` — that insert failed, the record declares a
///   `previous_audit_id`, and a retry with no previous id succeeded; the first
///   error is carried along.
///
/// Returns the error text when the lookup fails, when a record without a
/// previous id cannot be inserted, or when both attempts fail.
fn insert_imported_audit_record(
    transaction: &Transaction<'_>,
    record: &ImportedAuditRecord,
) -> Result<ImportedAuditInsert, String> {
    let already_stored = transaction
        .query_row(
            "SELECT audit_id FROM audit_records WHERE audit_id = ?1",
            [&record.audit_id],
            |row| row.get::<_, String>(0),
        )
        .optional()
        .map_err(|error| error.to_string())?
        .is_some();
    if already_stored {
        return Ok(ImportedAuditInsert::Existing);
    }
    let declared_parent = record.previous_audit_id.as_deref();
    let original_error = match execute_imported_audit_insert(transaction, record, declared_parent)
    {
        Ok(_) => return Ok(ImportedAuditInsert::Inserted),
        Err(error) => error.to_string(),
    };
    // Without a declared parent there is nothing to fall back to.
    if declared_parent.is_none() {
        return Err(original_error);
    }
    match execute_imported_audit_insert(transaction, record, None) {
        Ok(_) => Ok(ImportedAuditInsert::RebasedToRoot { original_error }),
        Err(fallback_error) => Err(format!(
            "{original_error}; root fallback failed: {fallback_error}"
        )),
    }
}
/// Runs the `INSERT INTO audit_records` statement for `record`.
///
/// `previous_audit_id` is bound from the argument rather than from
/// `record.previous_audit_id`, so callers can store the record with a different
/// (or no) predecessor. Returns the raw `rusqlite` result of the statement.
fn execute_imported_audit_insert(
transaction: &Transaction<'_>,
record: &ImportedAuditRecord,
previous_audit_id: Option<&str>,
) -> rusqlite::Result<usize> {
transaction.execute(
"INSERT INTO audit_records (
audit_id, previous_audit_id, session_id, source_agent_id, target_agent_id,
source_address, target_address, transport, workspace_id, mid, record_type,
command_origin, mode, ref, re, payload_hash, payload_redaction_policy,
content_size, delivery_status, observed_by, verified_by, timestamp, due
)
VALUES (
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11,
?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23
)",
// Parameters are positional and must stay in the column order above.
params![
record.audit_id,
previous_audit_id,
record.session_id,
record.source_agent_id,
record.target_agent_id,
record.source_address,
record.target_address,
record.transport,
record.workspace_id,
record.mid,
record.record_type,
record.command_origin,
record.mode,
record.r#ref,
record.re,
record.payload_hash,
record.payload_redaction_policy,
record.content_size,
record.delivery_status,
record.observed_by,
record.verified_by,
record.timestamp,
record.due,
],
)
}
/// Creates or refreshes the `messages` row that `record` belongs to.
///
/// The row is first inserted with `INSERT OR IGNORE`. When that inserts a row,
/// its id is remembered in `new_messages`. When it does not, the row is
/// overwritten with this record's values only if its id is in `new_messages`;
/// otherwise it is left untouched.
///
/// Returns an error when either statement fails.
fn insert_or_update_imported_message(
transaction: &Transaction<'_>,
record: &ImportedAuditRecord,
new_messages: &mut HashSet<String>,
) -> CliResult<()> {
let message_id = message_id(&record.session_id, &record.mid);
let inserted = transaction
.execute(
"INSERT OR IGNORE INTO messages (
message_id, session_id, mid, source_agent_id, target_agent_id, message_type,
mode, ref, transport, payload_redaction_policy, payload_hash,
latest_delivery_status, latest_verified_by, latest_audit_id, timestamp
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
params![
message_id,
record.session_id,
record.mid,
record.source_agent_id,
record.target_agent_id,
record.record_type,
record.mode,
record.r#ref,
record.transport,
record.payload_redaction_policy,
record.payload_hash,
record.delivery_status,
record.verified_by,
record.audit_id,
record.timestamp,
],
)
.map_err(|error| CliError::failure(format!("failed to import message: {error}")))?;
// A fresh row: remember it so later records for the same message may update it.
if inserted == 1 {
new_messages.insert(message_id);
return Ok(());
}
// The row was not inserted by a call sharing this set: keep it as it is.
if !new_messages.contains(&message_id) {
return Ok(());
}
transaction
.execute(
"UPDATE messages
SET source_agent_id = ?2,
target_agent_id = ?3,
message_type = ?4,
mode = ?5,
ref = ?6,
transport = ?7,
payload_redaction_policy = ?8,
payload_hash = ?9,
latest_delivery_status = ?10,
latest_verified_by = ?11,
latest_audit_id = ?12,
timestamp = ?13
WHERE message_id = ?1",
params![
message_id,
record.source_agent_id,
record.target_agent_id,
record.record_type,
record.mode,
record.r#ref,
record.transport,
record.payload_redaction_policy,
record.payload_hash,
record.delivery_status,
record.verified_by,
record.audit_id,
record.timestamp,
],
)
.map_err(|error| {
CliError::failure(format!("failed to update imported message: {error}"))
})?;
Ok(())
}
/// Decides which database, if any, live projections should be written to.
///
/// * `no_db` set — always `None`.
/// * an explicit `db` path — that path, whether or not it exists.
/// * otherwise — `.zynk/zynk.db`, but only when that file already exists.
pub(crate) fn resolve_projection_db(db: Option<&Path>, no_db: bool) -> Option<PathBuf> {
    match (no_db, db) {
        (true, _) => None,
        (false, Some(path)) => Some(path.to_path_buf()),
        (false, None) => {
            let default = PathBuf::from(".zynk/zynk.db");
            if default.exists() {
                Some(default)
            } else {
                None
            }
        }
    }
}
pub(crate) fn project_status(
db_path: &Path,
root: &Path,
status: &ImportedStatus,
) -> CliResult<()> {
let mut connection = open_database(db_path)?;
let transaction = connection
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(|error| {
CliError::failure(format!("failed to start status projection: {error}"))
})?;
project_live_status(&transaction, root, status)?;
transaction.commit().map_err(|error| {
CliError::failure(format!("failed to commit status projection: {error}"))
})?;
Ok(())
}
/// Writes a live status snapshot into the database within `transaction`.
///
/// Ensures the project (derived from `root`) and, unless it is `"unknown"`, the
/// lead agent exist; then inserts the session row or, when the session already
/// exists, overwrites its phase, mode, workflow status, lead agent, artifact ref
/// and `updated_at`. Finally records the snapshot as a status event if an
/// identical one is not already stored.
fn project_live_status(
transaction: &Transaction<'_>,
root: &Path,
status: &ImportedStatus,
) -> CliResult<()> {
let project_id = import_project_id(root);
ensure_project(transaction, &project_id, root, &status.last_update)?;
if status.lead_agent != "unknown" {
ensure_agent(transaction, &status.lead_agent, &status.last_update)?;
}
// Upsert: title, project_id and created_at are only written on first insert.
transaction
.execute(
"INSERT INTO sessions (
session_id, project_id, title, phase, mode, workflow_status,
lead_agent_id, artifact_ref, created_at, updated_at
)
VALUES (?1, ?2, ?1, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
ON CONFLICT(session_id) DO UPDATE SET
phase = excluded.phase,
mode = excluded.mode,
workflow_status = excluded.workflow_status,
lead_agent_id = excluded.lead_agent_id,
artifact_ref = excluded.artifact_ref,
updated_at = excluded.updated_at",
params![
status.session_id,
project_id,
status.phase,
status.mode,
status.workflow_status,
nullable_unknown(&status.lead_agent),
status.artifact_ref,
status.last_update,
],
)
.map_err(|error| CliError::failure(format!("failed to project session: {error}")))?;
insert_status_event_if_missing(transaction, status)?;
Ok(())
}
/// Outcome of projecting a single live audit record into the database.
#[derive(Debug, PartialEq)]
pub(crate) enum LiveAuditProjection {
/// The record was inserted and its message row was upserted.
Projected,
/// A row with the same `audit_id` already existed; nothing was written for it.
AlreadyPresent,
/// The record could not be stored: either its timestamp could not be
/// canonicalized or the insert statement failed.
Gap {
/// Human-readable cause (the offending timestamp or the SQLite error text).
reason: String,
},
}
pub(crate) fn project_audit(
db_path: &Path,
root: &Path,
record: &ImportedAuditRecord,
) -> CliResult<LiveAuditProjection> {
let mut connection = open_database(db_path)?;
let transaction = connection
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(|error| CliError::failure(format!("failed to start audit projection: {error}")))?;
let outcome = project_live_audit_record(&transaction, root, record)?;
transaction.commit().map_err(|error| {
CliError::failure(format!("failed to commit audit projection: {error}"))
})?;
Ok(outcome)
}
fn project_live_audit_record(
transaction: &Transaction<'_>,
root: &Path,
record: &ImportedAuditRecord,
) -> CliResult<LiveAuditProjection> {
let canonical_ts = match crate::timestamp::canonicalize(&record.timestamp) {
Some(value) => value,
None => {
return Ok(LiveAuditProjection::Gap {
reason: format!("non-canonical timestamp {}", record.timestamp),
})
}
};
let exists: Option<String> = transaction
.query_row(
"SELECT audit_id FROM audit_records WHERE audit_id = ?1",
[&record.audit_id],
|row| row.get(0),
)
.optional()
.map_err(|error| CliError::failure(format!("failed to read audit_records: {error}")))?;
if exists.is_some() {
return Ok(LiveAuditProjection::AlreadyPresent);
}
let project_id = import_project_id(root);
ensure_project(transaction, &project_id, root, &canonical_ts)?;
ensure_session_min(transaction, &project_id, &record.session_id, &canonical_ts)?;
let mut canonical_record = record.clone();
canonical_record.timestamp = canonical_ts;
match execute_imported_audit_insert(
transaction,
&canonical_record,
canonical_record.previous_audit_id.as_deref(),
) {
Ok(_) => {
ensure_record_agents(transaction, &canonical_record)?;
upsert_message_latest(transaction, &canonical_record)?;
Ok(LiveAuditProjection::Projected)
}
Err(error) => Ok(LiveAuditProjection::Gap {
reason: error.to_string(),
}),
}
}
fn ensure_session_min(
transaction: &Transaction<'_>,
project_id: &str,
session_id: &str,
timestamp: &str,
) -> CliResult<()> {
transaction
.execute(
"INSERT OR IGNORE INTO sessions (
session_id, project_id, title, phase, mode, workflow_status, created_at, updated_at
)
VALUES (?1, ?2, ?1, 'live', 'unknown', 'idle', ?3, ?3)",
params![session_id, project_id, timestamp],
)
.map_err(|error| CliError::failure(format!("failed to ensure session: {error}")))?;
Ok(())
}
/// Inserts the `messages` row for `record`, or — when a row for the same
/// `(session_id, mid)` already exists — overwrites it with this record's values.
///
/// On conflict every column listed in the `DO UPDATE SET` clause is replaced;
/// `message_id`, `session_id` and `mid` are not in that list.
fn upsert_message_latest(
transaction: &Transaction<'_>,
record: &ImportedAuditRecord,
) -> CliResult<()> {
let message_id = message_id(&record.session_id, &record.mid);
transaction
.execute(
"INSERT INTO messages (
message_id, session_id, mid, source_agent_id, target_agent_id, message_type,
mode, ref, transport, payload_redaction_policy, payload_hash,
latest_delivery_status, latest_verified_by, latest_audit_id, timestamp
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
ON CONFLICT(session_id, mid) DO UPDATE SET
source_agent_id = excluded.source_agent_id,
target_agent_id = excluded.target_agent_id,
message_type = excluded.message_type,
mode = excluded.mode,
ref = excluded.ref,
transport = excluded.transport,
payload_redaction_policy = excluded.payload_redaction_policy,
payload_hash = excluded.payload_hash,
latest_delivery_status = excluded.latest_delivery_status,
latest_verified_by = excluded.latest_verified_by,
latest_audit_id = excluded.latest_audit_id,
timestamp = excluded.timestamp",
// Parameters are positional and must stay in the column order above.
params![
message_id,
record.session_id,
record.mid,
record.source_agent_id,
record.target_agent_id,
record.record_type,
record.mode,
record.r#ref,
record.transport,
record.payload_redaction_policy,
record.payload_hash,
record.delivery_status,
record.verified_by,
record.audit_id,
record.timestamp,
],
)
.map_err(|error| CliError::failure(format!("failed to project message: {error}")))?;
Ok(())
}
fn insert_import_row(
transaction: &Transaction<'_>,
session_dir: &Path,
source_hash: &str,
imported_at: &str,
warnings: &[String],
) -> CliResult<()> {
let warning_summary = if warnings.is_empty() {
"none".to_string()
} else {
warnings.join("; ")
};
let result = if warnings.is_empty() {
"imported"
} else {
"imported-with-warnings"
};
transaction
.execute(
"INSERT INTO imports (
source_kind, source_path, source_hash, imported_at, result, warning_summary
)
VALUES ('outputs', ?1, ?2, ?3, ?4, ?5)",
params![
session_dir.display().to_string(),
source_hash,
imported_at,
result,
warning_summary,
],
)
.map_err(|error| CliError::failure(format!("failed to record import: {error}")))?;
Ok(())
}
/// Canonicalizes a `--timestamp` argument, returning a usage error that echoes
/// the offending value when it cannot be canonicalized.
fn normalize_arg_timestamp(value: &str) -> CliResult<String> {
    match crate::timestamp::canonicalize(value) {
        Some(canonical) => Ok(canonical),
        None => Err(CliError::usage(format!(
            "--timestamp must be RFC3339 (e.g. 2026-05-29T02:00:00Z): {value}"
        ))),
    }
}
/// Appends one audit record to the database at `path` and refreshes the
/// matching `messages` row, all inside a single immediate transaction.
///
/// Validation (usage errors): exactly one of `--payload` / `--payload-file`
/// must be given, `--excerpt-chars` must be at least 1, and the redaction
/// policy and delivery status must each be one of the accepted values. The
/// timestamp argument is canonicalized before use.
///
/// The new record's predecessor is taken from `tail_audit_id` for the session.
/// The audit id is `args.audit_id` when supplied, otherwise a freshly generated
/// one.
///
/// Returns the audit id of the appended record.
fn append_audit_record(path: &Path, args: &DbAuditAppendArgs) -> CliResult<String> {
// Both set or both unset is an error: exactly one payload source is required.
if args.payload.is_some() == args.payload_file.is_some() {
return Err(CliError::usage(
"exactly one of --payload or --payload-file is required",
));
}
if args.excerpt_chars < 1 {
return Err(CliError::usage("--excerpt-chars must be >= 1"));
}
if !matches!(
args.payload_redaction_policy.as_str(),
"hash-only" | "excerpt" | "full"
) {
return Err(CliError::usage(format!(
"invalid redaction policy: {}",
args.payload_redaction_policy
)));
}
if !matches!(
args.delivery_status.as_str(),
"drafted" | "sent" | "observed" | "failed" | "unknown"
) {
return Err(CliError::usage(format!(
"invalid delivery status: {}",
args.delivery_status
)));
}
let timestamp = normalize_arg_timestamp(&args.timestamp)?;
let payload = payload_from_args(args)?;
let payload_bytes = payload.as_bytes();
// The hash and content size always cover the full payload; only the stored
// excerpt depends on the redaction policy.
let payload_hash = format!("sha256:{:x}", Sha256::digest(payload_bytes));
let payload_excerpt =
payload_excerpt(&payload, args.excerpt_chars, &args.payload_redaction_policy);
let message_id = message_id(&args.session_id, &args.mid);
let mut connection = open_database(path)?;
let transaction = connection
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(|error| {
CliError::failure(format!("failed to start SQLite audit transaction: {error}"))
})?;
// Read the tail inside the same transaction that performs the insert.
let previous_audit_id = tail_audit_id(&transaction, &args.session_id)?;
let audit_id = match &args.audit_id {
Some(audit_id) => audit_id.clone(),
None => generate_audit_id(&transaction)?,
};
transaction
.execute(
"INSERT INTO audit_records (
audit_id, previous_audit_id, session_id, source_agent_id, target_agent_id,
source_address, target_address, transport, workspace_id, mid, record_type,
command_origin, mode, ref, re, payload_hash, payload_redaction_policy,
content_size, delivery_status, observed_by, verified_by, timestamp
)
VALUES (
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11,
?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
)",
params![
audit_id,
previous_audit_id,
args.session_id,
args.source_agent_id,
args.target_agent_id,
args.source_address,
args.target_address,
args.transport,
args.workspace_id,
args.mid,
args.record_type,
args.command_origin,
args.mode,
args.r#ref,
args.re,
payload_hash,
args.payload_redaction_policy,
payload_bytes.len() as i64,
args.delivery_status,
args.observed_by,
args.verified_by,
timestamp,
],
)
.map_err(|error| CliError::failure(format!("failed to append audit record: {error}")))?;
// Upsert the message row so it reflects this (latest) audit record.
transaction
.execute(
"INSERT INTO messages (
message_id, session_id, mid, source_agent_id, target_agent_id, message_type,
mode, ref, transport, transport_thread_id, payload_redaction_policy,
payload_excerpt, payload_hash, latest_delivery_status, latest_verified_by,
latest_audit_id, timestamp
)
VALUES (
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17
)
ON CONFLICT(session_id, mid) DO UPDATE SET
source_agent_id = excluded.source_agent_id,
target_agent_id = excluded.target_agent_id,
message_type = excluded.message_type,
mode = excluded.mode,
ref = excluded.ref,
transport = excluded.transport,
transport_thread_id = excluded.transport_thread_id,
payload_redaction_policy = excluded.payload_redaction_policy,
payload_excerpt = excluded.payload_excerpt,
payload_hash = excluded.payload_hash,
latest_delivery_status = excluded.latest_delivery_status,
latest_verified_by = excluded.latest_verified_by,
latest_audit_id = excluded.latest_audit_id,
timestamp = excluded.timestamp",
params![
message_id,
args.session_id,
args.mid,
args.source_agent_id,
args.target_agent_id,
args.record_type,
args.mode,
args.r#ref,
args.transport,
args.transport_thread_id,
args.payload_redaction_policy,
payload_excerpt,
payload_hash,
args.delivery_status,
args.verified_by,
audit_id,
timestamp,
],
)
.map_err(|error| CliError::failure(format!("failed to update message state: {error}")))?;
transaction.commit().map_err(|error| {
CliError::failure(format!("failed to commit audit transaction: {error}"))
})?;
Ok(audit_id)
}
fn payload_from_args(args: &DbAuditAppendArgs) -> CliResult<String> {
if let Some(path) = &args.payload_file {
let mut payload = String::new();
fs::File::open(path)
.and_then(|mut file| file.read_to_string(&mut payload))
.map_err(|error| {
CliError::failure(format!(
"failed to read payload file {}: {error}",
path.display()
))
})?;
return Ok(payload);
}
Ok(args.payload.clone().unwrap_or_default())
}
/// Builds the stored excerpt of `payload` according to the redaction `policy`.
///
/// * `"hash-only"` — no excerpt (`None`).
/// * `"full"` — the whole payload.
/// * anything else — the whole payload when it has at most `2 * chars`
///   characters, otherwise its first and last `chars` characters joined by
///   `"..."`.
///
/// Lengths are counted in Unicode scalar values, so multi-byte text is never
/// split inside a character.
fn payload_excerpt(payload: &str, chars: usize, policy: &str) -> Option<String> {
    match policy {
        "hash-only" => None,
        "full" => Some(payload.to_string()),
        _ => {
            let total = payload.chars().count();
            // Saturate instead of overflowing when `chars` is close to usize::MAX.
            if total <= chars.saturating_mul(2) {
                return Some(payload.to_string());
            }
            // Byte offset of the n-th character, or the end of the string.
            let byte_offset = |char_index: usize| {
                payload
                    .char_indices()
                    .nth(char_index)
                    .map_or(payload.len(), |(offset, _)| offset)
            };
            // Here `total > 2 * chars`, so `total - chars` cannot underflow and
            // the head and tail slices never overlap.
            let head = &payload[..byte_offset(chars)];
            let tail = &payload[byte_offset(total - chars)..];
            Some(format!("{head}...{tail}"))
        }
    }
}
fn tail_audit_id(transaction: &Transaction<'_>, session_id: &str) -> CliResult<Option<String>> {
transaction
.query_row(
"SELECT record.audit_id
FROM audit_records AS record
WHERE record.session_id = ?1
AND NOT EXISTS (
SELECT 1
FROM audit_records AS child
WHERE child.session_id = record.session_id
AND child.previous_audit_id = record.audit_id
)
ORDER BY record.timestamp DESC, record.audit_id DESC
LIMIT 1",
[session_id],
|row| row.get(0),
)
.optional()
.map_err(|error| CliError::failure(format!("failed to read audit chain tail: {error}")))
}
/// Generates a random six-character audit id (lowercase letters and digits)
/// that is not yet present in `audit_records`.
///
/// Each candidate is checked with a prepared existence query, so the cost does
/// not grow with the number of stored audit records. Up to 100 candidates are
/// tried.
///
/// # Errors
///
/// Returns a failure when the query cannot be prepared or executed, and a usage
/// error when no unused id is found within 100 attempts.
fn generate_audit_id(transaction: &Transaction<'_>) -> CliResult<String> {
    const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
    // Prepared once and re-run per candidate instead of loading every id into memory.
    let mut statement = transaction
        .prepare("SELECT 1 FROM audit_records WHERE audit_id = ?1")
        .map_err(|error| CliError::failure(format!("failed to read audit ids: {error}")))?;
    let mut rng = OsRng;
    for _ in 0..100 {
        let audit_id = (0..6)
            .map(|_| {
                let index = rng.gen_range(0..ALPHABET.len());
                ALPHABET[index] as char
            })
            .collect::<String>();
        let taken = statement
            .exists([&audit_id])
            .map_err(|error| CliError::failure(format!("failed to read audit ids: {error}")))?;
        if !taken {
            return Ok(audit_id);
        }
    }
    Err(CliError::usage(
        "could not generate unique audit_id after 100 attempts",
    ))
}
/// Derives the message id: `msg-` followed by the lowercase hex SHA-256 of
/// `session_id`, a NUL byte, and `mid`.
fn message_id(session_id: &str, mid: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(session_id.as_bytes());
    // The NUL separator keeps ("ab", "c") and ("a", "bc") from hashing alike.
    hasher.update(b"\0");
    hasher.update(mid.as_bytes());
    format!("msg-{:x}", hasher.finalize())
}
/// Derives the project id for `root`: `project-` followed by the first 16 hex
/// characters of the SHA-256 of the path as rendered by `Path::display`.
fn import_project_id(root: &Path) -> String {
    let rendered_root = root.display().to_string();
    let hex = format!("{:x}", Sha256::digest(rendered_root.as_bytes()));
    // The digest renders as ASCII hex, so taking 16 chars equals taking 16 bytes.
    let prefix: String = hex.chars().take(16).collect();
    format!("project-{prefix}")
}
/// Computes a `sha256:`-prefixed fingerprint of a session directory.
///
/// For each of `status.md`, `audit.md` and `summary.md` that exists, in that
/// order, the file name and then the file bytes are fed into one hasher.
/// Missing files contribute nothing; an unreadable file is an error.
fn source_hash_for_session_dir(session_dir: &Path) -> CliResult<String> {
    const HASHED_FILES: [&str; 3] = ["status.md", "audit.md", "summary.md"];
    let mut hasher = Sha256::new();
    for file_name in HASHED_FILES {
        let path = session_dir.join(file_name);
        if !path.exists() {
            continue;
        }
        hasher.update(file_name.as_bytes());
        let content = match fs::read(&path) {
            Ok(bytes) => bytes,
            Err(error) => {
                return Err(CliError::failure(format!(
                    "failed to read {}: {error}",
                    path.display()
                )))
            }
        };
        hasher.update(content);
    }
    Ok(format!("sha256:{:x}", hasher.finalize()))
}
/// Returns the value stored under `key`, or `fallback` when the key is missing
/// or maps to an empty string.
fn value_or(values: &BTreeMap<String, String>, key: &str, fallback: &str) -> String {
    match values.get(key) {
        Some(found) if !found.is_empty() => found.clone(),
        _ => fallback.to_owned(),
    }
}
/// Maps the placeholder strings `""` and `"unknown"` to `None`; any other
/// input is returned as an owned `Some`.
fn optional_text(value: &str) -> Option<String> {
    match value {
        "" | "unknown" => None,
        text => Some(text.to_owned()),
    }
}
/// Converts an audit-id field into an `Option`.
///
/// The strings `""`, `"none"` and `"unknown"` yield `None`; every other input
/// is returned as an owned `Some`.
fn optional_audit_id(value: &str) -> Option<String> {
    const ABSENT_MARKERS: [&str; 3] = ["", "none", "unknown"];
    if ABSENT_MARKERS.contains(&value) {
        return None;
    }
    Some(value.to_owned())
}
/// Returns `None` for an empty string or the literal `"unknown"`, and an owned
/// copy of `value` otherwise.
fn nullable_unknown(value: &str) -> Option<String> {
    Some(value)
        .filter(|text| !text.is_empty() && *text != "unknown")
        .map(str::to_owned)
}
/// Reports whether `table` has a column named exactly `column`.
///
/// Runs `PRAGMA table_info(<table>)` and compares the value at index 1 of each
/// result row (read as the column name) against `column`; the comparison is
/// case-sensitive. `table` is interpolated into the statement text, so it must
/// be a trusted identifier.
///
/// # Errors
///
/// Returns `CliError::failure` if the statement cannot be prepared or executed,
/// or if any row cannot be read.
fn table_has_column(connection: &Connection, table: &str, column: &str) -> CliResult<bool> {
    // Single place that shapes every failure of this inspection.
    let inspect_failure = |error: &dyn std::fmt::Display| {
        CliError::failure(format!("failed to inspect SQLite table {table}: {error}"))
    };

    let pragma = format!("PRAGMA table_info({table})");
    let mut statement = connection
        .prepare(&pragma)
        .map_err(|error| inspect_failure(&error))?;
    let mut rows = statement
        .query([])
        .map_err(|error| inspect_failure(&error))?;

    // Every row is visited (no early exit) so a read error on any row is reported.
    let mut found = false;
    while let Some(row) = rows.next().map_err(|error| inspect_failure(&error))? {
        let name: String = row.get(1).map_err(|error| inspect_failure(&error))?;
        found |= name == column;
    }
    Ok(found)
}
#[cfg(test)]
mod tests {
// Tests for the schema migration steps (apply_v4, apply_v5, migrate) and for the
// live status / audit projections. Every test runs against a fresh in-memory
// SQLite connection.
use super::*;
/// Builds a version-3 database: creates the migrations table, applies the v1,
/// v2 and v3 steps in order, and sets `user_version` to 3.
fn open_v3(connection: &Connection) {
create_schema_migrations(connection).unwrap();
apply_v1(connection).unwrap();
apply_v2(connection).unwrap();
apply_v3(connection).unwrap();
connection.pragma_update(None, "user_version", 3).unwrap();
}
/// A status event inserted before v4 (no `content_hash` supplied) must, after
/// `apply_v4`, carry the hash that `status_content_hash` computes for the same
/// field values.
#[test]
fn v4_backfills_content_hash_for_legacy_status_event() {
let connection = Connection::open_in_memory().unwrap();
open_v3(&connection);
// Parent project and session rows for the status event inserted below.
connection
.execute(
"INSERT INTO projects (project_id, name, root_path, created_at, updated_at)
VALUES ('p', 'p', '/p', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
connection
.execute(
"INSERT INTO sessions (session_id, project_id, title, phase, mode, workflow_status, created_at, updated_at)
VALUES ('s', 'p', 's', 'tooling', 'review', 'working', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
// Legacy row: written against the v3 schema, without a content_hash value.
connection
.execute(
"INSERT INTO status_events (session_id, timestamp, phase, mode, workflow_status,
completed_since_last_update, in_progress, next_action, blockers,
asks_for_zevs, risk_or_residual_uncertainty, expected_wait, event_text)
VALUES ('s', '2026-05-29T01:00:00Z', 'tooling', 'review', 'working',
'c', 'p', 'next-A', 'none', 'none', 'none', 'unknown', 'event one')",
[],
)
.unwrap();
apply_v4(&connection).unwrap();
let stored: String = connection
.query_row(
"SELECT content_hash FROM status_events WHERE session_id='s'",
[],
|row| row.get(0),
)
.unwrap();
// Same values, in the same order, as the non-key columns of the INSERT above.
let expected = status_content_hash(&[
"tooling",
"review",
"working",
"c",
"p",
"next-A",
"none",
"none",
"none",
"unknown",
"event one",
]);
assert_eq!(
stored, expected,
"legacy row must be backfilled with the canonical hash"
);
assert!(stored.starts_with("sha256:"));
}
/// A session whose `updated_at` carries a `+07:00` offset is counted once by
/// `noncanonical_timestamp_count`, both before and after `apply_v4` runs.
#[test]
fn noncanonical_scan_counts_legacy_offset_rows() {
let connection = Connection::open_in_memory().unwrap();
open_v3(&connection);
connection
.execute(
"INSERT INTO projects (project_id, name, root_path, created_at, updated_at)
VALUES ('p', 'p', '/p', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
// updated_at uses a +07:00 offset instead of the trailing-Z form used elsewhere.
connection
.execute(
"INSERT INTO sessions (session_id, project_id, title, phase, mode, workflow_status, created_at, updated_at)
VALUES ('s', 'p', 's', 'x', 'review', 'working', '2026-05-29T00:00:00Z', '2026-05-28T18:00:00+07:00')",
[],
)
.unwrap();
assert_eq!(noncanonical_timestamp_count(&connection).unwrap(), 1);
apply_v4(&connection).unwrap();
// The count is unchanged by the migration: the offset row is still reported.
assert_eq!(noncanonical_timestamp_count(&connection).unwrap(), 1);
}
/// After `apply_v4`, calling `insert_status_event_if_missing` with a status
/// equal to an already stored legacy row must not add a second row for the
/// same session and timestamp.
#[test]
fn migrated_legacy_status_row_dedups_on_reimport() {
let mut connection = Connection::open_in_memory().unwrap();
open_v3(&connection);
connection
.execute(
"INSERT INTO projects (project_id, name, root_path, created_at, updated_at)
VALUES ('p', 'p', '/p', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
connection
.execute(
"INSERT INTO sessions (session_id, project_id, title, phase, mode, workflow_status, created_at, updated_at)
VALUES ('s', 'p', 's', 'tooling', 'review', 'working', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
// Legacy status event written before the v4 migration.
connection
.execute(
"INSERT INTO status_events (session_id, timestamp, phase, mode, workflow_status,
completed_since_last_update, in_progress, next_action, blockers,
asks_for_zevs, risk_or_residual_uncertainty, expected_wait, event_text)
VALUES ('s', '2026-05-29T01:00:00Z', 'tooling', 'review', 'working',
'c', 'p', 'next-A', 'none', 'none', 'none', 'unknown', 'event one')",
[],
)
.unwrap();
apply_v4(&connection).unwrap();
// Mirrors the stored row's timestamp and status fields; lead_agent and
// artifact_ref have no counterpart in the INSERT above.
let status = ImportedStatus {
session_id: "s".to_string(),
last_update: "2026-05-29T01:00:00Z".to_string(),
lead_agent: "codex".to_string(),
phase: "tooling".to_string(),
mode: "review".to_string(),
artifact_ref: "tools/x".to_string(),
workflow_status: "working".to_string(),
completed_since_last_update: "c".to_string(),
in_progress: "p".to_string(),
next_action: "next-A".to_string(),
blockers: "none".to_string(),
asks_for_zevs: "none".to_string(),
risk_or_residual_uncertainty: "none".to_string(),
expected_wait: "unknown".to_string(),
rolling_events: vec!["event one".to_string()],
};
let tx = connection.transaction().unwrap();
insert_status_event_if_missing(&tx, &status).unwrap();
tx.commit().unwrap();
let count: i64 = connection
.query_row(
"SELECT COUNT(*) FROM status_events WHERE session_id='s' AND timestamp='2026-05-29T01:00:00Z'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
count, 1,
"re-import of a backfilled legacy row must dedup, not duplicate"
);
}
/// Guards the enum value lists baked into the table definitions: after v4 the
/// SQL text stored in `sqlite_master` for each table must contain the exact
/// `IN (...)` fragment listed below.
#[test]
fn schema_check_enums_match_canonical_values() {
let connection = Connection::open_in_memory().unwrap();
open_v3(&connection);
apply_v4(&connection).unwrap();
// Fetches the CREATE TABLE text SQLite recorded for the named table.
let table_sql = |name: &str| -> String {
connection
.query_row(
"SELECT sql FROM sqlite_master WHERE type='table' AND name=?1",
[name],
|row| row.get(0),
)
.unwrap()
};
// (table name, fragment that must appear verbatim in that table's SQL).
let expectations: &[(&str, &str)] = &[
(
"sessions",
"workflow_status IN ('idle', 'working', 'blocked', 'waiting-for-operator', 'done')",
),
(
"status_events",
"workflow_status IN ('idle', 'working', 'blocked', 'waiting-for-operator', 'done')",
),
(
"agents",
"current_agent_status IN ('idle', 'working', 'blocked', 'done', 'unknown')",
),
(
"session_agents",
"agent_status IN ('idle', 'working', 'blocked', 'done', 'unknown')",
),
(
"audit_records",
"command_origin IN ('agent', 'operator', 'helper-tool', 'unknown')",
),
(
"audit_records",
"payload_redaction_policy IN ('hash-only', 'excerpt', 'full')",
),
(
"audit_records",
"delivery_status IN ('drafted', 'sent', 'observed', 'failed', 'unknown')",
),
(
"audit_records",
"verified_by IN ('transport', 'agent', 'operator', 'helper-tool')",
),
(
"messages",
"payload_redaction_policy IN ('hash-only', 'excerpt', 'full')",
),
(
"messages",
"latest_delivery_status IN ('drafted', 'sent', 'observed', 'failed', 'unknown')",
),
(
"messages",
"latest_verified_by IN ('transport', 'agent', 'operator', 'helper-tool')",
),
];
for (table, fragment) in expectations {
let sql = table_sql(table);
assert!(
sql.contains(fragment),
"{table} CHECK drifted from canonical enum; expected exact fragment: {fragment}"
);
}
}
/// Builds a version-4 database: creates the migrations table, applies the v1
/// through v4 steps in order, and sets `user_version` to 4.
fn open_v4(connection: &Connection) {
create_schema_migrations(connection).unwrap();
apply_v1(connection).unwrap();
apply_v2(connection).unwrap();
apply_v3(connection).unwrap();
apply_v4(connection).unwrap();
connection.pragma_update(None, "user_version", 4).unwrap();
}
/// Inserts project `p` and session `s`, the rows the audit fixtures below
/// reference through their `session_id`.
fn seed_audit_session(connection: &Connection) {
connection
.execute(
"INSERT INTO projects (project_id, name, root_path, created_at, updated_at)
VALUES ('p', 'p', '/p', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
connection
.execute(
"INSERT INTO sessions (session_id, project_id, title, phase, mode, workflow_status, created_at, updated_at)
VALUES ('s', 'p', 's', 'x', 'review', 'working', '2026-05-29T00:00:00Z', '2026-05-29T00:00:00Z')",
[],
)
.unwrap();
}
/// Inserts one audit record for session `s` with the given `due` value and
/// returns the raw rusqlite result so callers can assert on acceptance or
/// rejection. `audit_id` is bound to `?1`, which fills both the `audit_id`
/// and the `mid` columns.
fn insert_audit_with_due(
connection: &Connection,
audit_id: &str,
due: Option<&str>,
) -> rusqlite::Result<usize> {
connection.execute(
"INSERT INTO audit_records (audit_id, previous_audit_id, session_id, source_address,
target_address, transport, workspace_id, mid, record_type, command_origin,
payload_hash, payload_redaction_policy, content_size, delivery_status, observed_by,
verified_by, timestamp, due)
VALUES (?1, NULL, 's', 'src', 'tgt', 'herdr', 'w', ?1, 'status-update', 'agent',
'sha256:x', 'hash-only', 0, 'observed', 'codex', 'helper-tool',
'2026-05-29T01:00:00Z', ?2)",
params![audit_id, due],
)
}
/// `apply_v5` must add the `due` column to `audit_records`, and an audit row
/// that existed before the migration must read back with `due` = NULL.
#[test]
fn v5_adds_due_column_and_preserves_existing_audit_rows() {
let connection = Connection::open_in_memory().unwrap();
open_v4(&connection);
seed_audit_session(&connection);
// Pre-v5 row: the column list has no `due`.
connection
.execute(
"INSERT INTO audit_records (audit_id, previous_audit_id, session_id, source_address,
target_address, transport, workspace_id, mid, record_type, command_origin,
payload_hash, payload_redaction_policy, content_size, delivery_status,
observed_by, verified_by, timestamp)
VALUES ('a1', NULL, 's', 'src', 'tgt', 'herdr', 'w', 'm1', 'status-update',
'agent', 'sha256:x', 'hash-only', 0, 'observed', 'codex', 'helper-tool',
'2026-05-29T01:00:00Z')",
[],
)
.unwrap();
assert!(!table_has_column(&connection, "audit_records", "due").unwrap());
apply_v5(&connection).unwrap();
assert!(table_has_column(&connection, "audit_records", "due").unwrap());
let due: Option<String> = connection
.query_row(
"SELECT due FROM audit_records WHERE audit_id='a1'",
[],
|row| row.get(0),
)
.unwrap();
assert!(
due.is_none(),
"existing audit row must read due=NULL after v5"
);
}
/// After v5, inserting an audit record succeeds with `due` = NULL and with a
/// `YYYY-MM-DDTHH:MM:SSZ` value, and fails for a value lacking the `T`/`Z`
/// markers.
#[test]
fn v5_due_shape_trigger_accepts_null_and_canonical_rejects_malformed() {
let connection = Connection::open_in_memory().unwrap();
open_v4(&connection);
apply_v5(&connection).unwrap();
seed_audit_session(&connection);
insert_audit_with_due(&connection, "a1", None).expect("NULL due must be accepted");
insert_audit_with_due(&connection, "a2", Some("2026-06-01T12:00:00Z"))
.expect("canonical due must be accepted");
assert!(
insert_audit_with_due(&connection, "a3", Some("2026-06-01 12:00:00")).is_err(),
"shape-violating due (no T/Z) must be rejected by the trigger"
);
}
/// `migrate` must take a v4 database to `user_version` 5 with the `due`
/// column present, and a second `migrate` call must leave the version at 5.
#[test]
fn migrate_advances_v4_db_to_v5_with_due_column() {
let mut connection = Connection::open_in_memory().unwrap();
open_v4(&connection);
assert!(!table_has_column(&connection, "audit_records", "due").unwrap());
migrate(&mut connection).unwrap();
let version: i64 = connection
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, 5);
assert!(table_has_column(&connection, "audit_records", "due").unwrap());
// Second run: must succeed and not move the version.
migrate(&mut connection).unwrap();
let version_again: i64 = connection
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version_again, 5);
}
/// Two `project_live_status` calls for the same session must leave the
/// `sessions` row reflecting the second call and must produce two
/// `status_events` rows.
#[test]
fn live_status_projection_upserts_session_current_state() {
let mut connection = Connection::open_in_memory().unwrap();
open_v4(&connection);
apply_v5(&connection).unwrap();
let root = Path::new("outputs");
// Builds a status for session `s`; only the five arguments vary between
// calls, and the rolling event text is derived from the phase.
let make =
|phase: &str, status: &str, lead: &str, artifact: &str, ts: &str| ImportedStatus {
session_id: "s".to_string(),
last_update: ts.to_string(),
lead_agent: lead.to_string(),
phase: phase.to_string(),
mode: "build".to_string(),
artifact_ref: artifact.to_string(),
workflow_status: status.to_string(),
completed_since_last_update: "c".to_string(),
in_progress: "p".to_string(),
next_action: "n".to_string(),
blockers: "none".to_string(),
asks_for_zevs: "none".to_string(),
risk_or_residual_uncertainty: "none".to_string(),
expected_wait: "unknown".to_string(),
rolling_events: vec![format!("event-{phase}")],
};
// First write, committed in its own transaction.
{
let tx = connection.transaction().unwrap();
project_live_status(
&tx,
root,
&make("p1", "working", "claude", "a1", "2026-05-29T01:00:00Z"),
)
.unwrap();
tx.commit().unwrap();
}
// Second write with a later timestamp and different field values.
{
let tx = connection.transaction().unwrap();
project_live_status(
&tx,
root,
&make("p2", "blocked", "codex", "a2", "2026-05-29T02:00:00Z"),
)
.unwrap();
tx.commit().unwrap();
}
let (phase, status, lead, artifact): (String, String, Option<String>, Option<String>) =
connection
.query_row(
"SELECT phase, workflow_status, lead_agent_id, artifact_ref
FROM sessions WHERE session_id='s'",
[],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.unwrap();
assert_eq!(phase, "p2", "session phase must reflect the latest write");
assert_eq!(status, "blocked");
assert_eq!(lead.as_deref(), Some("codex"));
assert_eq!(artifact.as_deref(), Some("a2"));
let events: i64 = connection
.query_row(
"SELECT COUNT(*) FROM status_events WHERE session_id='s'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(events, 2, "two distinct status writes append two events");
}
/// Builds a v5 database on top of `open_v4` and turns the `foreign_keys`
/// pragma on. Note that `user_version` is left at the value `open_v4` set (4).
fn open_v5(connection: &Connection) {
open_v4(connection);
apply_v5(connection).unwrap();
connection
.pragma_update(None, "foreign_keys", "ON")
.unwrap();
}
/// Builds an `ImportedAuditRecord` fixture for session `s`. Only the audit id
/// (also used as `mid`), the previous audit id and the timestamp vary; every
/// other field is a fixed value.
fn live_audit_record(audit_id: &str, previous: Option<&str>, ts: &str) -> ImportedAuditRecord {
ImportedAuditRecord {
audit_id: audit_id.to_string(),
previous_audit_id: previous.map(str::to_string),
timestamp: ts.to_string(),
source_agent_id: Some("claude".to_string()),
source_address: "w-1".to_string(),
target_agent_id: Some("codex".to_string()),
target_address: "w-2".to_string(),
transport: "herdr".to_string(),
workspace_id: "w".to_string(),
session_id: "s".to_string(),
mid: audit_id.to_string(),
record_type: "status-update".to_string(),
command_origin: "agent".to_string(),
mode: None,
r#ref: None,
re: None,
payload_hash: "sha256:x".to_string(),
payload_redaction_policy: "hash-only".to_string(),
content_size: 0,
delivery_status: "observed".to_string(),
observed_by: "codex".to_string(),
verified_by: "helper-tool".to_string(),
due: None,
}
}
/// Runs `project_live_audit_record` for one record inside its own
/// transaction, commits, and returns the projection outcome.
fn project_one_audit(
connection: &mut Connection,
record: &ImportedAuditRecord,
) -> LiveAuditProjection {
let tx = connection.transaction().unwrap();
let outcome = project_live_audit_record(&tx, Path::new("outputs"), record).unwrap();
tx.commit().unwrap();
outcome
}
/// Projecting `a1` and then `a2` (whose previous is `a1`) must store `a2`
/// with `previous_audit_id` equal to the value supplied in the record.
#[test]
fn live_audit_projection_preserves_file_chain_identity() {
let mut connection = Connection::open_in_memory().unwrap();
open_v5(&connection);
assert_eq!(
project_one_audit(
&mut connection,
&live_audit_record("a1", None, "2026-05-29T01:00:00Z")
),
LiveAuditProjection::Projected
);
assert_eq!(
project_one_audit(
&mut connection,
&live_audit_record("a2", Some("a1"), "2026-05-29T02:00:00Z")
),
LiveAuditProjection::Projected
);
let previous: Option<String> = connection
.query_row(
"SELECT previous_audit_id FROM audit_records WHERE audit_id='a2'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
previous.as_deref(),
Some("a1"),
"previous must be the file value verbatim, not DB-tail-derived"
);
}
/// A record whose `previous_audit_id` names an id that was never projected
/// must yield `LiveAuditProjection::Gap` and must not be inserted.
#[test]
fn live_audit_projection_gap_on_missing_previous_without_rebase() {
let mut connection = Connection::open_in_memory().unwrap();
open_v5(&connection);
// "ghost" was never projected, so the chain link cannot be resolved.
let outcome = project_one_audit(
&mut connection,
&live_audit_record("a9", Some("ghost"), "2026-05-29T03:00:00Z"),
);
assert!(
matches!(outcome, LiveAuditProjection::Gap { .. }),
"missing previous must be a Gap, got {outcome:?}"
);
let count: i64 = connection
.query_row(
"SELECT COUNT(*) FROM audit_records WHERE audit_id='a9'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(
count, 0,
"a gapped record must not be inserted (no live rebase)"
);
}
/// Projecting the same record twice must report `Projected` and then
/// `AlreadyPresent`, leaving exactly one stored row.
#[test]
fn live_audit_projection_is_idempotent_on_reprojection() {
let mut connection = Connection::open_in_memory().unwrap();
open_v5(&connection);
assert_eq!(
project_one_audit(
&mut connection,
&live_audit_record("a1", None, "2026-05-29T01:00:00Z")
),
LiveAuditProjection::Projected
);
assert_eq!(
project_one_audit(
&mut connection,
&live_audit_record("a1", None, "2026-05-29T01:00:00Z")
),
LiveAuditProjection::AlreadyPresent
);
let count: i64 = connection
.query_row(
"SELECT COUNT(*) FROM audit_records WHERE audit_id='a1'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 1);
}
}