// zynk 1.0.0
//
// Portable protocol and helper CLI for multi-agent collaboration.
use crate::{CliError, CliResult};
use clap::Args;
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};

// Command-line arguments for the dashboard snapshot command (consumed by `run`).
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///`
// doc comments into help/about text, so doc comments would change `--help`.
#[derive(Debug, Args)]
pub struct DashboardArgs {
    // Outputs root. Session status files are looked up under `<root>/sessions/`,
    // and `discover_db` also probes `<root>/.zynk/zynk.db`.
    #[arg(
        long,
        default_value = "outputs",
        help = "runtime outputs root; reads <root>/sessions/*/status.md"
    )]
    pub root: PathBuf,
    // Destination file for the rendered dashboard. When omitted, `run` writes
    // to `<root>/dashboard.md`.
    #[arg(long)]
    pub output: Option<PathBuf>,
    // Value emitted as the dashboard's top-level `last_update`. When omitted,
    // `run` calls `crate::timestamp::now_utc_seconds` instead.
    #[arg(long)]
    pub timestamp: Option<String>,
}

/// One session's current state as rendered into the dashboard.
///
/// Populated either from a `status.md` file (`parse_status_file`) or from the
/// database (`read_sessions_from_db`). Text fields that could not be resolved
/// generally hold the literal string `"unknown"`.
#[derive(Debug, Default)]
struct SessionStatus {
    /// Session identifier; the sort key in the file-scan path.
    session_id: String,
    /// Timestamp of the latest status. In the DB path this is the latest status
    /// event's timestamp, falling back to the session row's `updated_at`.
    last_update: String,
    /// Lead agent (`lead_agent` key in status.md, `lead_agent_id` column in the DB).
    lead_agent: String,
    /// Workflow status; `"blocked"` and `"waiting-for-operator"` trigger
    /// operator attention in `needs_attention`.
    status: String,
    /// Current phase.
    phase: String,
    /// Current mode. In the DB path a strictly newer mode-switch decision
    /// overrides the status-derived value.
    mode: String,
    /// Rendered in the `current_artifact` table column.
    artifact_ref: String,
    /// Next planned action.
    next_action: String,
    /// Blockers; any value other than `none`/`unknown` (case-insensitive)
    /// triggers operator attention.
    blockers: String,
    /// Open asks (`asks_for_Zevs` key in status.md); any value other than
    /// `none`/`unknown` (case-insensitive) triggers operator attention.
    asks_for_zevs: String,
    /// Read from `risk_or_residual_uncertainty`.
    risk: String,
    /// Expected wait, shown in the operator-attention list.
    expected_wait: String,
    /// Path of the parsed `status.md`; left empty for DB-sourced sessions.
    status_path: PathBuf,
    /// Path linked as the session summary; left empty for DB-sourced sessions.
    summary_path: PathBuf,
    /// Path linked as the session audit; left empty for DB-sourced sessions.
    audit_path: PathBuf,
}

pub fn run(args: DashboardArgs) -> CliResult<()> {
    let timestamp = args
        .timestamp
        .clone()
        .unwrap_or_else(crate::timestamp::now_utc_seconds);
    // Prefer the live DB when one exists (ADR 032 D6): send-only sessions created
    // via an audited send (no status.md) show up in the snapshot — dogfood gap 1.
    // Otherwise fall back to the file-scan for the no-DB case.
    let sessions = match discover_db(&args.root) {
        Some(db) => read_sessions_from_db(&db)?,
        None => read_sessions(&args.root)?,
    };
    let output = args
        .output
        .unwrap_or_else(|| args.root.join("dashboard.md"));
    if let Some(parent) = output.parent() {
        fs::create_dir_all(parent).map_err(|error| {
            CliError::failure(format!("failed to create {}: {error}", parent.display()))
        })?;
    }
    fs::write(&output, render_dashboard(&args.root, &timestamp, &sessions)).map_err(|error| {
        CliError::failure(format!("failed to write {}: {error}", output.display()))
    })?;
    println!("{}", output.display());
    Ok(())
}

