nebu-ctx 0.8.2

NebuCtx runtime for the nebu-ctx self-hosted client/server product.
Documentation
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Point-in-time summary of the local sync outbox, as produced by
/// [`build_outbox_status`].
///
/// When the outbox entries cannot be loaded, `readable` is `false` and every
/// counter is reported as zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOutboxStatus {
    /// Total number of entries loaded from the outbox.
    pub queued: usize,
    /// Number of entries that carry a recorded `last_error`.
    pub failed: usize,
    /// Number of entries whose kind is `TelemetryIngest`.
    pub telemetry: usize,
    /// Number of entries whose kind is `ServerToolCall`.
    pub server_tool_calls: usize,
    /// Number of entries whose kind is `CodeIndexSync`.
    pub code_index_syncs: usize,
    /// Outbox location rendered as a (lossily converted) string.
    pub path: String,
    /// `true` when loading the outbox entries succeeded.
    pub readable: bool,
}

/// Result of a `nebu-ctx sync` invocation, printed either as JSON or as a
/// short human-readable summary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// Version of the report layout; this module always writes `1`.
    pub schema_version: u32,
    /// UTC timestamp taken when the report was assembled.
    pub generated_at: DateTime<Utc>,
    /// Action the report describes: `"status"` or `"flush"` (the `replay`
    /// alias is reported as `"flush"`).
    pub action: String,
    /// Outbox snapshot: taken before flushing for `flush`, or at report time
    /// for `status`.
    pub before: SyncOutboxStatus,
    /// Outbox snapshot taken after flushing; `None` for `status`.
    pub after: Option<SyncOutboxStatus>,
    /// Value returned by the telemetry queue's `flush_pending` call for
    /// `flush`; `0` for `status`.
    /// NOTE(review): presumably the number of entries a flush attempted to
    /// send — confirm against `flush_pending`.
    pub attempted: usize,
    /// Human-readable warnings; currently only set when a snapshot was not
    /// readable.
    pub warnings: Vec<String>,
}

/// Entry point for the `nebu-ctx sync` subcommand.
///
/// The first argument that does not start with `-` selects the action and
/// defaults to `status` when absent; `flush` and `replay` are handled alike.
/// `--json` switches the report to JSON, and `--help` / `-h` prints the usage
/// text regardless of any other argument.
///
/// Returns the process exit code: `0` for help, `2` for an unknown action,
/// and otherwise whatever [`print_report`] returns.
pub fn run_cli(args: &[String]) -> i32 {
    let has_flag = |flag: &str| args.iter().any(|arg| arg == flag);

    // Help wins over everything else, including an unknown action.
    if has_flag("--help") || has_flag("-h") {
        println!("Usage:");
        println!("  nebu-ctx sync status [--json]");
        println!("  nebu-ctx sync flush [--json]");
        return 0;
    }

    let json = has_flag("--json");
    let requested = args
        .iter()
        .map(String::as_str)
        .find(|arg| !arg.starts_with('-'));

    match requested.unwrap_or("status") {
        "status" => {
            let report = build_status_report("status", 0, None);
            print_report(report, json)
        }
        "flush" | "replay" => {
            // Order matters: snapshot, flush, snapshot again.
            let before = build_outbox_status();
            let attempted = crate::core::telemetry_queue::flush_pending();
            let after = build_outbox_status();
            let report = build_status_report("flush", attempted, Some((before, after)));
            print_report(report, json)
        }
        unknown => {
            eprintln!("Unknown sync action: {unknown}");
            eprintln!("Usage: nebu-ctx sync status|flush [--json]");
            2
        }
    }
}

/// Loads the sync outbox and summarises its contents.
///
/// On success the result has `readable == true`, `queued` set to the number
/// of entries, `failed` set to the number of entries with a `last_error`, and
/// one counter per operation kind. If loading fails, the counters stay at
/// zero and `readable` is `false`; the load error itself is discarded.
pub fn build_outbox_status() -> SyncOutboxStatus {
    use crate::core::sync_outbox::OutboxOperationKind;

    // Start from the "unreadable" shape and fill it in once loading succeeds.
    let mut status = SyncOutboxStatus {
        queued: 0,
        failed: 0,
        telemetry: 0,
        server_tool_calls: 0,
        code_index_syncs: 0,
        path: outbox_path().to_string_lossy().to_string(),
        readable: false,
    };

    let Ok(entries) = crate::core::sync_outbox::load_entries() else {
        return status;
    };

    status.readable = true;
    status.queued = entries.len();
    for entry in entries.iter() {
        if entry.last_error.is_some() {
            status.failed += 1;
        }
        if entry.kind == OutboxOperationKind::TelemetryIngest {
            status.telemetry += 1;
        }
        if entry.kind == OutboxOperationKind::ServerToolCall {
            status.server_tool_calls += 1;
        }
        if entry.kind == OutboxOperationKind::CodeIndexSync {
            status.code_index_syncs += 1;
        }
    }
    status
}

