#[cfg(feature = "daemon")]
use std::sync::{Arc, OnceLock};
use anyhow::Result;
use serde::Serialize;
use serde_json::Value;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
#[cfg(feature = "daemon")]
use tokio::sync::broadcast;
use ulid::Ulid;
use crate::runtime_api::{self, RuntimeResponseMetadata};
// Event family identifiers. Every route built by `route_for_mcp` carries one of
// these, and daemon builds list all four in `advertised_families`.

/// Family used for the session lifecycle commands and for `context-check`.
const FAMILY_STATUS: &str = "runtime.status";
/// Family used for `policy-check` reports whose `decision` is not `"allow"`.
const FAMILY_APPROVAL_REQUIRED: &str = "runtime.approval_required";
/// Family used for the `checkpoint` and `handover` state commands.
const FAMILY_CHECKPOINT: &str = "runtime.checkpoint";
/// Family used for `memory-compact` reports accepted by `should_emit_compaction`.
const FAMILY_MEMORY_COMPACTION: &str = "runtime.memory_compaction";
/// Process-wide broadcast sender for daemon events.
///
/// Starts empty; `subscribers` fills it on first access. Only compiled when the
/// `daemon` feature is enabled.
#[cfg(feature = "daemon")]
static SUBSCRIBERS: OnceLock<broadcast::Sender<Arc<DaemonEvent>>> = OnceLock::new();
/// A single runtime event handed to `publish`.
///
/// The per-field notes describe how `publish_for_mcp` in this file fills the
/// struct; other constructors (if any exist elsewhere) may differ.
#[derive(Clone, Debug, Serialize)]
pub(crate) struct DaemonEvent {
/// Freshly generated ULID rendered as a string.
pub(crate) event_id: String,
/// Event family taken from the matched route (one of the `FAMILY_*` constants).
pub(crate) family: String,
/// Event kind taken from the matched route.
pub(crate) kind: String,
/// Profile name from the report, else from the args, else `"unknown"`.
pub(crate) profile: String,
/// Current UTC time formatted as RFC 3339.
pub(crate) timestamp: String,
/// Runtime response metadata built for the route's operation.
pub(crate) runtime: RuntimeResponseMetadata,
/// Copy of the report with its top-level `"runtime"` key removed.
pub(crate) payload: Value,
}
/// Receiver type returned by `subscribe`; it yields shared `Arc<DaemonEvent>`
/// values. Only available when the `daemon` feature is enabled.
#[cfg(feature = "daemon")]
pub(crate) type RuntimeEventSubscription = broadcast::Receiver<Arc<DaemonEvent>>;
/// Static routing decision produced by `route_for_mcp` for one tool command.
struct EventRoute {
/// Event family; copied into `DaemonEvent::family`.
family: &'static str,
/// Event kind; copied into `DaemonEvent::kind`.
kind: &'static str,
/// Operation name passed to `runtime_metadata` when building the event.
operation: &'static str,
}
/// Opens a new subscription on the process-wide daemon event channel.
///
/// The channel itself is created on first use by `subscribers`.
#[cfg(feature = "daemon")]
pub(crate) fn subscribe() -> RuntimeEventSubscription {
    let sender = subscribers();
    sender.subscribe()
}
pub(crate) fn publish_for_mcp(
tool_name: &str,
command: &str,
args: &Value,
report: &Value,
) -> Result<()> {
let Some(route) = route_for_mcp(tool_name, command, report) else {
return Ok(());
};
let mut payload = report.clone();
if let Some(object) = payload.as_object_mut() {
object.remove("runtime");
}
publish(DaemonEvent {
event_id: Ulid::new().to_string(),
family: route.family.to_owned(),
kind: route.kind.to_owned(),
profile: event_profile(args, report),
timestamp: OffsetDateTime::now_utc().format(&Rfc3339)?,
runtime: runtime_metadata(args, route.operation)?,
payload,
});
Ok(())
}
/// Returns the event families this module advertises, in a fixed order:
/// status, approval-required, checkpoint, memory-compaction.
#[cfg(feature = "daemon")]
pub(crate) fn advertised_families() -> Vec<&'static str> {
    const ADVERTISED: [&str; 4] = [
        FAMILY_STATUS,
        FAMILY_APPROVAL_REQUIRED,
        FAMILY_CHECKPOINT,
        FAMILY_MEMORY_COMPACTION,
    ];
    ADVERTISED.to_vec()
}
/// Returns the process-wide broadcast sender, creating it on first access.
///
/// The channel is created with a capacity of 256. The receiver produced at
/// creation time is discarded; receivers are obtained through `subscribe`.
#[cfg(feature = "daemon")]
fn subscribers() -> &'static broadcast::Sender<Arc<DaemonEvent>> {
    const CHANNEL_CAPACITY: usize = 256;
    SUBSCRIBERS.get_or_init(|| broadcast::channel(CHANNEL_CAPACITY).0)
}
/// Broadcasts `event` to every current subscriber (daemon builds only).
///
/// Publishing is best-effort: the send result is deliberately discarded.
/// tokio documents `broadcast::Sender::send` as failing only when no receiver
/// is active.
#[cfg(feature = "daemon")]
fn publish(event: DaemonEvent) {
    let shared = Arc::new(event);
    let _ = subscribers().send(shared);
}
/// No-op stand-in for `publish` when the `daemon` feature is disabled: the
/// event is accepted and dropped.
#[cfg(not(feature = "daemon"))]
fn publish(_event: DaemonEvent) {}
/// Picks the profile name to stamp on an event.
///
/// Uses the string-valued `"profile"` entry of `report` if present, otherwise
/// the one in `args`, otherwise the literal `"unknown"`.
fn event_profile(args: &Value, report: &Value) -> String {
    /// Reads `"profile"` from `source` when it is a string.
    fn profile_of(source: &Value) -> Option<&str> {
        source.get("profile").and_then(Value::as_str)
    }

    let name = match profile_of(report) {
        Some(from_report) => from_report,
        None => profile_of(args).unwrap_or("unknown"),
    };
    name.to_owned()
}
/// Builds the runtime response metadata attached to an event for `operation`.
///
/// When `runtime_api::parse_runtime_request` finds a request in `args`, the
/// metadata is derived from that request together with the retry class that
/// `runtime_api::runtime_retry_class` reports for `operation`. Otherwise
/// `runtime_api::synthetic_response_metadata` supplies it.
///
/// # Errors
///
/// Propagates any error from `runtime_api::parse_runtime_request`.
fn runtime_metadata(args: &Value, operation: &str) -> Result<RuntimeResponseMetadata> {
    let Some(request) = runtime_api::parse_runtime_request(args)? else {
        return Ok(runtime_api::synthetic_response_metadata(operation));
    };
    let retry_class = runtime_api::runtime_retry_class(operation);
    Ok(request.response_metadata(operation, retry_class))
}
/// Maps an MCP tool invocation to the event route it should emit, if any.
///
/// Returns `None` for unrecognised `(tool_name, command)` pairs, for a
/// `policy-check` whose report `decision` is the string `"allow"`, and for a
/// `memory-compact` report rejected by `should_emit_compaction`.
fn route_for_mcp(tool_name: &str, command: &str, report: &Value) -> Option<EventRoute> {
    // Every route uses the same string for `kind` and `operation`, so the
    // table only names it once.
    let (family, name): (&'static str, &'static str) = match (tool_name, command) {
        ("ccd_session_lifecycle", "start-session") => (FAMILY_STATUS, "start_session"),
        ("ccd_session_lifecycle", "heartbeat-session") => (FAMILY_STATUS, "heartbeat_session"),
        ("ccd_session_lifecycle", "clear-session") => (FAMILY_STATUS, "clear_session"),
        ("ccd_session_lifecycle", "takeover-session") => (FAMILY_STATUS, "takeover_session"),
        ("ccd_context", "context-check") => (
            FAMILY_STATUS,
            runtime_api::EVALUATE_CONTEXT_REFRESH_OPERATION,
        ),
        ("ccd_state", "checkpoint") => (FAMILY_CHECKPOINT, "evaluate_checkpoint"),
        ("ccd_state", "handover") => (FAMILY_CHECKPOINT, "evaluate_closeout"),
        ("ccd_state", "policy-check") => {
            // Anything other than an explicit "allow" (including a missing or
            // non-string decision) produces an approval-required event.
            let decision = report.get("decision").and_then(Value::as_str);
            if decision == Some("allow") {
                return None;
            }
            (FAMILY_APPROVAL_REQUIRED, "policy_check")
        }
        ("ccd_memory", "memory-compact") => {
            if !should_emit_compaction(report) {
                return None;
            }
            (FAMILY_MEMORY_COMPACTION, "compact_memory")
        }
        _ => return None,
    };

    Some(EventRoute {
        family,
        kind: name,
        operation: name,
    })
}
/// Decides whether a `memory-compact` report warrants a compaction event.
///
/// Returns `true` only when the report carries a non-null `write_result` and
/// its `staged_write.outcome` is not the string `"idempotent_noop"`. A missing
/// `staged_write` or `outcome` does not suppress the event.
fn should_emit_compaction(report: &Value) -> bool {
    // An absent key and an explicit JSON `null` both mean nothing was written.
    // `Value::get` returns `Some(Value::Null)` for the latter, so checking for
    // `None` alone would treat a `null` write result as a real write.
    if matches!(report.get("write_result"), None | Some(Value::Null)) {
        return false;
    }

    let outcome = report
        .get("staged_write")
        .and_then(|staged| staged.get("outcome"))
        .and_then(Value::as_str);
    outcome != Some("idempotent_noop")
}
#[cfg(all(test, feature = "daemon"))]
mod tests {
    use super::*;
    use serde_json::json;

    /// Every family returned by `advertised_families` must be reachable
    /// through at least one `route_for_mcp` route.
    #[test]
    fn every_advertised_family_has_a_live_route() {
        // One representative invocation per family: (tool, command, report,
        // message used if the route is unexpectedly missing).
        let samples = [
            (
                "ccd_session_lifecycle",
                "start-session",
                json!({}),
                "session start route",
            ),
            ("ccd_state", "checkpoint", json!({}), "checkpoint route"),
            (
                "ccd_state",
                "policy-check",
                json!({ "decision": "approval_required" }),
                "policy denial route",
            ),
            (
                "ccd_memory",
                "memory-compact",
                json!({
                    "write_result": { "status": "written" },
                    "staged_write": { "outcome": "applied" }
                }),
                "compaction route",
            ),
        ];

        let emitted: Vec<&'static str> = samples
            .iter()
            .map(|(tool, command, report, missing)| {
                route_for_mcp(tool, command, report).expect(missing).family
            })
            .collect();

        for family in advertised_families() {
            assert!(
                emitted.contains(&family),
                "advertised family `{family}` has no emitter-backed route"
            );
        }
    }
}