fn read_sessions(root: &Path) -> CliResult<Vec<SessionStatus>> {
    let sessions_dir = root.join("sessions");
    if !sessions_dir.exists() {
        return Ok(Vec::new());
    }
    let mut sessions = Vec::new();
    for entry in fs::read_dir(&sessions_dir).map_err(|error| {
        CliError::failure(format!(
            "failed to read {}: {error}",
            sessions_dir.display()
        ))
    })? {
        let entry = entry
            .map_err(|error| CliError::failure(format!("failed to read session entry: {error}")))?;
        let status_path = entry.path().join("status.md");
        if status_path.exists() {
            sessions.push(parse_status_file(root, &status_path)?);
        }
    }
    sessions.sort_by(|left, right| left.session_id.cmp(&right.session_id));
    Ok(sessions)
}

/// Locates the live database, if any.
///
/// Checks `.zynk/zynk.db` relative to the current working directory first
/// (the default location per ADR 028), then `<root>/.zynk/zynk.db`. Returns
/// the first candidate that exists, or `None`.
fn discover_db(root: &Path) -> Option<PathBuf> {
    let cwd_db = PathBuf::from(".zynk/zynk.db");
    if cwd_db.exists() {
        return Some(cwd_db);
    }
    let root_db = root.join(".zynk/zynk.db");
    root_db.exists().then_some(root_db)
}

/// Loads one `SessionStatus` per row of the `sessions` table in the database
/// at `db`, ordered by `session_id`.
///
/// Each session is joined with its latest `status_events` row (highest
/// `timestamp`, ties broken by highest `status_event_id`). Status, phase, mode
/// and timestamp prefer the event and fall back to the session row; the
/// event-only fields fall back to the literal `'unknown'`. The three path
/// fields are left empty because no status file backs a DB-sourced session.
///
/// # Errors
///
/// Propagates failures from `crate::db::open_read_database` and
/// `crate::db::latest_mode_decision`, and returns `CliError::failure` when the
/// statement cannot be prepared, executed, or a row cannot be decoded.
fn read_sessions_from_db(db: &Path) -> CliResult<Vec<SessionStatus>> {
    let connection = crate::db::open_read_database(db)?;
    // Codex R1 P2: mirror `db serve`'s `load_sessions` latest-status_event
    // derivation. Import is append-only and does not advance the `sessions` row,
    // so an import-only session would otherwise render a stale
    // status/phase/mode/next_action/last_update here (the v0.3.1 fix only covered
    // `db serve`). LEFT JOIN the latest status_event per session and COALESCE the
    // current-state fields from it, keeping lead_agent/artifact_ref from the
    // sessions row.
    //
    // The SELECT column order below must stay in sync with the `r.get(n)`
    // indices used when building `SessionStatus`.
    let mut stmt = connection
        .prepare(
            "SELECT
                s.session_id,
                COALESCE(s.lead_agent_id, ''),
                COALESCE(se.workflow_status, s.workflow_status),
                COALESCE(se.phase, s.phase),
                COALESCE(se.mode, s.mode),
                COALESCE(s.artifact_ref, ''),
                COALESCE(se.timestamp, s.updated_at),
                COALESCE(se.next_action, 'unknown'),
                COALESCE(se.blockers, 'unknown'),
                COALESCE(se.asks_for_zevs, 'unknown'),
                COALESCE(se.risk_or_residual_uncertainty, 'unknown'),
                COALESCE(se.expected_wait, 'unknown')
             FROM sessions AS s
             LEFT JOIN status_events AS se
               ON se.status_event_id = (
                 SELECT status_event_id
                 FROM status_events
                 WHERE session_id = s.session_id
                 ORDER BY timestamp DESC, status_event_id DESC
                 LIMIT 1
               )
             ORDER BY s.session_id",
        )
        .map_err(|e| CliError::failure(format!("failed to query sessions: {e}")))?;
    let rows = stmt
        .query_map([], |r| {
            Ok(SessionStatus {
                session_id: r.get(0)?,
                lead_agent: r.get(1)?,
                status: r.get(2)?,
                phase: r.get(3)?,
                mode: r.get(4)?,
                artifact_ref: r.get(5)?,
                last_update: r.get(6)?,
                next_action: r.get(7)?,
                blockers: r.get(8)?,
                asks_for_zevs: r.get(9)?,
                risk: r.get(10)?,
                expected_wait: r.get(11)?,
                // No status file backs a DB row. NOTE(review): `render_dashboard`
                // passes these to `link`, so the Links section gets empty
                // targets for DB-sourced sessions.
                status_path: PathBuf::new(),
                summary_path: PathBuf::new(),
                audit_path: PathBuf::new(),
            })
        })
        .map_err(|e| CliError::failure(format!("failed to read sessions: {e}")))?;
    let mut sessions = Vec::new();
    for row in rows {
        let mut session = row.map_err(|e| CliError::failure(format!("row: {e}")))?;
        // ADR 033 M2b T3 (C3=b) follow-up: make the snapshot's current-state `mode`
        // latest-writer across BOTH the agent `status_events.mode` (resolved above as
        // COALESCE(se.mode, s.mode)) AND the latest `mode-switch` operator decision's
        // `mode_to` — IDENTICAL in spirit to `db_dashboard::load_sessions`, so the
        // point-in-time `zynk dashboard` snapshot and the live `db serve` console agree
        // on mode. `last_update` (= COALESCE(se.timestamp, s.updated_at)) is the
        // timestamp the status-derived mode was written at; if the latest mode-switch
        // decision is STRICTLY newer, it wins for `mode` ONLY (ties keep the status
        // mode). No other current-state field is touched; no synthetic status_event is
        // invented (provenance stays in the typed `operator_decisions` table).
        if let Some((decision_ts, mode_to)) =
            crate::db::latest_mode_decision(&connection, &session.session_id)?
        {
            // NOTE(review): this is a lexicographic string comparison; it presumably
            // relies on both timestamps sharing one sortable text format — confirm.
            if decision_ts.as_str() > session.last_update.as_str() {
                session.mode = mode_to;
            }
        }
        sessions.push(session);
    }
    Ok(sessions)
}

