use std::path::PathBuf;
use anyhow::{bail, Context, Result};
use serde::Serialize;
use serde_json::Value;
use crate::commands::{
attach, check, doctor, drift, gc, hooks, link, policy_check, preflight, scaffold, session_open,
skills, start, sync, unlink,
};
use crate::extensions;
use crate::handoff;
use crate::memory::{candidate, compact, evidence, promote, provider, queue};
use crate::paths;
use crate::profile;
use crate::recovery;
use crate::repo;
use crate::runtime_api;
use crate::runtime_events;
use crate::state::{
compiled, escalation, protected_write, radar, runtime_export, session, session_gates,
};
const MCP_START_COMPACT_FIELDS: &[&str] = &[
"command",
"ok",
"path",
"profile",
"project_id",
"locality_id",
"compact_summary",
"compiled_state",
"alerts",
"refresh_actions",
"warnings",
"session_state",
"escalation",
];
pub fn dispatch(tool_name: &str, args: &Value) -> Result<Value> {
let command = get_required_string(args, "command")?;
let full = get_bool(args, "full");
let report = match (tool_name, command.as_str()) {
("ccd_repo", "attach") => {
let path = resolve_path(args)?;
let report = attach::run(
&path,
get_opt_str(args, "profile").as_deref(),
get_opt_str_any(args, &["project_id", "locality_id"]).as_deref(),
get_opt_str(args, "display_name").as_deref(),
)?;
to_value(&report)
}
("ccd_repo", "scaffold") => {
let path = resolve_path(args)?;
let report = scaffold::run(&path, get_bool(args, "force"))?;
to_value(&report)
}
("ccd_repo", "link") => {
let path = resolve_path(args)?;
let report = link::run(
&path,
get_opt_str(args, "profile").as_deref(),
get_opt_str_any(args, &["project_id", "locality_id"]).as_deref(),
get_opt_str(args, "display_name").as_deref(),
)?;
to_value(&report)
}
("ccd_repo", "unlink") => {
let path = resolve_path(args)?;
let report = unlink::run(&path)?;
to_value(&report)
}
("ccd_repo", "gc") => {
let path = resolve_path(args)?;
let report = gc::run(&path)?;
to_value(&report)
}
("ccd_repo", "skills-install") => {
let path = resolve_path(args)?;
let global = !get_bool(args, "local");
let report = skills::run(&path, global)?;
to_value(&report)
}
("ccd_health", "check") => {
let path = resolve_path(args)?;
let report = check::run(&path)?;
to_value(&report)
}
("ccd_health", "preflight") => {
let path = resolve_path(args)?;
let report = preflight::run(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
("ccd_health", "doctor") => {
let path = resolve_path(args)?;
let severity = get_opt_str(args, "severity");
let fields = get_opt_str(args, "fields");
let field_vec: Option<Vec<String>> = fields
.as_ref()
.map(|f| f.split(',').map(|s| s.trim().to_owned()).collect());
if let Some(ref fv) = field_vec {
crate::output::validate_fields(&crate::output::DOCTOR_FIELD_FILTER_SPEC, fv)?;
}
let mut report = doctor::run(&path, None, doctor::RunOptions::default())?;
if let Some(sev) = severity {
report.filter_by_status(&sev);
}
let value = to_value(&report)?;
match &field_vec {
Some(fv) => crate::output::try_filter_json_fields(
value,
fv,
&crate::output::DOCTOR_FIELD_FILTER_SPEC,
),
None => Ok(value),
}
}
("ccd_health", "drift") => {
let path = resolve_path(args)?;
let report = drift::run(&path)?;
to_value(&report)
}
("ccd_health", "sync") => {
let path = resolve_path(args)?;
let check_mode = get_bool(args, "check");
let mut sync_profile = sync::SyncProfile::default();
if paths::git::is_git_work_tree(&path) {
let prof = profile::resolve(get_opt_str(args, "profile").as_deref())?;
let layout = paths::state::StateLayout::resolve(&path, prof)?;
if let Ok(config) = layout.load_profile_config() {
sync_profile.merge_config(&config.sync);
}
}
let report = sync::run(&path, check_mode, &sync_profile)?;
to_value(&report)
}
("ccd_health", "hooks-install") => {
let path = resolve_path(args)?;
let report = hooks::install(&path, get_bool(args, "force"))?;
to_value(&report)
}
("ccd_health", "hooks-check") => {
let path = resolve_path(args)?;
let report = hooks::check(&path)?;
to_value(&report)
}
("ccd_session", "start") => {
let path = resolve_path(args)?;
let profile = get_opt_str(args, "profile");
let refresh = get_bool(args, "refresh");
let fields = get_opt_str(args, "fields");
let field_vec: Option<Vec<String>> = fields
.as_ref()
.map(|f| f.split(',').map(|s| s.trim().to_owned()).collect());
if let Some(ref fv) = field_vec {
crate::output::validate_start_fields(fv)?;
}
match &field_vec {
Some(fv) if !crate::output::needs_source_rendering(fv) => {
let r = start::run_compiled_only(&path, profile.as_deref(), refresh)?;
let value = serde_json::to_value(&r)?;
crate::output::try_filter_json_fields(
value,
fv,
&crate::output::START_FIELD_FILTER_SPEC,
)
}
Some(fv) => {
let r = start::run(&path, profile.as_deref(), refresh)?;
let value = serde_json::to_value(&r)?;
crate::output::try_filter_json_fields(
value,
fv,
&crate::output::START_FIELD_FILTER_SPEC,
)
}
None if full => {
let r = start::run(&path, profile.as_deref(), refresh)?;
to_value(&r)
}
None => {
let r = start::run_compiled_only(&path, profile.as_deref(), refresh)?;
let value = serde_json::to_value(&r)?;
let compact: Vec<String> = MCP_START_COMPACT_FIELDS
.iter()
.map(|s| s.to_string())
.collect();
crate::output::try_filter_json_fields(
value,
&compact,
&crate::output::START_FIELD_FILTER_SPEC,
)
}
}
}
("ccd_session", "session-open") => {
let path = resolve_path(args)?;
let worktree = get_opt_str(args, "worktree").map(PathBuf::from);
let report = session_open::run(
&path,
get_opt_str(args, "profile").as_deref(),
worktree.as_deref(),
get_opt_str(args, "branch").as_deref(),
get_opt_str(args, "from").as_deref(),
get_opt_str(args, "pod").as_deref(),
)?;
to_value(&report)
}
("ccd_session", "session-state-start") => {
let path = resolve_path(args)?;
let mode = get_opt_str(args, "mode")
.map(|value| parse_session_mode(&value))
.transpose()?;
let report = session::start(
&path,
get_opt_str(args, "profile").as_deref(),
None,
session::SessionStartOptions::interactive(mode),
)?;
to_value(&report)
}
("ccd_session", "session-state-clear") => {
let path = resolve_path(args)?;
let report = session::clear(
&path,
get_opt_str(args, "profile").as_deref(),
None,
session::SessionClearOptions::default(),
)?;
to_value(&report)
}
("ccd_session_lifecycle", "start-session") => {
let path = resolve_path(args)?;
let owner = parse_session_owner(args)?;
let mode = get_opt_str(args, "mode")
.map(|value| parse_session_mode(&value))
.transpose()?;
let locality_id = locality_id_for_path(&path);
let report = session::start(
&path,
get_opt_str(args, "profile").as_deref(),
locality_id.as_deref(),
session::SessionStartOptions {
mode,
lifecycle: owner
.lifecycle
.unwrap_or(session::SessionLifecycle::Interactive),
owner_kind: owner.owner_kind,
actor_id: owner.actor_id,
supervisor_id: owner.supervisor_id,
lease_ttl_secs: owner.lease_ttl_secs,
},
)?;
to_value(&report)
}
("ccd_session_lifecycle", "heartbeat-session") => {
let path = resolve_path(args)?;
let owner = parse_session_owner(args)?;
let actor_id = owner.actor_id.ok_or_else(|| {
anyhow::anyhow!("missing required argument: session_owner.actor_id")
})?;
let report = session::heartbeat(
&path,
get_opt_str(args, "profile").as_deref(),
session::SessionHeartbeatOptions {
actor_id,
activity: get_opt_str(args, "activity"),
},
)?;
to_value(&report)
}
("ccd_session_lifecycle", "clear-session") => {
let path = resolve_path(args)?;
let owner = parse_session_owner(args)?;
let locality_id = locality_id_for_path(&path);
let report = session::clear(
&path,
get_opt_str(args, "profile").as_deref(),
locality_id.as_deref(),
session::SessionClearOptions {
actor_id: owner.actor_id,
reason: get_opt_str(args, "reason"),
},
)?;
to_value(&report)
}
("ccd_session_lifecycle", "takeover-session") => {
let path = resolve_path(args)?;
let owner = parse_session_owner(args)?;
let actor_id = owner.actor_id.ok_or_else(|| {
anyhow::anyhow!("missing required argument: session_owner.actor_id")
})?;
let locality_id = locality_id_for_path(&path);
let reason = get_required_string(args, "reason")?;
let report = session::takeover(
&path,
get_opt_str(args, "profile").as_deref(),
locality_id.as_deref(),
session::SessionTakeoverOptions {
actor_id,
supervisor_id: owner.supervisor_id,
reason,
},
)?;
to_value(&report)
}
("ccd_session_gates", "list-gates") => {
let path = resolve_path(args)?;
let report = session_gates::list(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
("ccd_session_gates", "replace-gates") => {
let path = resolve_path(args)?;
let report = session_gates::replace(
&path,
get_opt_str(args, "profile").as_deref(),
get_string_list(args, "gates")?,
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_session_gates", "seed-gates") => {
let path = resolve_path(args)?;
let source = parse_gate_seed_source(&get_required_string(args, "from")?)?;
let report = session_gates::seed(
&path,
get_opt_str(args, "profile").as_deref(),
source,
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_session_gates", "set-gate-status") => {
let path = resolve_path(args)?;
let index = get_required_usize(args, "index")?;
let status = parse_gate_status(&get_required_string(args, "status")?)?;
let report = session_gates::set_status(
&path,
get_opt_str(args, "profile").as_deref(),
index,
status,
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_session_gates", "advance-gates") => {
let path = resolve_path(args)?;
let report = session_gates::advance(
&path,
get_opt_str(args, "profile").as_deref(),
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_session_gates", "clear-gates") => {
let path = resolve_path(args)?;
let report = session_gates::clear(
&path,
get_opt_str(args, "profile").as_deref(),
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_state", "runtime-state-export") => {
let path = resolve_path(args)?;
let target = get_opt_str(args, "projection_target")
.map(|s| parse_projection_target(&s))
.transpose()?;
let format = get_opt_str(args, "projection_format")
.map(|s| parse_projection_format(&s))
.transpose()?;
let report = runtime_export::run(
&path,
get_opt_str(args, "profile").as_deref(),
target,
format,
)?;
to_value(&report)
}
("ccd_delegation", "child-bootstrap") => {
let path = resolve_path(args)?;
let report =
crate::state::child_bootstrap::run(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
("ccd_state", "policy-check") => {
let path = resolve_path(args)?;
let action_family = get_required_string(args, "action_family")?;
let report = policy_check::run(
&path,
get_opt_str(args, "profile").as_deref(),
parse_policy_action_family(&action_family)?,
)?;
to_value(&report)
}
("ccd_state", "escalation-state-list") => {
let path = resolve_path(args)?;
let report = escalation::list(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
("ccd_state", "escalation-state-set") => {
let path = resolve_path(args)?;
let kind = get_opt_str(args, "kind")
.map(|value| parse_escalation_kind(&value))
.transpose()?;
let reason = get_required_string(args, "reason")?;
let id = get_opt_str(args, "id").or_else(|| get_runtime_request_idempotency_key(args));
let report = escalation::set(
&path,
get_opt_str(args, "profile").as_deref(),
id.as_deref(),
parse_protected_write(args),
kind,
&reason,
)?;
to_value(&report)
}
("ccd_escalation", "list-escalations") => {
let path = resolve_path(args)?;
let report = escalation::list(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
("ccd_escalation", "set-escalation") => {
let path = resolve_path(args)?;
let kind = get_opt_str(args, "kind")
.map(|value| parse_escalation_kind(&value))
.transpose()?;
let reason = get_required_string(args, "reason")?;
let id = get_opt_str(args, "id").or_else(|| get_runtime_request_idempotency_key(args));
let report = escalation::set(
&path,
get_opt_str(args, "profile").as_deref(),
id.as_deref(),
parse_protected_write(args),
kind,
&reason,
)?;
to_value(&report)
}
("ccd_escalation", "clear-escalation") => {
let path = resolve_path(args)?;
let report = escalation::clear(
&path,
get_opt_str(args, "profile").as_deref(),
get_opt_str(args, "id").as_deref(),
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_recovery", "write-recovery") => {
let path = resolve_path(args)?;
let report = recovery::write_with_input(
&path,
get_opt_str(args, "profile").as_deref(),
parse_recovery_write_input(args)?,
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_state", "radar-state") => {
let path = resolve_path(args)?;
let report = radar::run(&path, get_opt_str(args, "profile").as_deref(), false)?;
to_value(&report)
}
("ccd_state", "checkpoint") => {
let path = resolve_path(args)?;
let report = radar::run(&path, get_opt_str(args, "profile").as_deref(), true)?;
to_value(&report)
}
("ccd_context", "context-check") => {
let path = resolve_path(args)?;
let trigger = get_opt_str(args, "trigger")
.map(|value| parse_context_check_trigger(&value))
.transpose()?
.unwrap_or(radar::ContextCheckTrigger::Interval);
let report =
radar::run_context_check(&path, get_opt_str(args, "profile").as_deref(), trigger)?;
to_value(&report)
}
("ccd_state", "handoff-refresh") => {
let path = resolve_path(args)?;
let report = handoff::refresh::run(
&path,
get_opt_str(args, "profile").as_deref(),
get_bool(args, "write"),
)?;
to_value(&report)
}
("ccd_memory_capture", "memory-evidence-submit") => {
let path = resolve_path(args)?;
let report = evidence::submit(
&path,
get_opt_str(args, "profile").as_deref(),
parse_memory_evidence_envelope(args)?,
parse_protected_write(args),
&submit_runtime_hints(args),
)?;
to_value(&report)
}
("ccd_memory_capture", "memory-candidate-extract") => {
let path = resolve_path(args)?;
let (evidence_id, limit) = parse_extract_options(args)?;
let report = evidence::extract_candidates(
&path,
get_opt_str(args, "profile").as_deref(),
evidence_id.as_deref(),
limit,
get_bool(args, "write"),
parse_protected_write(args),
)?;
to_value(&report)
}
("ccd_memory", "memory-candidate-admit") => {
let path = resolve_path(args)?;
let entry = get_required_string(args, "entry")?;
let (source_scope, destination) = parse_candidate_route(args)?;
if let Some(idempotency_key) = get_runtime_request_idempotency_key(args) {
if let Some(snapshot) = try_replay_memory_snapshot(
&path,
get_opt_str(args, "profile").as_deref(),
candidate::replay_request(
&entry,
source_scope,
destination,
Some(&idempotency_key),
)?,
)? {
Ok(snapshot)
} else {
let report = candidate::run_admit(
&path,
get_opt_str(args, "profile").as_deref(),
&entry,
source_scope,
destination,
get_bool(args, "write"),
parse_protected_write(args),
Some(&idempotency_key),
)?;
to_value(&report)
}
} else {
let report = candidate::run_admit(
&path,
get_opt_str(args, "profile").as_deref(),
&entry,
source_scope,
destination,
get_bool(args, "write"),
parse_protected_write(args),
None,
)?;
to_value(&report)
}
}
("ccd_memory", "memory-compact") => {
let path = resolve_path(args)?;
let scope = parse_compact_scope(&parse_compact_scope_arg(args)?)?;
let entry = get_opt_str(args, "entry");
let mode = parse_compact_mode(args, entry.as_deref(), get_bool(args, "write"))?;
if let Some(idempotency_key) = get_runtime_request_idempotency_key(args) {
if let Some(snapshot) = try_replay_memory_snapshot(
&path,
get_opt_str(args, "profile").as_deref(),
compact::replay_request(scope, &mode, Some(&idempotency_key))?,
)? {
Ok(snapshot)
} else {
let report = compact::run(
&path,
get_opt_str(args, "profile").as_deref(),
scope,
mode,
get_bool(args, "write"),
parse_protected_write(args),
Some(&idempotency_key),
)?;
to_value(&report)
}
} else {
let report = compact::run(
&path,
get_opt_str(args, "profile").as_deref(),
scope,
mode,
get_bool(args, "write"),
parse_protected_write(args),
None,
)?;
to_value(&report)
}
}
("ccd_memory", "memory-promote") => {
let path = resolve_path(args)?;
let entry = get_required_string(args, "entry")?;
let destination = if let Some(obj) =
args.get("promote_destination").and_then(|v| v.as_object())
{
match obj
.get("type")
.and_then(|v| v.as_str())
.unwrap_or("profile-memory")
{
"work-stream-memory" | "branch-memory" => {
promote::PromoteDestination::BranchMemory
}
"project-memory" | "repo-memory" => promote::PromoteDestination::RepoMemory,
"project-truth" => {
let target_file = obj
.get("target_file")
.and_then(|v| v.as_str())
.map(PathBuf::from)
.ok_or_else(|| {
anyhow::anyhow!(
"`promote_destination.target_file` is required when type is project-truth"
)
})?;
promote::PromoteDestination::ProjectTruth { target_file }
}
_ => promote::PromoteDestination::ProfileMemory,
}
} else {
promote::PromoteDestination::ProfileMemory
};
let nested_source_outcome = args
.get("promote_destination")
.and_then(|value| value.as_object())
.and_then(|obj| obj.get("source_outcome"))
.and_then(Value::as_str)
.map(parse_source_outcome)
.transpose()?;
let top_level_source_outcome = get_opt_str(args, "source_outcome")
.map(|s| parse_source_outcome(&s))
.transpose()?;
if let (Some(nested), Some(top_level)) =
(nested_source_outcome, top_level_source_outcome)
{
if nested != top_level {
bail!(
"conflicting source_outcome values: top-level and promote_destination.source_outcome must match"
);
}
}
let source_outcome = nested_source_outcome.or(top_level_source_outcome);
if let Some(idempotency_key) = get_runtime_request_idempotency_key(args) {
if let Some(snapshot) = try_replay_memory_snapshot(
&path,
get_opt_str(args, "profile").as_deref(),
promote::replay_request(
&entry,
source_outcome,
&destination,
Some(&idempotency_key),
)?,
)? {
Ok(snapshot)
} else {
let report = promote::run(
&path,
get_opt_str(args, "profile").as_deref(),
&entry,
get_bool(args, "write"),
source_outcome,
destination,
parse_protected_write(args),
Some(&idempotency_key),
)?;
to_value(&report)
}
} else {
let report = promote::run(
&path,
get_opt_str(args, "profile").as_deref(),
&entry,
get_bool(args, "write"),
source_outcome,
destination,
parse_protected_write(args),
None,
)?;
to_value(&report)
}
}
("ccd_memory_recall", "memory-search") => {
let path = resolve_path(args)?;
let report = provider::search(
&path,
get_opt_str(args, "profile").as_deref(),
&get_required_string(args, "query")?,
get_opt_usize(args, "limit")?,
)?;
to_value(&report)
}
("ccd_memory_recall", "memory-describe") => {
let path = resolve_path(args)?;
let report = provider::describe(
&path,
get_opt_str(args, "profile").as_deref(),
&get_required_string(args, "native_id")?,
)?;
to_value(&report)
}
("ccd_memory_recall", "memory-expand") => {
let path = resolve_path(args)?;
let report = provider::expand(
&path,
get_opt_str(args, "profile").as_deref(),
&get_required_string(args, "native_id")?,
get_opt_usize(args, "limit")?,
)?;
to_value(&report)
}
("ccd_memory_recall", "memory-sync-status") => {
let path = resolve_path(args)?;
let report = provider::sync_status(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
("ccd_memory_recall", "memory-source-map") => {
let path = resolve_path(args)?;
let report = provider::source_map(&path, get_opt_str(args, "profile").as_deref())?;
to_value(&report)
}
(tool, cmd) => {
if let Some(report) = extensions::dispatch_mcp(tool, args) {
report
} else {
bail!("unknown tool/command: {tool}/{cmd}")
}
}
}?;
let mut event_report = report.clone();
attach_runtime_metadata(tool_name, command.as_str(), args, &mut event_report)?;
runtime_events::publish_for_mcp(tool_name, command.as_str(), args, &event_report)?;
let mut report = compact_report(tool_name, command.as_str(), report, full, args);
attach_runtime_metadata(tool_name, command.as_str(), args, &mut report)?;
Ok(report)
}
fn resolve_path(args: &Value) -> Result<PathBuf> {
let raw = args.get("path").and_then(Value::as_str).unwrap_or(".");
paths::cli::resolve(raw.as_ref())
}
fn locality_id_for_path(path: &std::path::Path) -> Option<String> {
repo::marker::load(path)
.ok()
.flatten()
.map(|marker| marker.locality_id)
}
fn get_required_string(args: &Value, key: &str) -> Result<String> {
args.get(key)
.and_then(Value::as_str)
.map(String::from)
.ok_or_else(|| anyhow::anyhow!("missing required argument: {key}"))
}
fn get_opt_str(args: &Value, key: &str) -> Option<String> {
args.get(key).and_then(Value::as_str).map(String::from)
}
fn get_opt_str_any(args: &Value, keys: &[&str]) -> Option<String> {
keys.iter().find_map(|key| get_opt_str(args, key))
}
fn get_bool(args: &Value, key: &str) -> bool {
args.get(key).and_then(Value::as_bool).unwrap_or(false)
}
fn get_opt_u64(args: &Value, key: &str) -> Option<u64> {
args.get(key).and_then(Value::as_u64)
}
fn get_opt_usize(args: &Value, key: &str) -> Result<Option<usize>> {
let Some(value) = args.get(key) else {
return Ok(None);
};
match value {
Value::Null => Ok(None),
Value::Number(_) => {
let raw = value.as_u64().ok_or_else(|| {
anyhow::anyhow!("invalid argument {key}: expected a positive integer")
})?;
usize::try_from(raw)
.map(Some)
.map_err(|_| anyhow::anyhow!("invalid argument {key}: value is too large"))
}
Value::String(raw) => {
let parsed = raw.parse::<u128>().map_err(|_| {
anyhow::anyhow!("invalid argument {key}: expected a positive integer")
})?;
usize::try_from(parsed)
.map(Some)
.map_err(|_| anyhow::anyhow!("invalid argument {key}: value is too large"))
}
_ => bail!("invalid argument {key}: expected a positive integer"),
}
}
fn get_required_usize(args: &Value, key: &str) -> Result<usize> {
if let Some(value) = args.get(key).and_then(Value::as_u64) {
return usize::try_from(value)
.map_err(|_| anyhow::anyhow!("invalid argument {key}: value is too large"));
}
let raw = get_required_string(args, key)?;
raw.parse::<usize>()
.map_err(|_| anyhow::anyhow!("invalid argument {key}: expected a positive integer"))
}
fn get_string_list(args: &Value, key: &str) -> Result<Vec<String>> {
if let Some(values) = args.get(key).and_then(Value::as_array) {
return values
.iter()
.map(|value| {
value.as_str().map(str::to_owned).ok_or_else(|| {
anyhow::anyhow!("invalid argument {key}: array entries must be strings")
})
})
.collect();
}
Ok(vec![get_required_string(args, key)?])
}
#[derive(Default)]
struct McpSessionOwnerArgs {
lifecycle: Option<session::SessionLifecycle>,
owner_kind: Option<session::SessionOwnerKind>,
actor_id: Option<String>,
supervisor_id: Option<String>,
lease_ttl_secs: Option<u64>,
}
fn parse_session_owner(args: &Value) -> Result<McpSessionOwnerArgs> {
let owner = args.get("session_owner").and_then(Value::as_object);
let lifecycle = owner
.and_then(|obj| obj.get("lifecycle"))
.and_then(Value::as_str)
.or_else(|| args.get("lifecycle").and_then(Value::as_str))
.map(parse_session_lifecycle)
.transpose()?;
let owner_kind = owner
.and_then(|obj| obj.get("owner_kind"))
.and_then(Value::as_str)
.or_else(|| args.get("owner_kind").and_then(Value::as_str))
.map(parse_session_owner_kind)
.transpose()?;
let actor_id = owner
.and_then(|obj| obj.get("actor_id"))
.and_then(Value::as_str)
.map(String::from)
.or_else(|| get_opt_str(args, "actor_id"));
let supervisor_id = owner
.and_then(|obj| obj.get("supervisor_id"))
.and_then(Value::as_str)
.map(String::from)
.or_else(|| get_opt_str(args, "supervisor_id"));
let lease_ttl_secs = owner
.and_then(|obj| obj.get("lease_seconds"))
.and_then(Value::as_u64)
.or_else(|| get_opt_u64(args, "lease_seconds"));
Ok(McpSessionOwnerArgs {
lifecycle,
owner_kind,
actor_id,
supervisor_id,
lease_ttl_secs,
})
}
fn parse_protected_write(args: &Value) -> protected_write::ExclusiveWriteOptions {
let protected = args.get("protected_write").and_then(Value::as_object);
protected_write::ExclusiveWriteOptions {
actor_id: protected
.and_then(|obj| obj.get("actor_id"))
.and_then(Value::as_str)
.map(str::to_owned)
.or_else(|| get_opt_str(args, "actor_id")),
session_id: protected
.and_then(|obj| obj.get("session_id"))
.and_then(Value::as_str)
.map(str::to_owned)
.or_else(|| get_opt_str(args, "session_id")),
expected_revision: protected
.and_then(|obj| obj.get("expected_revision"))
.and_then(Value::as_u64)
.or_else(|| get_opt_u64(args, "expected_revision")),
}
}
fn try_replay_memory_snapshot(
repo_root: &std::path::Path,
explicit_profile: Option<&str>,
replay_request: (String, String),
) -> Result<Option<Value>> {
let profile = profile::resolve(explicit_profile)?;
let layout = crate::paths::state::StateLayout::resolve(repo_root, profile)?;
let Some(snapshot) =
queue::load_replayed_snapshot(&layout, &replay_request.1, &replay_request.0)?
else {
return Ok(None);
};
Ok(Some(mark_memory_snapshot_as_replay(snapshot)))
}
fn mark_memory_snapshot_as_replay(mut snapshot: Value) -> Value {
let Some(object) = snapshot.as_object_mut() else {
return snapshot;
};
object.insert("ok".to_owned(), Value::Bool(true));
object.remove("message");
if let Some(staged_write) = object
.get_mut("staged_write")
.and_then(Value::as_object_mut)
{
staged_write.insert(
"outcome".to_owned(),
Value::String("idempotent_noop".to_owned()),
);
staged_write.remove("message");
}
snapshot
}
fn to_value(report: &impl Serialize) -> Result<Value> {
Ok(serde_json::to_value(report)?)
}
fn attach_runtime_metadata(
tool_name: &str,
command: &str,
args: &Value,
report: &mut Value,
) -> Result<()> {
let Some(operation) = runtime_operation_name(tool_name, command) else {
return Ok(());
};
let Some(request) = runtime_api::parse_runtime_request(args)? else {
return Ok(());
};
let Some(object) = report.as_object_mut() else {
return Ok(());
};
object.insert(
"runtime".to_owned(),
serde_json::to_value(
request.response_metadata(operation, runtime_api::runtime_retry_class(operation)),
)?,
);
Ok(())
}
fn get_runtime_request_idempotency_key(args: &Value) -> Option<String> {
args.get("runtime_request")
.and_then(Value::as_object)
.and_then(|request| request.get("idempotency_key"))
.and_then(Value::as_str)
.map(str::to_owned)
}
fn runtime_operation_name(tool_name: &str, command: &str) -> Option<&'static str> {
match (tool_name, command) {
("ccd_session", "start") => Some("load_startup_context"),
("ccd_session_lifecycle", "start-session") => Some("start_session"),
("ccd_session_lifecycle", "heartbeat-session") => Some("heartbeat_session"),
("ccd_session_lifecycle", "clear-session") => Some("clear_session"),
("ccd_session_lifecycle", "takeover-session") => Some("takeover_session"),
("ccd_state", "radar-state") => Some("evaluate_closeout"),
("ccd_state", "checkpoint") => Some("evaluate_checkpoint"),
("ccd_state", "policy-check") => Some("policy_check"),
("ccd_session_gates", "list-gates") => Some("list_execution_gates"),
("ccd_session_gates", "replace-gates") => Some("replace_execution_gates"),
("ccd_session_gates", "seed-gates") => Some("seed_execution_gates"),
("ccd_session_gates", "set-gate-status") => Some("set_execution_gate_status"),
("ccd_session_gates", "advance-gates") => Some("advance_execution_gates"),
("ccd_session_gates", "clear-gates") => Some("clear_execution_gates"),
("ccd_escalation", "list-escalations") => Some("list_escalations"),
("ccd_escalation", "set-escalation") => Some("set_escalation"),
("ccd_escalation", "clear-escalation") => Some("clear_escalation"),
("ccd_recovery", "write-recovery") => Some("write_recovery"),
("ccd_memory_capture", "memory-evidence-submit") => Some("submit_memory_evidence"),
("ccd_memory_capture", "memory-candidate-extract") => Some("extract_memory_candidates"),
("ccd_memory", "memory-candidate-admit") => Some("admit_memory_candidate"),
("ccd_memory", "memory-promote") => Some("promote_memory"),
("ccd_memory", "memory-compact") => Some("compact_memory"),
("ccd_memory_recall", "memory-search") => Some("search_memory_recall"),
("ccd_memory_recall", "memory-describe") => Some("describe_memory_recall"),
("ccd_memory_recall", "memory-expand") => Some("expand_memory_recall"),
("ccd_memory_recall", "memory-sync-status") => Some("probe_ingest_sync_status"),
("ccd_memory_recall", "memory-source-map") => Some("export_ingest_source_map"),
("ccd_context", "context-check") => Some("evaluate_context_refresh"),
_ => None,
}
}
fn parse_memory_evidence_envelope(args: &Value) -> Result<evidence::MemoryEvidenceEnvelope> {
let envelope = args
.get("evidence_envelope")
.ok_or_else(|| anyhow::anyhow!("missing required argument: evidence_envelope"))?;
let envelope = envelope
.as_object()
.ok_or_else(|| anyhow::anyhow!("invalid argument evidence_envelope: expected an object"))?;
Ok(evidence::MemoryEvidenceEnvelope {
scope: envelope
.get("scope")
.and_then(Value::as_str)
.ok_or_else(|| anyhow::anyhow!("missing required argument: evidence_envelope.scope"))?
.to_owned(),
entry_type: envelope
.get("type")
.and_then(Value::as_str)
.ok_or_else(|| anyhow::anyhow!("missing required argument: evidence_envelope.type"))?
.to_owned(),
source_kind: envelope
.get("source_kind")
.and_then(Value::as_str)
.ok_or_else(|| {
anyhow::anyhow!("missing required argument: evidence_envelope.source_kind")
})?
.to_owned(),
summary: envelope
.get("summary")
.and_then(Value::as_str)
.ok_or_else(|| anyhow::anyhow!("missing required argument: evidence_envelope.summary"))?
.to_owned(),
source_ref: envelope
.get("source_ref")
.and_then(Value::as_str)
.map(str::to_owned),
host_reference: envelope.get("host").and_then(Value::as_str).map(|host| {
evidence::MemoryEvidenceHostReference {
host: host.to_owned(),
hook: envelope
.get("host_hook")
.and_then(Value::as_str)
.map(str::to_owned),
session_id: None,
run_id: None,
task_id: None,
}
}),
provider_reference: envelope
.get("provider")
.and_then(Value::as_str)
.map(|provider| evidence::MemoryEvidenceProviderReference {
provider: provider.to_owned(),
native_id: envelope
.get("provider_ref")
.and_then(Value::as_str)
.map(str::to_owned),
}),
})
}
fn parse_extract_options(args: &Value) -> Result<(Option<String>, usize)> {
let Some(options) = args.get("extract_options") else {
return Ok((None, 8));
};
let options = options
.as_object()
.ok_or_else(|| anyhow::anyhow!("invalid argument extract_options: expected an object"))?;
let evidence_id = options
.get("evidence_id")
.and_then(Value::as_str)
.map(str::to_owned);
let limit = options.get("limit").and_then(Value::as_u64).unwrap_or(8) as usize;
Ok((evidence_id, limit.max(1)))
}
fn submit_runtime_hints(args: &Value) -> evidence::SubmitRuntimeHints {
evidence::SubmitRuntimeHints {
host_session_id: runtime_request_string(args, "host_session_id"),
host_run_id: runtime_request_string(args, "host_run_id"),
host_task_id: runtime_request_string(args, "host_task_id"),
}
}
fn runtime_request_string(args: &Value, key: &str) -> Option<String> {
args.get("runtime_request")
.and_then(Value::as_object)
.and_then(|request| request.get(key))
.and_then(Value::as_str)
.map(str::to_owned)
}
fn parse_recovery_write_input(args: &Value) -> Result<recovery::RecoveryWriteInput> {
let checkpoint = args
.get("checkpoint")
.map(|checkpoint| {
serde_json::from_value::<recovery::CheckpointInput>(checkpoint.clone())
.context("invalid argument checkpoint")
})
.transpose()?;
let working_buffer = args
.get("working_buffer")
.map(|working_buffer| {
serde_json::from_value::<recovery::WorkingBufferInput>(working_buffer.clone())
.context("invalid argument working_buffer")
})
.transpose()?;
Ok(recovery::RecoveryWriteInput {
origin: get_opt_str(args, "origin"),
checkpoint,
working_buffer,
})
}
fn compact_report(
tool_name: &str,
command: &str,
mut report: Value,
full: bool,
args: &Value,
) -> Value {
if full {
return report;
}
match (tool_name, command) {
("ccd_health", "doctor") => compact_doctor_report(&mut report),
("ccd_state", "runtime-state-export") => {
let has_projection_args =
args.get("projection_target").is_some() || args.get("projection_format").is_some();
if !has_projection_args {
compact_runtime_state_export(&mut report);
}
}
_ => {}
}
report
}
fn compact_doctor_report(report: &mut Value) {
let Some(checks) = report.get_mut("checks").and_then(Value::as_array_mut) else {
return;
};
checks.retain(|check| check.get("status").and_then(Value::as_str) != Some("pass"));
}
fn compact_runtime_state_export(report: &mut Value) {
let Some(export) = report.get_mut("export").and_then(Value::as_object_mut) else {
return;
};
export.remove("projection");
export.remove("symbolic_projection");
export.remove("bundle_projection");
export.remove("state");
}
fn parse_projection_target(s: &str) -> Result<compiled::ProjectionTarget> {
match s {
"default" => Ok(compiled::ProjectionTarget::Default),
"planning" => Ok(compiled::ProjectionTarget::Planning),
"session" => Ok(compiled::ProjectionTarget::Session),
other => {
bail!("invalid projection_target: {other}; expected default, planning, or session")
}
}
}
fn parse_context_check_trigger(s: &str) -> Result<radar::ContextCheckTrigger> {
match s {
"interval" => Ok(radar::ContextCheckTrigger::Interval),
"manual" => Ok(radar::ContextCheckTrigger::Manual),
"pre-compaction" | "pre_compaction" => Ok(radar::ContextCheckTrigger::PreCompaction),
"idle-reset" | "idle_reset" => Ok(radar::ContextCheckTrigger::IdleReset),
"resume" => Ok(radar::ContextCheckTrigger::Resume),
"supervisor-poll" | "supervisor_poll" => Ok(radar::ContextCheckTrigger::SupervisorPoll),
_ => bail!("invalid trigger `{s}`: expected interval, manual, pre-compaction, idle-reset, resume, or supervisor-poll"),
}
}
fn parse_projection_format(s: &str) -> Result<compiled::ProjectionFormat> {
match s {
"narrative" => Ok(compiled::ProjectionFormat::Narrative),
"symbolic" => Ok(compiled::ProjectionFormat::Symbolic),
"bundle" => Ok(compiled::ProjectionFormat::Bundle),
other => {
bail!("invalid projection_format: {other}; expected narrative, symbolic, or bundle")
}
}
}
fn parse_session_mode(s: &str) -> Result<session::SessionMode> {
session::SessionMode::from_str(s).ok_or_else(|| {
anyhow::anyhow!("invalid mode: {s}; expected general, research, or implement")
})
}
fn parse_session_lifecycle(s: &str) -> Result<session::SessionLifecycle> {
match s {
"interactive" => Ok(session::SessionLifecycle::Interactive),
"autonomous" => Ok(session::SessionLifecycle::Autonomous),
other => bail!("invalid lifecycle: {other}; expected interactive or autonomous"),
}
}
fn parse_session_owner_kind(s: &str) -> Result<session::SessionOwnerKind> {
match s {
"runtime_worker" => Ok(session::SessionOwnerKind::RuntimeWorker),
"runtime_supervisor" => Ok(session::SessionOwnerKind::RuntimeSupervisor),
other => {
bail!("invalid owner_kind: {other}; expected runtime_worker or runtime_supervisor")
}
}
}
fn parse_escalation_kind(s: &str) -> Result<escalation::EscalationKind> {
match s {
"blocking" => Ok(escalation::EscalationKind::Blocking),
"non-blocking" | "non_blocking" => Ok(escalation::EscalationKind::NonBlocking),
other => bail!("invalid kind: {other}; expected blocking or non-blocking"),
}
}
fn parse_gate_seed_source(s: &str) -> Result<session_gates::GateSeedSource> {
match s {
"immediate-actions" | "immediate_actions" => {
Ok(session_gates::GateSeedSource::ImmediateActions)
}
"definition-of-done" | "definition_of_done" => {
Ok(session_gates::GateSeedSource::DefinitionOfDone)
}
other => bail!("invalid from: {other}; expected immediate-actions or definition-of-done"),
}
}
fn parse_gate_status(s: &str) -> Result<session_gates::ExecutionGateStatus> {
match s {
"open" => Ok(session_gates::ExecutionGateStatus::Open),
"done" => Ok(session_gates::ExecutionGateStatus::Done),
"blocked" => Ok(session_gates::ExecutionGateStatus::Blocked),
other => bail!("invalid status: {other}; expected open, done, or blocked"),
}
}
fn parse_policy_action_family(
s: &str,
) -> Result<crate::state::policy_projection::PolicyActionFamily> {
use crate::state::policy_projection::PolicyActionFamily;
match s {
"read_only" => Ok(PolicyActionFamily::ReadOnly),
"local_execution" => Ok(PolicyActionFamily::LocalExecution),
"local_isolation" => Ok(PolicyActionFamily::LocalIsolation),
"local_finalize" => Ok(PolicyActionFamily::LocalFinalize),
"publish_external" => Ok(PolicyActionFamily::PublishExternal),
"shared_queue_mutation" => Ok(PolicyActionFamily::SharedQueueMutation),
"durable_memory_mutation" => Ok(PolicyActionFamily::DurableMemoryMutation),
"repair_destructive" => Ok(PolicyActionFamily::RepairDestructive),
"protected_surface_mutation" => Ok(PolicyActionFamily::ProtectedSurfaceMutation),
other => bail!(
"invalid action_family: {other}; expected read_only, local_execution, local_isolation, local_finalize, publish_external, shared_queue_mutation, durable_memory_mutation, repair_destructive, or protected_surface_mutation"
),
}
}
fn parse_compact_scope(s: &str) -> Result<compact::CompactScope> {
match s {
"profile" => Ok(compact::CompactScope::Profile),
"project" | "repo" => Ok(compact::CompactScope::Repo),
"work-stream" | "work_stream" | "branch" => Ok(compact::CompactScope::Branch),
"workspace" | "clone" => Ok(compact::CompactScope::Clone),
other => {
bail!("invalid scope: {other}; expected profile, project, work-stream, or workspace")
}
}
}
fn parse_compact_scope_arg(args: &Value) -> Result<String> {
args.get("compact_options")
.and_then(Value::as_object)
.and_then(|obj| obj.get("scope"))
.and_then(Value::as_str)
.map(str::to_owned)
.or_else(|| get_opt_str(args, "scope"))
.ok_or_else(|| anyhow::anyhow!("missing required argument: compact_options.scope"))
}
fn parse_compact_mode<'a>(
args: &'a Value,
entry_id: Option<&'a str>,
write: bool,
) -> Result<compact::CompactMode<'a>> {
let compact_options = args.get("compact_options").and_then(Value::as_object);
let keep_id = compact_options
.and_then(|obj| obj.get("keep"))
.and_then(Value::as_str)
.or_else(|| args.get("keep").and_then(Value::as_str));
let decay_class = compact_options
.and_then(|obj| obj.get("decay_class"))
.and_then(Value::as_str)
.or_else(|| args.get("decay_class").and_then(Value::as_str))
.map(parse_decay_class)
.transpose()?;
let review = compact_options
.and_then(|obj| obj.get("review"))
.and_then(Value::as_str)
.or_else(|| args.get("review").and_then(Value::as_str))
.map(parse_compact_review_kind)
.transpose()?;
let remove = compact_options
.and_then(|obj| obj.get("remove"))
.and_then(Value::as_bool)
.unwrap_or_else(|| get_bool(args, "remove"));
compact::parse_mode(compact::CompactModeRequest {
entry_id,
keep_id,
decay_class,
review,
remove,
write,
})
}
fn parse_decay_class(s: &str) -> Result<compact::DecayClass> {
match s {
"permanent" => Ok(compact::DecayClass::Permanent),
"stable" => Ok(compact::DecayClass::Stable),
"active" => Ok(compact::DecayClass::Active),
other => bail!("invalid decay_class: {other}; expected permanent, stable, or active"),
}
}
fn parse_compact_review_kind(s: &str) -> Result<compact::CompactReviewKind> {
match s {
"expired" => Ok(compact::CompactReviewKind::Expired),
"superseded" => Ok(compact::CompactReviewKind::Superseded),
"promotion-candidate" => Ok(compact::CompactReviewKind::PromotionCandidate),
other => {
bail!("invalid review: {other}; expected expired, superseded, or promotion-candidate")
}
}
}
fn parse_source_outcome(s: &str) -> Result<promote::SourceOutcome> {
match s {
"active" => Ok(promote::SourceOutcome::Active),
"superseded" => Ok(promote::SourceOutcome::Superseded),
"link-only" => Ok(promote::SourceOutcome::LinkOnly),
other => {
bail!("invalid source_outcome: {other}; expected active, superseded, or link-only")
}
}
}
fn parse_candidate_route(
args: &Value,
) -> Result<(
candidate::CandidateSourceScope,
candidate::CandidateDestination,
)> {
let route = args.get("candidate_route").and_then(Value::as_object);
let source_scope = route
.and_then(|obj| obj.get("source_scope"))
.and_then(Value::as_str)
.or_else(|| args.get("source_scope").and_then(Value::as_str))
.ok_or_else(|| {
anyhow::anyhow!("missing required argument: candidate_route.source_scope")
})?;
let destination = route
.and_then(|obj| obj.get("destination"))
.and_then(Value::as_str)
.or_else(|| args.get("destination").and_then(Value::as_str))
.ok_or_else(|| anyhow::anyhow!("missing required argument: candidate_route.destination"))?;
Ok((
parse_candidate_source_scope(source_scope)?,
parse_candidate_destination(destination)?,
))
}
fn parse_candidate_source_scope(s: &str) -> Result<candidate::CandidateSourceScope> {
match s {
"clone-memory" | "clone_memory" => Ok(candidate::CandidateSourceScope::Clone),
"branch-memory" | "branch_memory" => Ok(candidate::CandidateSourceScope::Branch),
"pod-memory" | "pod_memory" => Ok(candidate::CandidateSourceScope::Pod),
other => bail!(
"invalid source_scope: {other}; expected clone-memory, branch-memory, or pod-memory"
),
}
}
fn parse_candidate_destination(s: &str) -> Result<candidate::CandidateDestination> {
match s {
"branch-memory" | "branch_memory" => Ok(candidate::CandidateDestination::Branch),
"repo-memory" | "repo_memory" => Ok(candidate::CandidateDestination::Repo),
other => bail!("invalid destination: {other}; expected branch-memory or repo-memory"),
}
}