ccd-cli 1.0.0-beta.2

Bootstrap and validate Continuous Context Development repositories
#[cfg(feature = "daemon")]
use std::sync::{Arc, OnceLock};

use anyhow::Result;
use serde::Serialize;
use serde_json::Value;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
#[cfg(feature = "daemon")]
use tokio::sync::broadcast;
use ulid::Ulid;

use crate::runtime_api::{self, RuntimeResponseMetadata};

const FAMILY_STATUS: &str = "runtime.status";
const FAMILY_APPROVAL_REQUIRED: &str = "runtime.approval_required";
const FAMILY_CHECKPOINT: &str = "runtime.checkpoint";
const FAMILY_MEMORY_COMPACTION: &str = "runtime.memory_compaction";

#[cfg(feature = "daemon")]
static SUBSCRIBERS: OnceLock<broadcast::Sender<Arc<DaemonEvent>>> = OnceLock::new();

#[derive(Clone, Debug, Serialize)]
pub(crate) struct DaemonEvent {
    pub(crate) event_id: String,
    pub(crate) family: String,
    pub(crate) kind: String,
    pub(crate) profile: String,
    pub(crate) timestamp: String,
    pub(crate) runtime: RuntimeResponseMetadata,
    pub(crate) payload: Value,
}

#[cfg(feature = "daemon")]
pub(crate) type RuntimeEventSubscription = broadcast::Receiver<Arc<DaemonEvent>>;

struct EventRoute {
    family: &'static str,
    kind: &'static str,
    operation: &'static str,
}

#[cfg(feature = "daemon")]
pub(crate) fn subscribe() -> RuntimeEventSubscription {
    subscribers().subscribe()
}

pub(crate) fn publish_for_mcp(
    tool_name: &str,
    command: &str,
    args: &Value,
    report: &Value,
) -> Result<()> {
    let Some(route) = route_for_mcp(tool_name, command, report) else {
        return Ok(());
    };

    let mut payload = report.clone();
    if let Some(object) = payload.as_object_mut() {
        object.remove("runtime");
    }

    publish(DaemonEvent {
        event_id: Ulid::new().to_string(),
        family: route.family.to_owned(),
        kind: route.kind.to_owned(),
        profile: event_profile(args, report),
        timestamp: OffsetDateTime::now_utc().format(&Rfc3339)?,
        runtime: runtime_metadata(args, route.operation)?,
        payload,
    });
    Ok(())
}

#[cfg(feature = "daemon")]
pub(crate) fn advertised_families() -> Vec<&'static str> {
    vec![
        FAMILY_STATUS,
        FAMILY_APPROVAL_REQUIRED,
        FAMILY_CHECKPOINT,
        FAMILY_MEMORY_COMPACTION,
    ]
}

#[cfg(feature = "daemon")]
fn subscribers() -> &'static broadcast::Sender<Arc<DaemonEvent>> {
    SUBSCRIBERS.get_or_init(|| {
        let (tx, _rx) = broadcast::channel(256);
        tx
    })
}

#[cfg(feature = "daemon")]
fn publish(event: DaemonEvent) {
    let _ = subscribers().send(Arc::new(event));
}

#[cfg(not(feature = "daemon"))]
fn publish(_event: DaemonEvent) {}

fn event_profile(args: &Value, report: &Value) -> String {
    report
        .get("profile")
        .and_then(Value::as_str)
        .or_else(|| args.get("profile").and_then(Value::as_str))
        .unwrap_or("unknown")
        .to_owned()
}

fn runtime_metadata(args: &Value, operation: &str) -> Result<RuntimeResponseMetadata> {
    Ok(match runtime_api::parse_runtime_request(args)? {
        Some(request) => {
            request.response_metadata(operation, runtime_api::runtime_retry_class(operation))
        }
        None => runtime_api::synthetic_response_metadata(operation),
    })
}

fn route_for_mcp(tool_name: &str, command: &str, report: &Value) -> Option<EventRoute> {
    match (tool_name, command) {
        ("ccd_session_lifecycle", "start-session") => Some(EventRoute {
            family: FAMILY_STATUS,
            kind: "start_session",
            operation: "start_session",
        }),
        ("ccd_session_lifecycle", "heartbeat-session") => Some(EventRoute {
            family: FAMILY_STATUS,
            kind: "heartbeat_session",
            operation: "heartbeat_session",
        }),
        ("ccd_session_lifecycle", "clear-session") => Some(EventRoute {
            family: FAMILY_STATUS,
            kind: "clear_session",
            operation: "clear_session",
        }),
        ("ccd_session_lifecycle", "takeover-session") => Some(EventRoute {
            family: FAMILY_STATUS,
            kind: "takeover_session",
            operation: "takeover_session",
        }),
        ("ccd_context", "context-check") => Some(EventRoute {
            family: FAMILY_STATUS,
            kind: runtime_api::EVALUATE_CONTEXT_REFRESH_OPERATION,
            operation: runtime_api::EVALUATE_CONTEXT_REFRESH_OPERATION,
        }),
        ("ccd_state", "checkpoint") => Some(EventRoute {
            family: FAMILY_CHECKPOINT,
            kind: "evaluate_checkpoint",
            operation: "evaluate_checkpoint",
        }),
        ("ccd_state", "radar-state") => Some(EventRoute {
            family: FAMILY_CHECKPOINT,
            kind: "evaluate_closeout",
            operation: "evaluate_closeout",
        }),
        ("ccd_state", "policy-check")
            if report.get("decision").and_then(Value::as_str) != Some("allow") =>
        {
            Some(EventRoute {
                family: FAMILY_APPROVAL_REQUIRED,
                kind: "policy_check",
                operation: "policy_check",
            })
        }
        ("ccd_memory", "memory-compact") if should_emit_compaction(report) => Some(EventRoute {
            family: FAMILY_MEMORY_COMPACTION,
            kind: "compact_memory",
            operation: "compact_memory",
        }),
        _ => None,
    }
}

fn should_emit_compaction(report: &Value) -> bool {
    if report.get("write_result").is_none() {
        return false;
    }

    !matches!(
        report
            .get("staged_write")
            .and_then(|value| value.get("outcome"))
            .and_then(Value::as_str),
        Some("idempotent_noop")
    )
}

#[cfg(all(test, feature = "daemon"))]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn every_advertised_family_has_a_live_route() {
        let empty = json!({});
        let policy_deny = json!({ "decision": "approval_required" });
        let compaction = json!({
            "write_result": { "status": "written" },
            "staged_write": { "outcome": "applied" }
        });

        let emitted = [
            route_for_mcp("ccd_session_lifecycle", "start-session", &empty)
                .expect("session start route")
                .family,
            route_for_mcp("ccd_state", "checkpoint", &empty)
                .expect("checkpoint route")
                .family,
            route_for_mcp("ccd_state", "policy-check", &policy_deny)
                .expect("policy denial route")
                .family,
            route_for_mcp("ccd_memory", "memory-compact", &compaction)
                .expect("compaction route")
                .family,
        ];

        for family in advertised_families() {
            assert!(
                emitted.contains(&family),
                "advertised family `{family}` has no emitter-backed route"
            );
        }
    }
}