/// Parses one session `status.md` into a `SessionStatus`.
///
/// Every line is trimmed and, after an optional leading `"- "`, split at the
/// first `": "` into a key/value pair; lines without `": "` are ignored and a
/// repeated key keeps its last value. Missing fields become `"unknown"`.
///
/// `session_id` comes from the file's `session_id` key, then from the name of
/// the directory containing the file, then `"unknown"`.
///
/// The summary and audit paths are anchored to the directory that actually
/// holds `status_path`, so all three links of a session always point into the
/// same directory. Previously they were derived from the *declared*
/// `session_id`, so a mismatched id (or one containing `..` or an absolute
/// path) made them point outside the session's real directory.
///
/// # Errors
///
/// Returns `CliError::failure` when `status_path` cannot be read.
fn parse_status_file(root: &Path, status_path: &Path) -> CliResult<SessionStatus> {
    let content = fs::read_to_string(status_path).map_err(|error| {
        CliError::failure(format!("failed to read {}: {error}", status_path.display()))
    })?;

    // Collecting into the map keeps the last value of a repeated key.
    let values: BTreeMap<String, String> = content
        .lines()
        .filter_map(|line| {
            let line = line.trim();
            line.strip_prefix("- ")
                .and_then(|item| item.split_once(": "))
                .or_else(|| line.split_once(": "))
        })
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    let session_id = values
        .get("session_id")
        .cloned()
        .or_else(|| {
            status_path
                .parent()?
                .file_name()?
                .to_str()
                .map(str::to_string)
        })
        .unwrap_or_else(|| "unknown".to_string());

    // Prefer the real directory of the status file; only fall back to the
    // id-derived location when the path has no usable parent.
    let session_dir = status_path
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .map(Path::to_path_buf)
        .unwrap_or_else(|| root.join("sessions").join(&session_id));

    Ok(SessionStatus {
        session_id,
        last_update: value_or_unknown(&values, "last_update"),
        lead_agent: value_or_unknown(&values, "lead_agent"),
        status: value_or_unknown(&values, "status"),
        phase: value_or_unknown(&values, "phase"),
        mode: value_or_unknown(&values, "mode"),
        artifact_ref: value_or_unknown(&values, "artifact_ref"),
        next_action: value_or_unknown(&values, "next_action"),
        blockers: value_or_unknown(&values, "blockers"),
        // The status-file key is spelled with a capital `Z`.
        asks_for_zevs: value_or_unknown(&values, "asks_for_Zevs"),
        risk: value_or_unknown(&values, "risk_or_residual_uncertainty"),
        expected_wait: value_or_unknown(&values, "expected_wait"),
        status_path: status_path.to_path_buf(),
        summary_path: session_dir.join("summary.md"),
        audit_path: session_dir.join("audit.md"),
    })
}

/// Returns the value stored under `key`, or `"unknown"` when the key is
/// absent or its value is the empty string.
fn value_or_unknown(values: &BTreeMap<String, String>, key: &str) -> String {
    match values.get(key) {
        Some(found) if !found.is_empty() => found.clone(),
        _ => String::from("unknown"),
    }
}