fn build_status_report(
    action: &str,
    attempted: usize,
    flush_statuses: Option<(SyncOutboxStatus, SyncOutboxStatus)>,
) -> SyncReport {
    let before = flush_statuses
        .as_ref()
        .map(|(before, _)| before.clone())
        .unwrap_or_else(build_outbox_status);
    let after = flush_statuses.map(|(_, after)| after);
    let mut warnings = Vec::new();
    if !before.readable || after.as_ref().is_some_and(|status| !status.readable) {
        warnings.push("sync outbox is not readable".to_string());
    }

    SyncReport {
        schema_version: 1,
        generated_at: Utc::now(),
        action: action.to_string(),
        before,
        after,
        attempted,
        warnings,
    }
}

/// Prints `report` to stdout and returns the process exit code.
///
/// With `json` set, the report is pretty-printed as JSON (falling back to
/// `{}` if serialization fails); otherwise the human-readable summary is
/// printed. The exit code is `0` when every snapshot in the report was
/// readable and `1` otherwise.
fn print_report(report: SyncReport, json: bool) -> i32 {
    if !json {
        print_human(&report);
    } else {
        let rendered = match serde_json::to_string_pretty(&report) {
            Ok(text) => text,
            Err(_) => "{}".to_string(),
        };
        println!("{}", rendered);
    }

    // A missing `after` snapshot (plain status) counts as readable.
    let after_readable = match &report.after {
        Some(status) => status.readable,
        None => true,
    };

    if report.before.readable && after_readable {
        0
    } else {
        1
    }
}

/// Prints the human-readable form of `report`.
///
/// Stdout receives the action, the `before` counters, the `after` counters
/// (only when present, i.e. for a flush) and the outbox path. Any warnings
/// collected in the report are written to stderr, so that a non-zero exit
/// code caused by an unreadable outbox is explained to the user instead of
/// silently showing all-zero counters.
fn print_human(report: &SyncReport) {
    println!("nebu-ctx sync {}", report.action);
    println!(
        "  before: {} queued, {} failed ({} telemetry, {} server calls, {} index syncs)",
        report.before.queued,
        report.before.failed,
        report.before.telemetry,
        report.before.server_tool_calls,
        report.before.code_index_syncs
    );
    if let Some(after) = &report.after {
        println!(
            "  after:  {} queued, {} failed (attempted {})",
            after.queued, after.failed, report.attempted
        );
    }
    println!("  outbox: {}", report.before.path);

    // Warnings go to stderr so the stdout summary keeps its existing shape.
    for warning in &report.warnings {
        eprintln!("  warning: {warning}");
    }
}

/// Returns the outbox location: `sync/outbox` under the nebu-ctx data
/// directory, or the bare relative path `sync/outbox` when the data directory
/// cannot be resolved.
fn outbox_path() -> std::path::PathBuf {
    const OUTBOX_RELATIVE: &str = "sync/outbox";

    match crate::core::data_dir::nebu_ctx_data_dir() {
        Ok(data_dir) => data_dir.join(OUTBOX_RELATIVE),
        Err(_) => std::path::PathBuf::from(OUTBOX_RELATIVE),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::sync_outbox::{enqueue, OutboxOperationKind};

    /// Enqueues one entry of each operation kind and checks that the status
    /// snapshot counts them individually and in total.
    #[test]
    fn build_outbox_status_counts_kinds() {
        // NOTE(review): presumably a guard that serialises tests which touch
        // process-wide environment variables — confirm in `data_dir`.
        let _lock = crate::core::data_dir::test_env_lock();
        let data_dir = tempfile::tempdir().unwrap();
        std::env::set_var("NEBU_CTX_DATA_DIR", data_dir.path());

        let fixtures = [
            (
                OutboxOperationKind::TelemetryIngest,
                serde_json::json!({"tool_name":"ctx_read"}),
            ),
            (
                OutboxOperationKind::ServerToolCall,
                serde_json::json!({"tool_name":"ctx_brain"}),
            ),
            (
                OutboxOperationKind::CodeIndexSync,
                serde_json::json!({"project_id":"proj_test"}),
            ),
        ];
        for (kind, payload) in fixtures {
            enqueue(kind, payload).unwrap();
        }

        let SyncOutboxStatus {
            queued,
            telemetry,
            server_tool_calls,
            code_index_syncs,
            readable,
            ..
        } = build_outbox_status();

        assert!(readable);
        assert_eq!(queued, 3);
        assert_eq!(telemetry, 1);
        assert_eq!(server_tool_calls, 1);
        assert_eq!(code_index_syncs, 1);
    }
}