/// Renders the dashboard Markdown document.
///
/// Layout: a title and generator note, the `last_update` / `dashboard_scope`
/// header lines, an "Active Sessions" table, an "Operator Attention" list of
/// the sessions flagged by `needs_attention`, and a "Links" list with each
/// session's status/summary/audit paths shown relative to `root`.
fn render_dashboard(root: &Path, timestamp: &str, sessions: &[SessionStatus]) -> String {
    let mut page = String::new();

    // Title and header lines.
    page += "# Multi-Agent Collaboration Dashboard (point-in-time snapshot)\n\n";
    page += "> Snapshot generated by `zynk dashboard`; for a live view run `zynk db serve`.\n\n";
    page += &format!("last_update: {timestamp}\n");
    page += &format!("dashboard_scope: {}\n\n", root.display());

    // Session table; table cells go through `cell` for escaping.
    page += "## Active Sessions\n\n";
    if sessions.is_empty() {
        page += "No active session status files found.\n\n";
    } else {
        page += "| session_id | lead_agent | status | phase | mode | current_artifact | next_action | last_update |\n";
        page += "| --- | --- | --- | --- | --- | --- | --- | --- |\n";
        page.extend(sessions.iter().map(|entry| {
            format!(
                "| {} | {} | {} | {} | {} | {} | {} | {} |\n",
                cell(&entry.session_id),
                cell(&entry.lead_agent),
                cell(&entry.status),
                cell(&entry.phase),
                cell(&entry.mode),
                cell(&entry.artifact_ref),
                cell(&entry.next_action),
                cell(&entry.last_update),
            )
        }));
        page.push('\n');
    }

    // Sessions that need an operator's eyes.
    page += "## Operator Attention\n\n";
    let mut flagged = sessions
        .iter()
        .filter(|entry| needs_attention(entry))
        .peekable();
    if flagged.peek().is_none() {
        page += "- none\n\n";
    } else {
        for entry in flagged {
            page += &format!(
                "- `{}`: status={}, ask={}, blockers={}, risk={}, expected_wait={}\n",
                entry.session_id,
                entry.status,
                entry.asks_for_zevs,
                entry.blockers,
                entry.risk,
                entry.expected_wait,
            );
        }
        page.push('\n');
    }

    // Per-session file links.
    page += "## Links\n\n";
    if sessions.is_empty() {
        page += "- none\n";
        return page;
    }
    for entry in sessions {
        page += &format!(
            "- `{}`: [status]({}) | [summary]({}) | [audit]({})\n",
            entry.session_id,
            link(root, &entry.status_path),
            link(root, &entry.summary_path),
            link(root, &entry.audit_path),
        );
    }
    page
}

/// Decides whether a session belongs in the "Operator Attention" list.
///
/// True when the status is exactly `blocked` or `waiting-for-operator`, or
/// when either the asks or the blockers field is anything other than
/// `none`/`unknown` (compared ASCII case-insensitively).
fn needs_attention(session: &SessionStatus) -> bool {
    if session.status == "blocked" || session.status == "waiting-for-operator" {
        return true;
    }
    let is_quiet = |field: &str| {
        field.eq_ignore_ascii_case("none") || field.eq_ignore_ascii_case("unknown")
    };
    !(is_quiet(&session.asks_for_zevs) && is_quiet(&session.blockers))
}

/// Escapes `value` for use inside a single Markdown table cell.
///
/// `|` is escaped as `\|` so it is not read as a column separator. Line
/// breaks (`\n`, `\r`, and `\r\n` as a unit) collapse to a single space,
/// because a table row must stay on one line; a multi-line value would
/// otherwise split the row and corrupt the table.
fn cell(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    let mut chars = value.chars().peekable();
    while let Some(ch) = chars.next() {
        match ch {
            '|' => escaped.push_str("\\|"),
            '\r' => {
                // Treat CRLF as one line break, not two.
                if chars.peek() == Some(&'\n') {
                    chars.next();
                }
                escaped.push(' ');
            }
            '\n' => escaped.push(' '),
            other => escaped.push(other),
        }
    }
    escaped
}

/// Formats `path` as a link target: relative to `root` when `path` lies under
/// it, otherwise unchanged.
fn link(root: &Path, path: &Path) -> String {
    let relative = match path.strip_prefix(root) {
        Ok(stripped) => stripped,
        Err(_) => path,
    };
    relative.display().to_string()
}