use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use crate::graph::edges::{Edge, EdgeKind};
use crate::health::quality;
use crate::mcp::protocol::{self, AuditEntry, ErrorCode};
use crate::store::db::KnowledgeWriteOp;
use crate::store::record::{
Category, ConfidenceScore, FileRecord, GotchaRecord, Priority as StorePriority, QualityScore,
Record, RecordLifecycle, RecordSource, RecordVersion, StalenessScore, TombstoneReason,
};
use crate::store::Store;
use super::dispatch_v2::RequestContext;
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0`.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
/// Builds an audit key by appending the current nanoseconds-since-epoch to `prefix`.
///
/// A clock that reads before the Unix epoch yields a suffix of `0`.
fn audit_nanos_key(prefix: &str) -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let mut key = String::from(prefix);
    key.push_str(&since_epoch.as_nanos().to_string());
    key
}
/// Key prefix passed by `make_audit`; the full audit key is this prefix followed by a
/// nanosecond timestamp.
const AUDIT_KNOWLEDGE_PREFIX: &str = "audit:knowledge:";
/// Key prefix passed by `make_session_audit`; exposed crate-wide.
pub(crate) const AUDIT_SESSION_PREFIX: &str = "audit:session:";
fn map_priority(p: &protocol::Priority) -> StorePriority {
match p {
protocol::Priority::Critical => StorePriority::Critical,
protocol::Priority::High => StorePriority::High,
protocol::Priority::Normal => StorePriority::Normal,
protocol::Priority::Low => StorePriority::Low,
}
}
fn map_severity(s: &protocol::Severity) -> StorePriority {
match s {
protocol::Severity::Critical => StorePriority::Critical,
protocol::Severity::High => StorePriority::High,
protocol::Severity::Normal => StorePriority::Normal,
protocol::Severity::Low => StorePriority::Low,
}
}
/// Serializes an [`AuditEntry`] for this request and pairs it with a key under `prefix`.
///
/// The entry records the current time in seconds, the peer uid/pid and daemon session
/// taken from `ctx`, and the remaining arguments verbatim. The key is `prefix` followed
/// by a nanosecond timestamp.
///
/// Returns `None` (after logging an error) when `rmp_serde` fails to serialize the entry.
pub(crate) fn make_audit_with_prefix(
    ctx: &RequestContext,
    request_id: Uuid,
    command_kind: &str,
    target_key: &str,
    accepted: bool,
    error_code: Option<ErrorCode>,
    prefix: &str,
) -> Option<(String, Vec<u8>)> {
    let ts = now_secs();
    let entry = AuditEntry {
        ts,
        peer_uid: ctx.peer.uid,
        peer_pid: ctx.peer.pid,
        daemon_session: ctx.daemon_session,
        request_id,
        command_kind: command_kind.to_owned(),
        target_key: target_key.to_owned(),
        accepted,
        error_code,
    };

    let bytes = match rmp_serde::to_vec_named(&entry) {
        Ok(bytes) => bytes,
        Err(e) => {
            tracing::error!("audit serialization failed — this is a bug, audit entry skipped: {e}");
            return None;
        }
    };

    // The key is generated only once the payload is known to be serializable.
    Some((audit_nanos_key(prefix), bytes))
}
/// Builds an audit entry keyed under `AUDIT_KNOWLEDGE_PREFIX`.
///
/// Delegates to `make_audit_with_prefix` with all arguments passed through unchanged.
/// Returns the `(key, serialized bytes)` pair, or `None` when that helper returns `None`.
pub(crate) fn make_audit(
ctx: &RequestContext,
request_id: Uuid,
command_kind: &str,
target_key: &str,
accepted: bool,
error_code: Option<ErrorCode>,
) -> Option<(String, Vec<u8>)> {
make_audit_with_prefix(
ctx,
request_id,
command_kind,
target_key,
accepted,
error_code,
AUDIT_KNOWLEDGE_PREFIX,
)
}
/// Builds an audit entry keyed under `AUDIT_SESSION_PREFIX`.
///
/// Delegates to `make_audit_with_prefix` with all arguments passed through unchanged.
/// Returns the `(key, serialized bytes)` pair, or `None` when that helper returns `None`.
pub(crate) fn make_session_audit(
ctx: &RequestContext,
request_id: Uuid,
command_kind: &str,
target_key: &str,
accepted: bool,
error_code: Option<ErrorCode>,
) -> Option<(String, Vec<u8>)> {
make_audit_with_prefix(
ctx,
request_id,
command_kind,
target_key,
accepted,
error_code,
AUDIT_SESSION_PREFIX,
)
}
/// Result type returned by the handlers in this file: a JSON value on success, or an
/// `(ErrorCode, message)` pair on failure.
type HandlerResult = std::result::Result<serde_json::Value, (ErrorCode, String)>;
const WRITE_CONFLICT_RETRIES: usize = 4;
async fn retry_on_write_conflict<T, F, Fut>(mut op: F) -> Result<T, (ErrorCode, String)>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, (ErrorCode, String)>>,
{
for attempt in 0..WRITE_CONFLICT_RETRIES {
match op().await {
Ok(value) => return Ok(value),
Err((_, ref msg))
if attempt + 1 < WRITE_CONFLICT_RETRIES
&& msg.to_lowercase().contains("write conflict") =>
{
tokio::time::sleep(std::time::Duration::from_millis(5u64 << attempt)).await;
}
Err(e) => return Err(e),
}
}
unreachable!("retry_on_write_conflict loop always returns within the body")
}
pub(crate) async fn handle_gotcha_upsert(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::GotchaDraftInput,
) -> HandlerResult {
let now = now_secs();
let key = &input.key;
if !key.starts_with("gotcha:") {
return Err((
ErrorCode::ValidationFailed,
"key must start with gotcha:".into(),
));
}
if input.rule.is_empty() {
return Err((ErrorCode::ValidationFailed, "rule must not be empty".into()));
}
let (record, is_new, old_affected_files) =
retry_on_write_conflict(|| upsert_commit_once(store, ctx, request_id, input, now)).await?;
let quality_val = record.quality.value;
let tier_label = format!("{:?}", record.quality.tier);
sync_has_gotcha_edges(store, key, &old_affected_files, &input.affected_files).await;
let change_kind = if is_new {
crate::store::enforcement::ControlChangeKind::Created
} else {
crate::store::enforcement::ControlChangeKind::Updated
};
let reason_code = if is_new {
"control_created"
} else {
"control_updated"
};
if let Err(e) = crate::store::enforcement::record_event(
store,
crate::store::enforcement::EnforcementEventType::ControlChanged { change_kind },
crate::store::enforcement::SubjectKind::Control,
key.clone(),
"developer".to_string(),
None,
reason_code.to_string(),
None,
)
.await
{
tracing::warn!("gotcha_upsert: enforcement event recording failed for {key}: {e}");
}
if is_new {
let _ = crate::store::extraction::write_on_extraction(
store,
key,
&input.tags,
&input.affected_files,
)
.await;
}
Ok(serde_json::json!({
"ok": true,
"key": key,
"confidence": record.confidence.value,
"quality": quality_val,
"tier": tier_label,
}))
}
/// Performs one read-modify-write attempt for a gotcha upsert.
///
/// Reads the current record, builds the new record and payload, and submits the record,
/// any file-link updates and the audit entry through a single `transact_knowledge` call.
///
/// Returns `(record, is_new, old_affected_files)`:
/// - `is_new` is true when no record existed or the existing one was tombstoned;
/// - `old_affected_files` is the affected-file list of the previous live record (empty
///   when there was none or it was tombstoned).
///
/// Errors: `StoreError` for read/transaction failures, `Internal` when the audit entry
/// could not be serialized.
async fn upsert_commit_once(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::GotchaDraftInput,
now: u64,
) -> Result<(Record, bool, Vec<String>), (ErrorCode, String)> {
let key = &input.key;
let existing = store
.get(key)
.await
.map_err(|e| (ErrorCode::StoreError, format!("store read failed: {e}")))?;
let is_tombstoned = existing
.as_ref()
.map(|r| matches!(r.lifecycle, RecordLifecycle::Tombstoned { .. }))
.unwrap_or(false);
// A tombstoned record is treated the same as a missing one: the upsert starts fresh.
let is_new = existing.is_none() || is_tombstoned;
let old_affected_files: Vec<String> = existing
.as_ref()
.filter(|_| !is_tombstoned)
.and_then(|r| r.payload_as::<GotchaRecord>())
.map(|g| g.affected_files)
.unwrap_or_default();
let gotcha = GotchaRecord {
rule: input.rule.clone(),
reason: input.reason.clone(),
severity: map_severity(&input.severity),
affected_files: input.affected_files.clone(),
ref_url: input.ref_url.clone(),
// Updates keep the previously stored discovery value; new records (or an unreadable
// previous payload) use `now`.
discovered_session: if is_new {
now
} else {
existing
.as_ref()
.and_then(|r| r.payload_as::<GotchaRecord>())
.map(|g| g.discovered_session)
.unwrap_or(now)
},
// Every upsert writes `confirmed: false`, including updates to existing records.
confirmed: false, };
let mut record = match existing {
// Live record: keep it and bump its version/timestamps.
Some(mut r) if !is_tombstoned => {
r.updated_at = now;
r.version.logical_clock += 1;
r.version.wall_clock = now;
r
}
// Missing or tombstoned: start from a blank record with logical clock 1.
_ => Record {
key: key.clone(),
value: String::new(),
category: Category::Gotcha,
priority: StorePriority::Normal,
tags: vec![],
created_at: now,
updated_at: now,
ref_url: None,
staleness: StalenessScore::fresh(),
lifecycle: RecordLifecycle::Active,
version: RecordVersion {
device_id: crate::store::stable_device_id(),
logical_clock: 1,
wall_clock: now,
},
quality: QualityScore::layer0_default(),
access_count: 0,
last_accessed: 0,
source: RecordSource::StaticAnalysis,
confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
gap_analysis_score: 0.0,
payload: None,
},
};
// Fields below are overwritten from the input regardless of which branch produced `record`.
record.value = format!("{} because {}", input.rule, input.reason);
record.category = Category::Gotcha;
record.lifecycle = RecordLifecycle::Active;
record.priority = map_priority(&input.priority);
record.tags = input.tags.clone();
// Any source string other than the two listed (including none) maps to `ClaudeEnrich`.
let source = match input.source.as_deref() {
Some("developer_manual") => RecordSource::DeveloperManual,
Some("import") => RecordSource::Import,
_ => RecordSource::ClaudeEnrich,
};
record.source = source.clone();
// Confidence is recomputed from the source on every upsert.
record.confidence = ConfidenceScore::for_new_record(&source);
if is_tombstoned {
record.confidence.confirmation_count = 0;
}
// A serialization failure leaves the payload as `None` rather than failing the request.
record.payload = serde_json::to_value(&gotcha).ok();
record.quality = quality::analyze(&record);
let file_link_updates =
compute_file_link_updates(store, key, &old_affected_files, &input.affected_files).await;
let (audit_key, audit_bytes) = make_audit(ctx, request_id, "gotcha_upsert", key, true, None)
.ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
// Record, file-link updates and audit entry go through one transact call.
// NOTE(review): presumably atomic — confirm against `Store::transact_knowledge`.
let mut ops: Vec<KnowledgeWriteOp<'_>> = Vec::new();
ops.push(KnowledgeWriteOp::PutRecord {
key,
record: &record,
});
for (fkey, frec) in &file_link_updates {
ops.push(KnowledgeWriteOp::PutRecord {
key: fkey.as_str(),
record: frec,
});
}
ops.push(KnowledgeWriteOp::PutRaw {
key: &audit_key,
value: &audit_bytes,
});
store
.transact_knowledge(&ops)
.await
.map_err(|e| (ErrorCode::StoreError, format!("transact failed: {e}")))?;
Ok((record, is_new, old_affected_files))
}
pub(crate) async fn handle_gotcha_confirm(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::GotchaConfirmInput,
) -> HandlerResult {
let now = now_secs();
let key = &input.key;
if !key.starts_with("gotcha:") {
return Err((
ErrorCode::ValidationFailed,
"confirm only applies to gotcha: keys".into(),
));
}
let (record, affected_files) =
retry_on_write_conflict(|| confirm_commit_once(store, ctx, request_id, key, now)).await?;
let confidence_val = record.confidence.value;
let quality_val = record.quality.value;
sync_has_gotcha_edges(store, key, &[], &affected_files).await;
for file_path in &affected_files {
let consulted_key = format!("session:consulted:file:{file_path}");
let _ = store.delete(&consulted_key).await;
}
if let Err(e) = crate::store::enforcement::record_event(
store,
crate::store::enforcement::EnforcementEventType::ControlChanged {
change_kind: crate::store::enforcement::ControlChangeKind::Confirmed,
},
crate::store::enforcement::SubjectKind::Control,
key.clone(),
"developer".to_string(),
None,
"control_confirmed".to_string(),
None,
)
.await
{
tracing::warn!("gotcha_confirm: enforcement event recording failed for {key}: {e}");
}
let _ = crate::store::extraction::mark_outcome(
store,
key,
crate::store::extraction::ExtractionOutcome::Confirmed,
)
.await;
Ok(serde_json::json!({
"ok": true,
"key": key,
"confirmed": true,
"confidence": confidence_val,
"quality": quality_val,
}))
}
/// Performs one read-modify-write attempt for confirming a gotcha.
///
/// Loads the record, checks that it is an active gotcha, flips its payload to confirmed,
/// raises its source/confidence to the developer-manual level and submits the record,
/// file-link updates, confirmation-propagation updates and the audit entry through a
/// single `transact_knowledge` call.
///
/// Returns the updated record together with its affected-file list.
///
/// Errors: `StoreError` (read/transaction failure), `NotFound` (no record),
/// `ValidationFailed` (record is not a gotcha), `InvalidStateTransition` (record is not
/// active), `Internal` (audit serialization failed).
async fn confirm_commit_once(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
key: &str,
now: u64,
) -> Result<(Record, Vec<String>), (ErrorCode, String)> {
let mut record = store
.get(key)
.await
.map_err(|e| (ErrorCode::StoreError, format!("store read: {e}")))?
.ok_or_else(|| (ErrorCode::NotFound, format!("record not found: {key}")))?;
if record.category != Category::Gotcha {
return Err((
ErrorCode::ValidationFailed,
format!("{key} is not a gotcha record"),
));
}
// Any non-active lifecycle is rejected; the message reports it as tombstoned.
if !matches!(record.lifecycle, RecordLifecycle::Active) {
return Err((
ErrorCode::InvalidStateTransition,
format!("{key} is tombstoned — cannot confirm"),
));
}
// The payload is edited as raw JSON: a string `severity` is rewritten in lowercase and
// `confirmed` is set to true. Payloads that are absent or not JSON objects are left alone.
if let Some(ref mut payload) = record.payload {
if let Some(obj) = payload.as_object_mut() {
if let Some(sev) = obj
.get("severity")
.and_then(|v| v.as_str())
.map(|s| s.to_lowercase())
{
obj.insert("severity".to_string(), serde_json::Value::String(sev));
}
obj.insert("confirmed".to_string(), serde_json::Value::Bool(true));
}
}
// Confirmation re-attributes the record to the developer and resets the confidence
// value to that source's base.
record.source = RecordSource::DeveloperManual;
record.confidence.value = ConfidenceScore::base_for_source(&RecordSource::DeveloperManual);
record.confidence.confirmation_count += 1;
record.quality = quality::analyze(&record);
record.updated_at = now;
record.version.logical_clock += 1;
record.version.wall_clock = now;
let affected_files: Vec<String> = record
.payload_as::<GotchaRecord>()
.map(|g| g.affected_files)
.unwrap_or_default();
// An empty "old" list is passed, so every affected file is handed over as the new set.
let file_link_updates = compute_file_link_updates(store, key, &[], &affected_files).await;
let confirmation_updates = compute_confirmation_propagation(store, &affected_files).await;
let (audit_key, audit_bytes) =
make_audit(ctx, request_id, "gotcha_confirm", key, true, None)
.ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
// Order of ops: gotcha record, file-link updates, confirmation updates, audit entry.
let mut ops: Vec<KnowledgeWriteOp<'_>> = Vec::new();
ops.push(KnowledgeWriteOp::PutRecord {
key,
record: &record,
});
for (fkey, frec) in &file_link_updates {
ops.push(KnowledgeWriteOp::PutRecord {
key: fkey.as_str(),
record: frec,
});
}
for (fkey, frec) in &confirmation_updates {
ops.push(KnowledgeWriteOp::PutRecord {
key: fkey.as_str(),
record: frec,
});
}
ops.push(KnowledgeWriteOp::PutRaw {
key: &audit_key,
value: &audit_bytes,
});
store
.transact_knowledge(&ops)
.await
.map_err(|e| (ErrorCode::StoreError, format!("transact failed: {e}")))?;
Ok((record, affected_files))
}
pub(crate) async fn handle_gotcha_tombstone(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::GotchaTombstoneInput,
) -> HandlerResult {
let now = now_secs();
let key = &input.key;
if !key.starts_with("gotcha:") {
return Err((
ErrorCode::ValidationFailed,
"tombstone only applies to gotcha: keys".into(),
));
}
let (affected_files, neg_exemplar_data) =
retry_on_write_conflict(|| tombstone_commit_once(store, ctx, request_id, key, now)).await?;
sync_has_gotcha_edges(store, key, &affected_files, &[]).await;
if let Err(e) = crate::store::enforcement::record_event(
store,
crate::store::enforcement::EnforcementEventType::ControlChanged {
change_kind: crate::store::enforcement::ControlChangeKind::Deleted,
},
crate::store::enforcement::SubjectKind::Control,
key.clone(),
"developer".to_string(),
None,
"control_deleted".to_string(),
None,
)
.await
{
tracing::warn!("gotcha_tombstone: enforcement event recording failed for {key}: {e}");
}
if let Some((rule, reason, severity)) = neg_exemplar_data.as_ref() {
match crate::store::negative_exemplar::write_on_tombstone(
store,
key,
rule,
reason,
severity,
&affected_files,
)
.await
{
Ok(n) => tracing::debug!(
"gotcha_tombstone (mcp): negative_exemplar archived for {key} across {n} dirname(s)"
),
Err(e) => tracing::warn!(
"gotcha_tombstone (mcp): negative_exemplar write failed for {key}: {e}"
),
}
}
let _ = crate::store::extraction::mark_outcome(
store,
key,
crate::store::extraction::ExtractionOutcome::Tombstoned,
)
.await;
Ok(serde_json::json!({"ok": true, "key": key, "tombstoned": true}))
}
/// Performs one read-modify-write attempt for tombstoning a record.
///
/// Loads the record, snapshots its gotcha payload, sets the lifecycle to
/// `Tombstoned { ManualDeletion, at: now }`, bumps the version and submits the record,
/// file-link updates and the audit entry through a single `transact_knowledge` call.
///
/// Returns the affected files from the snapshot plus, when the payload decoded as a
/// `GotchaRecord`, its `(rule, reason, severity)` for negative-exemplar archiving.
///
/// Errors: `StoreError` (read/transaction failure), `NotFound` (no record), `Internal`
/// (audit serialization failed).
///
/// NOTE(review): unlike `confirm_commit_once`, this does not check the record's category
/// or current lifecycle before writing — confirm that is intended.
async fn tombstone_commit_once(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
key: &str,
now: u64,
) -> Result<
(
Vec<String>,
Option<(String, String, crate::store::Priority)>,
),
(ErrorCode, String),
> {
let mut record = store
.get(key)
.await
.map_err(|e| (ErrorCode::StoreError, format!("store read: {e}")))?
.ok_or_else(|| (ErrorCode::NotFound, format!("record not found: {key}")))?;
// Snapshot the payload before the lifecycle change; both return values derive from it.
let gotcha_snapshot = record.payload_as::<GotchaRecord>();
let affected_files: Vec<String> = gotcha_snapshot
.as_ref()
.map(|g| g.affected_files.clone())
.unwrap_or_default();
let neg_exemplar_data: Option<(String, String, crate::store::Priority)> = gotcha_snapshot
.as_ref()
.map(|g| (g.rule.clone(), g.reason.clone(), g.severity.clone()));
record.lifecycle = RecordLifecycle::Tombstoned {
reason: TombstoneReason::ManualDeletion,
at: now,
};
record.updated_at = now;
record.version.logical_clock += 1;
record.version.wall_clock = now;
// An empty "new" list is passed, so the previous affected files are the old set.
let file_link_updates = compute_file_link_updates(store, key, &affected_files, &[]).await;
let (audit_key, audit_bytes) = make_audit(ctx, request_id, "gotcha_tombstone", key, true, None)
.ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
// Order of ops: tombstoned record, file-link updates, audit entry.
let mut ops: Vec<KnowledgeWriteOp<'_>> = Vec::new();
ops.push(KnowledgeWriteOp::PutRecord {
key,
record: &record,
});
for (fkey, frec) in &file_link_updates {
ops.push(KnowledgeWriteOp::PutRecord {
key: fkey.as_str(),
record: frec,
});
}
ops.push(KnowledgeWriteOp::PutRaw {
key: &audit_key,
value: &audit_bytes,
});
store
.transact_knowledge(&ops)
.await
.map_err(|e| (ErrorCode::StoreError, format!("transact failed: {e}")))?;
Ok((affected_files, neg_exemplar_data))
}
pub(crate) async fn handle_file_enrich(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::FileEnrichInput,
) -> HandlerResult {
let now = now_secs();
let file_key = format!("file:{}", input.path);
let mut record = store
.get(&file_key)
.await
.map_err(|e| (ErrorCode::StoreError, format!("store read: {e}")))?
.ok_or_else(|| {
(
ErrorCode::NotFound,
format!("file record not found: {file_key} (must be created by init/reparse)"),
)
})?;
if input.purpose.is_empty() && record.value.is_empty() {
return Err((
ErrorCode::ValidationFailed,
"purpose must not be empty".into(),
));
}
if !matches!(record.lifecycle, RecordLifecycle::Active) {
return Err((
ErrorCode::InvalidStateTransition,
format!("{file_key} is tombstoned"),
));
}
let was_confirmed =
record.source == RecordSource::DeveloperManual || record.confidence.value >= 0.80;
if let Some(ref mut payload) = record.payload {
if let Some(obj) = payload.as_object_mut() {
if !input.purpose.is_empty() {
obj.insert(
"purpose".to_string(),
serde_json::Value::String(input.purpose.clone()),
);
}
if !input.entry_points.is_empty() {
obj.insert(
"entry_points".to_string(),
serde_json::json!(input.entry_points),
);
}
if !input.decision_keys.is_empty() {
obj.insert(
"decision_keys".to_string(),
serde_json::json!(input.decision_keys),
);
}
if !input.todos.is_empty() {
obj.insert("todos".to_string(), serde_json::json!(input.todos));
}
}
}
if !input.purpose.is_empty() {
record.value = input.purpose.clone();
}
record.updated_at = now;
record.version.logical_clock += 1;
record.version.wall_clock = now;
record.priority = map_priority(&input.priority);
if !was_confirmed {
record.source = RecordSource::ClaudeEnrich;
record.confidence = ConfidenceScore::for_new_record(&RecordSource::ClaudeEnrich);
}
if !input.tags.is_empty() {
record.tags = input.tags.clone();
}
record.quality = quality::analyze(&record);
let confidence_val = record.confidence.value;
let quality_val = record.quality.value;
let tier_label = format!("{:?}", record.quality.tier);
let (audit_key, audit_bytes) =
make_audit(ctx, request_id, "file_enrich", &file_key, true, None)
.ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
let ops = vec![
KnowledgeWriteOp::PutRecord {
key: &file_key,
record: &record,
},
KnowledgeWriteOp::PutRaw {
key: &audit_key,
value: &audit_bytes,
},
];
store
.transact_knowledge(&ops)
.await
.map_err(|e| (ErrorCode::StoreError, format!("transact failed: {e}")))?;
Ok(serde_json::json!({
"ok": true,
"key": file_key,
"confidence": confidence_val,
"quality": quality_val,
"tier": tier_label,
}))
}
/// Re-parses a file and stores the resulting record together with an audit entry.
///
/// An empty path is rejected with `ValidationFailed`. When `reparse_staged` yields no
/// record, only the audit entry is written. Otherwise the staged record and the audit
/// entry are written in one `transact_knowledge` call, after which staleness is cascaded
/// to gotchas if the payload decodes as a `FileRecord` (cascade failure is only logged).
///
/// Returns `{"ok": true}` on success; `StoreError` for reparse or write failures and
/// `Internal` when the audit entry cannot be serialized.
pub(crate) async fn handle_file_reparse(
    store: &Store,
    ctx: &RequestContext,
    request_id: Uuid,
    input: &protocol::FileReparseInput,
    repo_root: &std::path::Path,
) -> HandlerResult {
    if input.path.is_empty() {
        return Err((ErrorCode::ValidationFailed, "path must not be empty".into()));
    }

    let staged = crate::analysis::reparse::reparse_staged(store, repo_root, &input.path)
        .await
        .map_err(|e| (ErrorCode::StoreError, format!("reparse failed: {e}")))?;

    // Both outcomes write the same audit entry, so it is built once up front.
    let (audit_key, audit_bytes) =
        make_audit(ctx, request_id, "file_reparse", &input.path, true, None)
            .ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;

    match staged {
        None => {
            let ops = vec![KnowledgeWriteOp::PutRaw {
                key: &audit_key,
                value: &audit_bytes,
            }];
            store
                .transact_knowledge(&ops)
                .await
                .map_err(|e| (ErrorCode::StoreError, format!("audit write failed: {e}")))?;
        }
        Some((file_key, record)) => {
            let ops = vec![
                KnowledgeWriteOp::PutRecord {
                    key: &file_key,
                    record: &record,
                },
                KnowledgeWriteOp::PutRaw {
                    key: &audit_key,
                    value: &audit_bytes,
                },
            ];
            store
                .transact_knowledge(&ops)
                .await
                .map_err(|e| (ErrorCode::StoreError, format!("transact failed: {e}")))?;

            if let Some(fr) = record.payload_as::<FileRecord>() {
                if let Err(e) =
                    crate::health::staleness::cascade_staleness_to_gotchas(store, &fr).await
                {
                    tracing::warn!(
                        "file_reparse: staleness cascade failed for {}: {e}",
                        input.path
                    );
                }
            }
        }
    }

    Ok(serde_json::json!({"ok": true}))
}
/// Captures a file's leading doc comment as the purpose of its `file:<path>` record.
///
/// An empty path is rejected with `ValidationFailed`. The file is read relative to
/// `repo_root`; an unreadable file is treated as empty content. The handler writes only
/// a best-effort audit entry and returns `{"ok": true}` when:
/// - no doc comment was extracted,
/// - the file record is missing or the store read failed, or
/// - the record's source is not `StaticAnalysis`.
///
/// Otherwise the record's value (and `FileRecord::purpose`, when the payload decodes) is
/// replaced, its source becomes `SessionHook` with confidence 0.65, and record plus
/// audit entry are written in one `transact_knowledge` call.
pub(crate) async fn handle_doc_capture(
    store: &Store,
    ctx: &RequestContext,
    request_id: Uuid,
    input: &protocol::DocCaptureInput,
    repo_root: &std::path::Path,
) -> HandlerResult {
    // Shared exit for every "nothing to capture" case: audit write errors are ignored
    // and the caller always gets an ok response.
    async fn audit_only(
        store: &Store,
        ctx: &RequestContext,
        request_id: Uuid,
        path: &str,
    ) -> HandlerResult {
        if let Some((ak, ab)) = make_audit(ctx, request_id, "doc_capture", path, true, None) {
            let _ = store.put_raw(&ak, &ab).await;
        }
        Ok(serde_json::json!({"ok": true}))
    }

    if input.path.is_empty() {
        return Err((ErrorCode::ValidationFailed, "path must not be empty".into()));
    }

    let content = std::fs::read_to_string(repo_root.join(&input.path)).unwrap_or_default();
    let purpose = crate::store::session::extract_doc_comment(&input.path, &content);
    if purpose.is_empty() {
        return audit_only(store, ctx, request_id, &input.path).await;
    }

    let file_key = format!("file:{}", input.path);
    let Ok(Some(mut record)) = store.get(&file_key).await else {
        return audit_only(store, ctx, request_id, &input.path).await;
    };
    // Only records still at their static-analysis source are overwritten.
    if record.source != RecordSource::StaticAnalysis {
        return audit_only(store, ctx, request_id, &input.path).await;
    }

    if let Some(mut fr) = record.payload_as::<FileRecord>() {
        fr.purpose = purpose.clone();
        record.payload = serde_json::to_value(&fr).ok();
    }

    let now = now_secs();
    record.value = purpose;
    record.source = RecordSource::SessionHook;
    record.confidence.value = 0.65;
    record.quality = QualityScore::doc_comment_default();
    record.updated_at = now;
    record.version.wall_clock = now;
    record.version.logical_clock += 1;

    let (audit_key, audit_bytes) =
        make_audit(ctx, request_id, "doc_capture", &input.path, true, None)
            .ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
    let ops = vec![
        KnowledgeWriteOp::PutRecord {
            key: &file_key,
            record: &record,
        },
        KnowledgeWriteOp::PutRaw {
            key: &audit_key,
            value: &audit_bytes,
        },
    ];
    if let Err(e) = store.transact_knowledge(&ops).await {
        return Err((ErrorCode::StoreError, format!("transact failed: {e}")));
    }

    Ok(serde_json::json!({"ok": true}))
}
pub(crate) async fn handle_decision_upsert(
store: &Store,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::DecisionUpsertInput,
) -> HandlerResult {
let now = now_secs();
let key = format!("decision:{}", input.slug);
if input.slug.is_empty() {
return Err((ErrorCode::ValidationFailed, "slug must not be empty".into()));
}
if input.value.is_empty() {
return Err((
ErrorCode::ValidationFailed,
"value must not be empty".into(),
));
}
let existing = store
.get(&key)
.await
.map_err(|e| (ErrorCode::StoreError, format!("store read: {e}")))?;
let was_confirmed = existing
.as_ref()
.map(|r| r.source == RecordSource::DeveloperManual || r.confidence.value >= 0.80)
.unwrap_or(false);
let mut record = match existing {
Some(mut r) => {
r.updated_at = now;
r.version.logical_clock += 1;
r.version.wall_clock = now;
r
}
None => Record {
key: key.clone(),
value: String::new(),
category: Category::Decision,
priority: StorePriority::Normal,
tags: vec![],
created_at: now,
updated_at: now,
ref_url: None,
staleness: StalenessScore::fresh(),
lifecycle: RecordLifecycle::Active,
version: RecordVersion {
device_id: crate::store::stable_device_id(),
logical_clock: 1,
wall_clock: now,
},
quality: QualityScore::layer0_default(),
access_count: 0,
last_accessed: 0,
source: RecordSource::StaticAnalysis,
confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
gap_analysis_score: 0.0,
payload: None,
},
};
record.value = input.value.clone();
record.category = Category::Decision;
record.lifecycle = RecordLifecycle::Active;
record.priority = map_priority(&input.priority);
record.tags = input.tags.clone();
record.payload = Some(serde_json::json!({
"summary": input.summary,
"rationale": input.rationale,
}));
if !was_confirmed {
record.source = RecordSource::ClaudeEnrich;
record.confidence = ConfidenceScore::for_new_record(&RecordSource::ClaudeEnrich);
}
record.quality = quality::analyze(&record);
let confidence_val = record.confidence.value;
let quality_val = record.quality.value;
let tier_label = format!("{:?}", record.quality.tier);
let (audit_key, audit_bytes) = make_audit(ctx, request_id, "decision_upsert", &key, true, None)
.ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
let ops = vec![
KnowledgeWriteOp::PutRecord {
key: &key,
record: &record,
},
KnowledgeWriteOp::PutRaw {
key: &audit_key,
value: &audit_bytes,
},
];
store
.transact_knowledge(&ops)
.await
.map_err(|e| (ErrorCode::StoreError, format!("transact failed: {e}")))?;
Ok(serde_json::json!({
"ok": true,
"key": key,
"confidence": confidence_val,
"quality": quality_val,
"tier": tier_label,
}))
}
/// Creates or updates a developer note.
///
/// `text` must be non-empty. A caller-supplied key must start with `dev_note:`
/// (`ValidationFailed` otherwise). Without a key, one is derived as
/// `dev_note:<slug>-<now>`, where the slug is the first 30 characters of the text,
/// lowercased, with every non-alphanumeric character replaced by `-`.
///
/// An existing record has its version bumped and is reactivated; a missing one is
/// created with a `DeveloperManual` source. Tags are replaced only when `input.tags` is
/// non-empty. The record and its audit entry are written in one `transact_knowledge`
/// call.
///
/// Returns a JSON object with `ok`, `key`, `quality` and `tier`.
pub(crate) async fn handle_dev_note_upsert(
    store: &Store,
    ctx: &RequestContext,
    request_id: Uuid,
    input: &protocol::DevNoteUpsertInput,
) -> HandlerResult {
    let now = now_secs();
    if input.text.is_empty() {
        return Err((ErrorCode::ValidationFailed, "text must not be empty".into()));
    }

    let key = if let Some(explicit) = &input.key {
        if !explicit.starts_with("dev_note:") {
            return Err((
                ErrorCode::ValidationFailed,
                "key must start with dev_note:".into(),
            ));
        }
        explicit.clone()
    } else {
        let head: String = input.text.chars().take(30).collect();
        let slug: String = head
            .to_lowercase()
            .chars()
            .map(|c| if c.is_alphanumeric() { c } else { '-' })
            .collect();
        format!("dev_note:{slug}-{now}")
    };

    let existing = match store.get(&key).await {
        Ok(found) => found,
        Err(e) => return Err((ErrorCode::StoreError, format!("store read: {e}"))),
    };

    let mut record = if let Some(mut prior) = existing {
        prior.updated_at = now;
        prior.version.logical_clock += 1;
        prior.version.wall_clock = now;
        prior
    } else {
        Record {
            key: key.clone(),
            value: String::new(),
            category: Category::DevNote,
            priority: StorePriority::Normal,
            tags: vec![],
            created_at: now,
            updated_at: now,
            ref_url: None,
            staleness: StalenessScore::fresh(),
            lifecycle: RecordLifecycle::Active,
            version: RecordVersion {
                device_id: crate::store::stable_device_id(),
                logical_clock: 1,
                wall_clock: now,
            },
            quality: QualityScore::layer0_default(),
            access_count: 0,
            last_accessed: 0,
            source: RecordSource::DeveloperManual,
            confidence: ConfidenceScore::for_new_record(&RecordSource::DeveloperManual),
            gap_analysis_score: 0.0,
            payload: None,
        }
    };

    record.value = input.text.clone();
    record.category = Category::DevNote;
    record.lifecycle = RecordLifecycle::Active;
    record.priority = map_priority(&input.priority);
    if !input.tags.is_empty() {
        record.tags = input.tags.clone();
    }
    record.quality = quality::analyze(&record);

    let (audit_key, audit_bytes) = make_audit(ctx, request_id, "dev_note_upsert", &key, true, None)
        .ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
    let ops = vec![
        KnowledgeWriteOp::PutRecord {
            key: &key,
            record: &record,
        },
        KnowledgeWriteOp::PutRaw {
            key: &audit_key,
            value: &audit_bytes,
        },
    ];
    if let Err(e) = store.transact_knowledge(&ops).await {
        return Err((ErrorCode::StoreError, format!("transact failed: {e}")));
    }

    Ok(serde_json::json!({
        "ok": true,
        "key": key,
        "quality": record.quality.value,
        "tier": format!("{:?}", record.quality.tier),
    }))
}
/// Looks up a single record by key and returns its agent-facing JSON.
///
/// Behavior visible in this function:
/// - An empty key is rejected with `ValidationFailed`; a store read error yields
///   `StoreError`.
/// - Missing and tombstoned records both produce a JSON `null` response.
/// - For `Category::File` records whose payload decodes as a `FileRecord`, the response
///   gains a `warnings` array when the blast-radius tier is High or Critical, and an
///   `enrichment_depth_hint` string.
/// - A consultation receipt and a session audit entry are written via
///   `transact_sessions_raw`; failures there are logged and do not fail the request.
/// - A `ReceiptMinted` enforcement event is recorded (result ignored).
/// - When a live record was found, a spawned task writes it back with `access_count`
///   incremented and updates the daily hit aggregate; the task is not awaited.
///
/// The only error produced after the initial lookup is `Internal`, when the audit entry
/// cannot be serialized on the normal path.
pub(crate) async fn handle_mem_get(
store: &Store,
graph: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
ctx: &RequestContext,
request_id: Uuid,
input: &protocol::MemGetInput,
) -> HandlerResult {
if input.key.is_empty() {
return Err((ErrorCode::ValidationFailed, "key must not be empty".into()));
}
// Tombstoned records are hidden from callers: they are treated like a missing key.
let record = match store.get(&input.key).await {
Ok(Some(r)) => {
if matches!(r.lifecycle, RecordLifecycle::Tombstoned { .. }) {
None
} else {
Some(r)
}
}
Ok(None) => None,
Err(e) => return Err((ErrorCode::StoreError, format!("store read: {e}"))),
};
let response = match &record {
Some(r) => {
let mut agent_json = super::tools::record_to_agent_json(r);
// File records get extra hints derived from their decoded `FileRecord` payload.
if r.category == Category::File {
if let Some(payload) = &r.payload {
if let Ok(fr) =
serde_json::from_value::<crate::store::record::FileRecord>(payload.clone())
{
use crate::analysis::blast_radius::BlastTier;
if let Some(ref br) = fr.blast_radius {
if matches!(br.tier, BlastTier::High | BlastTier::Critical) {
let warning = format!(
"HIGH IMPACT FILE: {} files directly depend on this. Modify with extra care.",
br.direct
);
if let Some(obj) = agent_json.as_object_mut() {
obj.insert("warnings".into(), serde_json::json!([warning]));
}
}
}
// Files without blast-radius data are treated as `Isolated`.
let blast_tier = fr
.blast_radius
.as_ref()
.map(|b| b.tier)
.unwrap_or(BlastTier::Isolated);
// Size of the first cluster in the `cluster:index` record that lists this path;
// 0 when the index is missing, undecodable, or has no such cluster.
let cluster_size = {
let path = input.key.strip_prefix("file:").unwrap_or(&input.key);
let mut size = 0u32;
if let Ok(Some(idx_rec)) = store.get("cluster:index").await {
if let Some(payload) = idx_rec.payload {
if let Ok(idx) = serde_json::from_value::<
crate::analysis::clusters::ClusterIndex,
>(payload)
{
for cluster in &idx.clusters {
if cluster.members.iter().any(|m| m == path) {
size = cluster.size;
break;
}
}
}
}
}
size
};
let depth = crate::health::enrichment::enrichment_depth(
fr.line_count,
blast_tier,
cluster_size,
fr.gotcha_keys.len(),
None, );
if let Some(obj) = agent_json.as_object_mut() {
obj.insert(
"enrichment_depth_hint".into(),
serde_json::json!(depth.as_str()),
);
}
}
}
}
agent_json
}
None => serde_json::Value::Null,
};
// Receipt staging failure: log it, write an audit-only entry (errors ignored) and
// still return the response.
let receipt = match crate::store::session::consultation_receipt_staged(&input.key, None) {
Ok(r) => r,
Err(e) => {
tracing::warn!(
request_id = %request_id,
key = %input.key,
"mem_get: consultation receipt staging failed: {e}"
);
if let Some((ak, ab)) =
make_session_audit(ctx, request_id, "mem_get", &input.key, true, None)
{
let _ = store.transact_sessions_raw(&[(&ak, &ab)]).await;
}
return Ok(response);
}
};
let (audit_key, audit_bytes) =
make_session_audit(ctx, request_id, "mem_get", &input.key, true, None)
.ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
// Receipt and audit entry are submitted together; a failure is logged, not returned.
let writes: Vec<(&str, &[u8])> = vec![(&receipt.0, &receipt.1), (&audit_key, &audit_bytes)];
if let Err(e) = store.transact_sessions_raw(&writes).await {
tracing::warn!(
request_id = %request_id,
key = %input.key,
"mem_get: sessions transaction failed (fail-open): {e}"
);
}
// The event is recorded with `SubjectKind::File` for every key; its result is ignored.
let _ = crate::store::enforcement::record_event(
store,
crate::store::enforcement::EnforcementEventType::ReceiptMinted,
crate::store::enforcement::SubjectKind::File,
input.key.clone(),
"claude".to_string(),
None,
"consultation_requested".to_string(),
None,
)
.await;
// Access accounting runs in a spawned task whose handle is dropped, so the response
// does not wait for it. The task writes back the copy of the record read above.
// NOTE(review): a concurrent update between the read and this `put` would be
// overwritten — confirm that is acceptable. `last_accessed` is not updated here.
if let Some(mut r) = record {
r.access_count += 1;
let key_owned = input.key.clone();
let graph_clone = Arc::clone(graph);
tokio::task::spawn(async move {
// The graph read guard is held for the duration of both store writes below.
let g = graph_clone.read().await;
let s = g.store();
let _ = s.put(&key_owned, &r).await;
let agg_key = crate::store::session::today_key("analytics:hit_");
let _ = crate::store::session::upsert_daily_agg(s, &agg_key, &key_owned).await;
});
}
Ok(response)
}
/// Serves a `mem_query` request, dispatching on `input.mode`.
///
/// * `Text` – runs `store.search_scored` and returns the active records that are
///   neither `Session` nor `Analytics`, each annotated with a `relevance` score
///   rounded to three decimals.
/// * `Tag` – scans the knowledge namespaces and returns active records that have a
///   tag containing the query (case-insensitive), stopping once `limit` is reached.
/// * `Graph` – treats the query as a seed key and returns its graph neighbours
///   grouped by relationship, together with a human-readable `summary`.
/// * `Semantic` – always rejected with `ValidationFailed`.
///
/// The caller-supplied `input.limit` is capped at `MAX_QUERY_LIMIT`.
///
/// # Errors
/// Returns `StoreError` when the scored search or a prefix scan fails. In `Graph`
/// mode, neighbours whose record cannot be loaded are skipped rather than reported.
pub(crate) async fn handle_mem_query(
    store: &Store,
    graph_ref: &crate::graph::Graph,
    input: &protocol::MemQueryInput,
) -> HandlerResult {
    use crate::graph::EdgeKind;
    use crate::store::record::Category;
    const MAX_QUERY_LIMIT: usize = 50;
    let limit = (input.limit as usize).min(MAX_QUERY_LIMIT);
    match input.mode {
        protocol::QueryMode::Text => {
            let scored = match store.search_scored(&input.query, limit).await {
                Ok(r) => r,
                Err(e) => return Err((ErrorCode::StoreError, format!("search: {e}"))),
            };
            let arr: Vec<serde_json::Value> = scored
                .iter()
                .filter(|(_, r)| {
                    matches!(r.lifecycle, RecordLifecycle::Active)
                        && !matches!(r.category, Category::Session | Category::Analytics)
                })
                .map(|(score, r)| {
                    let mut obj = super::tools::record_to_agent_json(r);
                    if let serde_json::Value::Object(ref mut map) = obj {
                        map.insert(
                            "relevance".into(),
                            serde_json::json!((*score * 1000.0).round() / 1000.0),
                        );
                    }
                    obj
                })
                .collect();
            Ok(serde_json::Value::Array(arr))
        }
        protocol::QueryMode::Tag => {
            let query_lower = input.query.to_lowercase();
            let mut matched: Vec<serde_json::Value> = Vec::new();
            for ns in &[
                "gotcha:",
                "decision:",
                "file:",
                "stage:",
                "dev_note:",
                "dep:",
            ] {
                if matched.len() >= limit {
                    break;
                }
                let records = match store.scan_prefix(ns).await {
                    Ok(rs) => rs,
                    Err(e) => return Err((ErrorCode::StoreError, format!("scan {ns}: {e}"))),
                };
                for record in records {
                    if matched.len() >= limit {
                        break;
                    }
                    if !matches!(record.lifecycle, RecordLifecycle::Active) {
                        continue;
                    }
                    if record
                        .tags
                        .iter()
                        .any(|t| t.to_lowercase().contains(&query_lower))
                    {
                        matched.push(super::tools::record_to_agent_json(&record));
                    }
                }
            }
            Ok(serde_json::Value::Array(matched))
        }
        protocol::QueryMode::Graph => {
            const GOTCHA_LIMIT: usize = 10;
            const COCHANGE_LIMIT: usize = 5;
            const IMPORT_LIMIT: usize = 5;
            const DECISION_LIMIT: usize = 3;
            const NOTE_LIMIT: usize = 3;
            // (edge kind, output group name, per-group cap)
            let edge_groups: &[(EdgeKind, &str, usize)] = &[
                (EdgeKind::HasGotcha, "gotchas", GOTCHA_LIMIT),
                (EdgeKind::CoChanges, "co_changes", COCHANGE_LIMIT),
                (EdgeKind::Imports, "imports", IMPORT_LIMIT),
                (EdgeKind::AffectedBy, "decisions", DECISION_LIMIT),
                (EdgeKind::HasNote, "notes", NOTE_LIMIT),
            ];
            let mut result = serde_json::Map::new();
            result.insert(
                "seed".to_string(),
                serde_json::Value::String(input.query.clone()),
            );
            // Candidate neighbour keys per group, already truncated to the group cap.
            let mut available: Vec<Vec<String>> = edge_groups
                .iter()
                .map(|(kind, _, cap)| {
                    graph_ref
                        .neighbors(&input.query, kind)
                        .into_iter()
                        .take(*cap)
                        .collect()
                })
                .collect();
            // Hand out the overall `limit` one slot at a time, cycling through the
            // groups, so that no single group can consume the whole budget.
            let mut quotas: Vec<usize> = vec![0; edge_groups.len()];
            let mut remaining = limit;
            loop {
                if remaining == 0 {
                    break;
                }
                let mut handed_out = 0usize;
                for (i, slot_keys) in available.iter().enumerate() {
                    if remaining == 0 {
                        break;
                    }
                    if quotas[i] < slot_keys.len() {
                        quotas[i] += 1;
                        remaining -= 1;
                        handed_out += 1;
                    }
                }
                if handed_out == 0 {
                    break;
                }
            }
            for (i, (kind, group_name, _)) in edge_groups.iter().enumerate() {
                let keys = std::mem::take(&mut available[i]);
                let mut group_records: Vec<serde_json::Value> = Vec::new();
                for key in keys.iter().take(quotas[i]) {
                    if let Ok(Some(record)) = store.get(key).await {
                        if matches!(record.lifecycle, RecordLifecycle::Active) {
                            let mut entry = serde_json::Map::new();
                            entry.insert(
                                "key".into(),
                                serde_json::Value::String(record.key.clone()),
                            );
                            entry.insert(
                                "relationship".into(),
                                serde_json::Value::String(format!("{kind:?}")),
                            );
                            entry.insert(
                                "value".into(),
                                serde_json::Value::String(record.value.clone()),
                            );
                            entry.insert(
                                "confidence".into(),
                                serde_json::json!(record.confidence.value),
                            );
                            entry.insert("quality".into(), serde_json::json!(record.quality.value));
                            if let Some(payload) = &record.payload {
                                if let Some(confirmed) = payload.get("confirmed") {
                                    entry.insert("confirmed".into(), confirmed.clone());
                                }
                            }
                            group_records.push(serde_json::Value::Object(entry));
                        }
                    }
                }
                result.insert(
                    group_name.to_string(),
                    serde_json::Value::Array(group_records),
                );
            }
            // Any budget left over is spent on dependency-driven records, which are
            // reported inside the "decisions" group.
            if remaining > 0 {
                let dep_keys = graph_ref.neighbors(&input.query, &EdgeKind::DependencyAffects);
                for key in dep_keys.iter().take(DECISION_LIMIT.min(remaining)) {
                    if let Ok(Some(record)) = store.get(key).await {
                        if matches!(record.lifecycle, RecordLifecycle::Active) {
                            let mut entry = serde_json::Map::new();
                            entry.insert(
                                "key".into(),
                                serde_json::Value::String(record.key.clone()),
                            );
                            entry.insert(
                                "relationship".into(),
                                serde_json::Value::String("DependencyAffects".to_string()),
                            );
                            entry.insert(
                                "value".into(),
                                serde_json::Value::String(record.value.clone()),
                            );
                            entry.insert(
                                "confidence".into(),
                                serde_json::json!(record.confidence.value),
                            );
                            entry.insert("quality".into(), serde_json::json!(record.quality.value));
                            if let Some(arr) = result
                                .get_mut("decisions")
                                .and_then(|decisions| decisions.as_array_mut())
                            {
                                arr.push(serde_json::Value::Object(entry));
                            }
                        }
                    }
                }
            }
            // Build the summary from the final group contents so that records merged
            // into "decisions" above are counted too.
            let summary_parts: Vec<String> = edge_groups
                .iter()
                .filter_map(|(_, group_name, _)| {
                    let count = result.get(*group_name)?.as_array()?.len();
                    (count > 0).then(|| format!("{} {}", count, group_name))
                })
                .collect();
            let summary = if summary_parts.is_empty() {
                "No related records found".to_string()
            } else {
                summary_parts.join(", ")
            };
            result.insert("summary".to_string(), serde_json::Value::String(summary));
            Ok(serde_json::Value::Object(result))
        }
        protocol::QueryMode::Semantic => Err((
            ErrorCode::ValidationFailed,
            "semantic search requires --features semantic (not enabled)".into(),
        )),
    }
}
/// Assembles the bootstrap context packet for `input.context_files` and returns its
/// injection string.
///
/// Alongside the assembly this stages, in one sessions transaction: the daily
/// bootstrap aggregate, one consultation receipt and one daily hit aggregate per
/// context file, and the audit entry for this request. Staging failures for the
/// aggregates/receipts are ignored; a failed transaction is only logged.
///
/// When assembly succeeded and files were supplied, a detached task bumps
/// `access_count` / `last_accessed` on each file record that exists.
///
/// # Errors
/// Returns `Internal` when the audit entry cannot be serialized or when the
/// context assembly itself failed.
pub(crate) async fn handle_mem_bootstrap(
    store: &Store,
    graph_ref: &crate::graph::Graph,
    graph_arc: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
    ctx: &RequestContext,
    request_id: Uuid,
    input: &protocol::MemBootstrapInput,
) -> Result<String, (ErrorCode, String)> {
    /// Prefixes `file` with `file:` unless it already carries that prefix.
    fn to_file_key(file: &str) -> String {
        if file.starts_with("file:") {
            file.to_owned()
        } else {
            format!("file:{file}")
        }
    }

    let context_files = &input.context_files;
    let assembly = super::tools::assemble_context_packet(store, graph_ref, context_files).await;
    let (accepted, error_code) = if assembly.is_ok() {
        (true, None)
    } else {
        (false, Some(ErrorCode::Internal))
    };

    // Everything below is collected first and written in a single transaction.
    let mut session_writes: Vec<(String, Vec<u8>)> = Vec::new();
    let bootstrap_agg_key = crate::store::session::today_key("analytics:bootstrap_");
    session_writes.extend(
        crate::store::session::upsert_daily_agg_staged(store, &bootstrap_agg_key, "__bootstrap__")
            .await
            .ok(),
    );
    for file in context_files {
        let file_key = to_file_key(file);
        session_writes.extend(crate::store::session::consultation_receipt_staged(&file_key, None).ok());
        let hit_agg_key = crate::store::session::today_key("analytics:hit_");
        session_writes.extend(
            crate::store::session::upsert_daily_agg_staged(store, &hit_agg_key, &file_key)
                .await
                .ok(),
        );
    }
    let audit = make_session_audit(ctx, request_id, "mem_bootstrap", "", accepted, error_code)
        .ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;
    session_writes.push(audit);

    let write_refs: Vec<(&str, &[u8])> = session_writes
        .iter()
        .map(|(k, v)| (k.as_str(), v.as_slice()))
        .collect();
    if let Err(e) = store.transact_sessions_raw(&write_refs).await {
        tracing::warn!(
            request_id = %request_id,
            "mem_bootstrap: sessions transaction failed: {e}"
        );
    }

    let packet = match assembly {
        Ok(packet) => packet,
        Err(e) => return Err((ErrorCode::Internal, format!("bootstrap assembly: {e}"))),
    };

    if !context_files.is_empty() {
        // Access statistics are updated off the request path; errors are ignored.
        let files_owned: Vec<String> = context_files.clone();
        let graph_clone = Arc::clone(graph_arc);
        tokio::task::spawn(async move {
            let g = graph_clone.read().await;
            let s = g.store();
            for file in &files_owned {
                let file_key = to_file_key(file);
                if let Ok(Some(mut record)) = s.get(&file_key).await {
                    record.access_count += 1;
                    record.last_accessed = now_secs();
                    let _ = s.put(&file_key, &record).await;
                }
            }
        });
    }
    Ok(packet.injection_string)
}
use std::collections::HashSet;
/// Computes the file-record changes needed when a gotcha's affected files change
/// from `old_files` to `new_files`. Nothing is written; the staged
/// `(file_key, record)` pairs are returned to the caller.
///
/// * Files only in `new_files`: the gotcha key is added to the existing record, or
///   a layer-0 stub carrying the key is staged when no record exists.
/// * Files only in `old_files`: the gotcha key is removed from the existing record.
///
/// Existing records are only staged when their payload actually changed, in which
/// case `updated_at` and the record version are bumped. Store read errors are
/// skipped.
async fn compute_file_link_updates(
    store: &Store,
    gotcha_key: &str,
    old_files: &[String],
    new_files: &[String],
) -> Vec<(String, Record)> {
    /// Marks `record` as modified at `now`.
    fn stamp(record: &mut Record, now: u64) {
        record.updated_at = now;
        record.version.logical_clock += 1;
        record.version.wall_clock = now;
    }

    let previous: HashSet<&str> = old_files.iter().map(String::as_str).collect();
    let current: HashSet<&str> = new_files.iter().map(String::as_str).collect();
    let now = now_secs();
    let mut staged = Vec::new();

    // Newly linked files.
    for added in current.difference(&previous) {
        let file_key = format!("file:{added}");
        match store.get(&file_key).await {
            Err(_) => {}
            Ok(None) => {
                let mut stub = Record::layer0_file_stub(
                    file_key.clone(),
                    crate::store::stable_device_id(),
                    1,
                    now,
                );
                let mut fr = FileRecord::layer0_stub(
                    *added,
                    vec![],
                    vec![],
                    vec![],
                    0,
                    0,
                    0,
                    None,
                    false,
                    0,
                    now,
                );
                fr.gotcha_keys = vec![gotcha_key.to_string()];
                stub.payload = serde_json::to_value(&fr).ok();
                staged.push((file_key, stub));
            }
            Ok(Some(mut existing)) => {
                if add_gotcha_key_to_record(&mut existing, gotcha_key) {
                    stamp(&mut existing, now);
                    staged.push((file_key, existing));
                }
            }
        }
    }

    // Files that are no longer linked.
    for dropped in previous.difference(&current) {
        let file_key = format!("file:{dropped}");
        let Ok(Some(mut existing)) = store.get(&file_key).await else {
            continue;
        };
        if remove_gotcha_key_from_record(&mut existing, gotcha_key) {
            stamp(&mut existing, now);
            staged.push((file_key, existing));
        }
    }
    staged
}
/// Brings the persisted `HasGotcha` edges for `gotcha_key` in line with a change of
/// affected files from `old_files` to `new_files`.
///
/// An edge key is written (value: the current epoch seconds as little-endian bytes)
/// for every file that is only in `new_files`, and deleted for every file that is
/// only in `old_files`. Failures are logged and do not stop the remaining work.
async fn sync_has_gotcha_edges(
    store: &Store,
    gotcha_key: &str,
    old_files: &[String],
    new_files: &[String],
) {
    use std::collections::HashSet;
    let before: HashSet<&str> = old_files.iter().map(String::as_str).collect();
    let after: HashSet<&str> = new_files.iter().map(String::as_str).collect();
    let stamp = now_secs().to_le_bytes();

    for added in after.difference(&before) {
        let file_key = format!("file:{added}");
        let edge_key = Edge::new(&file_key, EdgeKind::HasGotcha, gotcha_key).to_key();
        if let Err(e) = store.put_raw(&edge_key, &stamp).await {
            tracing::warn!("sync_has_gotcha_edges: add failed {file_key} → {gotcha_key}: {e}");
        }
    }

    for removed in before.difference(&after) {
        let file_key = format!("file:{removed}");
        let edge_key = Edge::new(&file_key, EdgeKind::HasGotcha, gotcha_key).to_key();
        if let Err(e) = store.delete(&edge_key).await {
            tracing::warn!(
                "sync_has_gotcha_edges: remove failed {file_key} → {gotcha_key}: {e}"
            );
        }
    }
}
/// Stages a confirmation bump for every file in `affected_files` that has a record.
///
/// Each returned `(file_key, record)` pair has its `confirmation_count` incremented
/// and its `updated_at` / version advanced to the current time. Files without a
/// record, and files whose lookup failed, are skipped. Nothing is written here.
async fn compute_confirmation_propagation(
    store: &Store,
    affected_files: &[String],
) -> Vec<(String, Record)> {
    let now = now_secs();
    let mut staged = Vec::new();
    for path in affected_files {
        let file_key = format!("file:{path}");
        let Ok(Some(mut file_record)) = store.get(&file_key).await else {
            continue;
        };
        file_record.confidence.confirmation_count += 1;
        file_record.updated_at = now;
        file_record.version.logical_clock += 1;
        file_record.version.wall_clock = now;
        staged.push((file_key, file_record));
    }
    staged
}
/// Ensures `gotcha_key` is listed under `gotcha_keys` in the record's JSON payload.
///
/// A missing or non-object payload is replaced by `{"gotcha_keys": [gotcha_key]}`,
/// and a missing or non-array `gotcha_keys` entry is replaced by `[gotcha_key]`.
///
/// Returns `true` when the payload was modified and `false` when the key was
/// already present.
fn add_gotcha_key_to_record(record: &mut Record, gotcha_key: &str) -> bool {
    let fields = match record.payload.as_mut().and_then(|p| p.as_object_mut()) {
        Some(fields) => fields,
        None => {
            record.payload = Some(serde_json::json!({ "gotcha_keys": [gotcha_key] }));
            return true;
        }
    };

    let Some(slot) = fields.get_mut("gotcha_keys") else {
        fields.insert("gotcha_keys".into(), serde_json::json!([gotcha_key]));
        return true;
    };

    let Some(keys) = slot.as_array_mut() else {
        // Present but not an array: overwrite with a fresh single-element list.
        *slot = serde_json::json!([gotcha_key]);
        return true;
    };

    let already_listed = keys.iter().any(|v| v.as_str() == Some(gotcha_key));
    if already_listed {
        return false;
    }
    keys.push(serde_json::Value::String(gotcha_key.to_string()));
    true
}
/// Removes every occurrence of `gotcha_key` from the `gotcha_keys` array in the
/// record's JSON payload.
///
/// Returns `true` when at least one entry was removed. Returns `false` when the
/// payload is missing, is not an object, has no `gotcha_keys` array, or does not
/// contain the key.
fn remove_gotcha_key_from_record(record: &mut Record, gotcha_key: &str) -> bool {
    let keys = record
        .payload
        .as_mut()
        .and_then(|payload| payload.as_object_mut())
        .and_then(|fields| fields.get_mut("gotcha_keys"))
        .and_then(|slot| slot.as_array_mut());
    match keys {
        None => false,
        Some(keys) => {
            let len_before = keys.len();
            keys.retain(|v| v.as_str() != Some(gotcha_key));
            keys.len() != len_before
        }
    }
}
/// Imports `input.records` into the knowledge store in chunks of `CHUNK` records.
///
/// A record is skipped when its key does not start with one of the knowledge
/// prefixes, or when `Durability::for_key` for that key is not `Immediate`. Each
/// chunk is written in its own `transact_knowledge` call together with an audit
/// entry for that chunk.
///
/// Returns `{"ok": true, "imported": n, "skipped": m}` on success.
///
/// # Errors
/// Returns `Internal` when an audit entry cannot be serialized and `StoreError`
/// when a chunk transaction fails. Chunks written before the failure are not
/// rolled back by this function.
pub(crate) async fn handle_record_import(
    store: &Store,
    ctx: &RequestContext,
    request_id: Uuid,
    input: &protocol::RecordImportInput,
) -> HandlerResult {
    const CHUNK: usize = 200;
    const VALID_PREFIXES: [&str; 6] = [
        "gotcha:",
        "decision:",
        "dev_note:",
        "file:",
        "stage:",
        "dep:",
    ];

    let importable: Vec<&Record> = input
        .records
        .iter()
        .filter(|r| {
            let key_str = r.key.as_str();
            VALID_PREFIXES.iter().any(|p| key_str.starts_with(p))
                && crate::store::Durability::for_key(key_str) == crate::store::Durability::Immediate
        })
        .collect();
    let skipped = (input.records.len() - importable.len()) as u64;

    let mut imported: u64 = 0;
    for batch in importable.chunks(CHUNK) {
        let chunk_target = format!("record_import:{}records", batch.len());
        let audit = make_audit(ctx, request_id, "record_import", &chunk_target, true, None)
            .ok_or_else(|| (ErrorCode::Internal, "audit serialization failed".into()))?;

        // Record writes first, audit entry last, all in one transaction.
        let mut ops: Vec<KnowledgeWriteOp<'_>> = Vec::with_capacity(batch.len() + 1);
        ops.extend(batch.iter().map(|&r| KnowledgeWriteOp::PutRecord {
            key: r.key.as_str(),
            record: r,
        }));
        ops.push(KnowledgeWriteOp::PutRaw {
            key: &audit.0,
            value: &audit.1,
        });

        if let Err(e) = store.transact_knowledge(&ops).await {
            return Err((
                ErrorCode::StoreError,
                format!("import transact failed: {e}"),
            ));
        }
        imported += batch.len() as u64;
    }

    Ok(serde_json::json!({
        "ok": true,
        "imported": imported,
        "skipped": skipped,
    }))
}
/// Entry point for `mem_set`: routes to the write, confirm or delete
/// implementation according to `params.action`.
///
/// An empty action is treated as `"write"`. Any other unknown action yields a JSON
/// object with an `error` field listing the valid actions.
///
/// `_ctx` and `_request_id` are accepted for signature parity with the other
/// handlers but are not used here.
///
/// Returns the JSON response serialized as a string.
pub(crate) async fn handle_mem_set(
    graph_arc: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
    _ctx: &RequestContext,
    _request_id: Uuid,
    params: &crate::mcp::types::MemSetParams,
) -> String {
    match params.action.as_str() {
        "confirm" => apply_mem_set_confirm(graph_arc, &params.key).await,
        "delete" => apply_mem_set_delete(graph_arc, &params.key).await,
        "write" | "" => apply_mem_set_write(graph_arc, params).await,
        other => serde_json::json!({
            "error": format!("unknown action: {other}. Valid: write, confirm, delete")
        })
        .to_string(),
    }
}
/// Implements the `write` action of `mem_set`: creates or updates a `gotcha:`,
/// `decision:` or `dev_note:` record and, for gotchas, keeps file links and
/// `HasGotcha` graph edges in step with the record's `affected_files`.
///
/// Validation (each failure returns a JSON `{"error": ...}` string):
/// * the key must carry one of the three prefixes;
/// * `params.category` must be `Gotcha`, `Decision` or `DevNote` and must match the
///   key prefix;
/// * for a new (or previously tombstoned) record the payload must carry the fields
///   required for its category (`rule`/`reason`, `summary`/`rationale`, or a
///   non-empty value for dev notes).
///
/// An unrecognised `params.priority` falls back to `Normal`.
///
/// Records considered confirmed (source `DeveloperManual` or confidence >= 0.80,
/// and not tombstoned) keep their source and confidence, and keep their tags unless
/// new tags are supplied. All other records are re-marked as `ClaudeEnrich`.
///
/// A non-empty object payload is merged key-by-key into the existing payload;
/// `gotcha_keys` arrays are unioned instead of replaced.
///
/// On success returns `{"ok": true, "key", "confidence", "quality", "tier"}`.
/// Link-sync and edge failures after the record was stored are logged and marked
/// dirty for repair; they do not fail the request.
async fn apply_mem_set_write(
    graph_arc: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
    params: &crate::mcp::types::MemSetParams,
) -> String {
    let graph = graph_arc.read().await;
    let store = graph.store();
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let valid_prefix = ["gotcha:", "decision:", "dev_note:"]
        .iter()
        .any(|p| params.key.starts_with(p));
    if !valid_prefix {
        return serde_json::json!({
            "error": "key must start with gotcha:, decision:, or dev_note:"
        })
        .to_string();
    }
    let category = match params.category.as_str() {
        "Gotcha" => Category::Gotcha,
        "Decision" => Category::Decision,
        "DevNote" => Category::DevNote,
        other => {
            return serde_json::json!({
                "error": format!("unknown category: {other}. Valid: Gotcha, Decision, DevNote")
            })
            .to_string();
        }
    };
    let priority = match params.priority.as_str() {
        "Critical" => StorePriority::Critical,
        "High" => StorePriority::High,
        "Low" => StorePriority::Low,
        _ => StorePriority::Normal,
    };

    let existing_record =
        match super::tools::resolve_existing_for_write(store.get(&params.key).await) {
            Ok(record) => record,
            Err(error_json) => return error_json,
        };
    let is_tombstoned = existing_record
        .as_ref()
        .map(|r| matches!(r.lifecycle, RecordLifecycle::Tombstoned { .. }))
        .unwrap_or(false);

    let expected_category = match params.key.split(':').next().unwrap_or("") {
        "gotcha" => Category::Gotcha,
        "decision" => Category::Decision,
        "dev_note" => Category::DevNote,
        _ => unreachable!("key prefix already validated"),
    };
    if category != expected_category {
        return serde_json::json!({
            "error": format!(
                "key prefix requires category {expected_category:?}, got {category:?}"
            )
        })
        .to_string();
    }

    // The payload may arrive as a JSON value or as a string containing JSON; a
    // string that does not parse is kept as-is. Parsed once and reused for both
    // validation and merging.
    let new_payload = match &params.payload {
        serde_json::Value::String(s) => {
            serde_json::from_str::<serde_json::Value>(s).unwrap_or_else(|_| params.payload.clone())
        }
        other => other.clone(),
    };

    // Required-field validation only applies when the record is being created, or
    // re-created over a tombstone.
    let is_new_record = existing_record.is_none() || is_tombstoned;
    if is_new_record {
        let obj = new_payload.as_object();
        let validation: Result<(), &'static str> = match &category {
            Category::Gotcha => {
                let valid = obj.is_some_and(|o| {
                    let rule = o.get("rule").and_then(|v| v.as_str()).unwrap_or("");
                    let reason = o.get("reason").and_then(|v| v.as_str()).unwrap_or("");
                    !rule.is_empty() && !reason.is_empty()
                });
                if valid {
                    Ok(())
                } else {
                    Err("gotcha requires payload with non-empty 'rule' and 'reason'")
                }
            }
            Category::Decision => {
                let valid = obj.is_some_and(|o| {
                    let summary = o.get("summary").and_then(|v| v.as_str()).unwrap_or("");
                    let rationale = o.get("rationale").and_then(|v| v.as_str()).unwrap_or("");
                    !summary.is_empty() && !rationale.is_empty()
                });
                if valid {
                    Ok(())
                } else {
                    Err("decision requires payload with non-empty 'summary' and 'rationale'")
                }
            }
            Category::DevNote => {
                if params.value.is_empty() {
                    Err("dev_note requires non-empty value")
                } else {
                    Ok(())
                }
            }
            _ => Ok(()),
        };
        if let Err(msg) = validation {
            return serde_json::json!({"error": msg}).to_string();
        }
    }

    let was_confirmed = existing_record
        .as_ref()
        .map(|r| {
            !is_tombstoned
                && (r.source == RecordSource::DeveloperManual || r.confidence.value >= 0.80)
        })
        .unwrap_or(false);
    // Remember the previous file links so that link/edge sync can diff against them.
    let old_affected_files: Vec<String> = existing_record
        .as_ref()
        .filter(|r| r.key.starts_with("gotcha:"))
        .and_then(|r| r.payload_as::<GotchaRecord>())
        .map(|g| g.affected_files)
        .unwrap_or_default();

    let mut record = match existing_record {
        Some(existing) => existing,
        None => Record {
            key: params.key.clone(),
            value: String::new(),
            category: category.clone(),
            priority: StorePriority::Normal,
            tags: vec![],
            created_at: now,
            updated_at: now,
            ref_url: None,
            staleness: StalenessScore::fresh(),
            lifecycle: RecordLifecycle::Active,
            version: RecordVersion {
                device_id: crate::store::stable_device_id(),
                logical_clock: 0,
                wall_clock: now,
            },
            quality: QualityScore::layer0_default(),
            access_count: 0,
            last_accessed: 0,
            source: RecordSource::StaticAnalysis,
            confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
            gap_analysis_score: 0.0,
            payload: Some(serde_json::json!({})),
        },
    };
    if is_tombstoned {
        record.confidence.confirmation_count = 0;
    }
    record.lifecycle = RecordLifecycle::Active;
    record.value = params.value.clone();
    record.category = category;
    record.updated_at = now;
    record.version.logical_clock += 1;
    record.version.wall_clock = now;
    record.priority = priority;
    if was_confirmed {
        if !params.tags.is_empty() {
            record.tags = params.tags.clone();
        }
    } else {
        record.source = RecordSource::ClaudeEnrich;
        record.confidence = ConfidenceScore::for_new_record(&RecordSource::ClaudeEnrich);
        record.tags = params.tags.clone();
    }

    // Merge a non-empty object payload into whatever payload the record has.
    if new_payload.as_object().is_some_and(|o| !o.is_empty()) {
        let merged = match (record.payload.take(), new_payload) {
            (Some(serde_json::Value::Object(mut base)), serde_json::Value::Object(overlay)) => {
                for (k, v) in overlay {
                    if k == "gotcha_keys" {
                        // Union rather than replace, keeping existing entries first.
                        if let (
                            Some(serde_json::Value::Array(existing_arr)),
                            serde_json::Value::Array(new_arr),
                        ) = (base.get_mut(&k), &v)
                        {
                            for item in new_arr {
                                if !existing_arr.contains(item) {
                                    existing_arr.push(item.clone());
                                }
                            }
                            continue;
                        }
                    }
                    base.insert(k, v);
                }
                serde_json::Value::Object(base)
            }
            // No existing payload, or one that is not an object: take the new one.
            (_, replacement) => replacement,
        };
        record.payload = Some(merged);
    }

    // Gotcha severities are stored lower-cased.
    if record.key.starts_with("gotcha:") {
        if let Some(obj) = record.payload.as_mut().and_then(|p| p.as_object_mut()) {
            if let Some(sev) = obj
                .get("severity")
                .and_then(|v| v.as_str())
                .map(|s| s.to_lowercase())
            {
                obj.insert("severity".to_string(), serde_json::Value::String(sev));
            }
        }
    }

    record.quality = quality::analyze(&record);
    let tier_label = format!("{:?}", record.quality.tier);
    let record_key = record.key.clone();
    if let Err(e) = store.put(&record.key, &record).await {
        return serde_json::json!({"error": e.to_string()}).to_string();
    }

    let affected_files: Vec<String> = if record_key.starts_with("gotcha:") {
        record
            .payload
            .as_ref()
            .and_then(|p| p.get("affected_files"))
            .and_then(|v| serde_json::from_value::<Vec<String>>(v.clone()).ok())
            .unwrap_or_default()
    } else {
        vec![]
    };
    if record_key.starts_with("gotcha:") {
        if let Err(e) = crate::store::gotcha_ops::sync_gotcha_file_links(
            store,
            &record_key,
            &old_affected_files,
            &affected_files,
        )
        .await
        {
            tracing::warn!("mem_set: file link sync failed for {record_key}: {e}");
            crate::store::repair::mark_dirty(
                store,
                &record_key,
                &format!("mem_set link sync failed: {e}"),
            )
            .await;
        }
    }

    let old_affected_set: HashSet<&str> = old_affected_files.iter().map(String::as_str).collect();
    let new_affected_set: HashSet<&str> = affected_files.iter().map(String::as_str).collect();
    // Release the read guard before asking for the write guard below.
    drop(graph);
    if record_key.starts_with("gotcha:") {
        let mut graph = graph_arc.write().await;
        for file_path in old_affected_set.difference(&new_affected_set) {
            let file_key = format!("file:{file_path}");
            if let Err(e) = graph
                .remove_edge(&file_key, &EdgeKind::HasGotcha, &record_key)
                .await
            {
                tracing::warn!(
                    "mem_set: stale edge removal failed for {file_key} → {record_key}: {e}"
                );
                crate::store::repair::mark_dirty(
                    graph.store(),
                    &record_key,
                    &format!("mem_set edge remove failed: {e}"),
                )
                .await;
            }
        }
        for file_path in new_affected_set.difference(&old_affected_set) {
            let file_key = format!("file:{file_path}");
            if let Err(e) = graph
                .add_edge(&file_key, EdgeKind::HasGotcha, &record_key)
                .await
            {
                tracing::warn!("mem_set: edge add failed for {file_key} → {record_key}: {e}");
                crate::store::repair::mark_dirty(
                    graph.store(),
                    &record_key,
                    &format!("mem_set edge add failed: {e}"),
                )
                .await;
            }
        }
    }

    serde_json::json!({
        "ok": true,
        "key": record_key,
        "confidence": record.confidence.value,
        "quality": record.quality.value,
        "tier": tier_label,
    })
    .to_string()
}
/// Implements the `confirm` action of `mem_set` for a `gotcha:` key.
///
/// The confirmation write is attempted up to `MAX_RETRIES` times. Only errors
/// whose message contains "write conflict" are retried (after a 10 ms pause); any
/// other error, or a conflict on the final attempt, is returned immediately as
/// `{"error": ...}`. On success the response comes from `finalize_confirm`.
async fn apply_mem_set_confirm(
    graph_arc: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
    key: &str,
) -> String {
    if !key.starts_with("gotcha:") {
        return serde_json::json!({"error": "confirm action only applies to gotcha: keys"})
            .to_string();
    }
    const MAX_RETRIES: usize = 3;
    let mut last_err: Option<String> = None;
    for attempt in 1..=MAX_RETRIES {
        // The read guard is scoped to this block so it is gone before finalizing.
        let outcome = {
            let graph = graph_arc.read().await;
            let store = graph.store();
            try_confirm_once(store, key).await
        };
        let failure = match outcome {
            Ok((rec, files)) => return finalize_confirm(graph_arc, key, &rec, &files).await,
            Err(e) => e,
        };
        let msg = failure.to_string();
        let may_retry = attempt < MAX_RETRIES && msg.contains("write conflict");
        if !may_retry {
            return serde_json::json!({"error": msg}).to_string();
        }
        tracing::debug!(
            "confirm {key}: write conflict (attempt {}), retrying",
            attempt
        );
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        last_err = Some(msg);
    }
    serde_json::json!({"error": format!("store put: {}", last_err.unwrap_or_default())}).to_string()
}
async fn try_confirm_once(store: &Store, key: &str) -> anyhow::Result<(Record, Vec<String>)> {
let mut record = store
.get(key)
.await?
.ok_or_else(|| anyhow::anyhow!("record not found: {key}"))?;
if record.category != Category::Gotcha {
anyhow::bail!("{key} is not a gotcha record");
}
if !matches!(record.lifecycle, RecordLifecycle::Active) {
anyhow::bail!("{key} is tombstoned — cannot confirm a deleted record");
}
if let Some(ref mut payload) = record.payload {
if let Some(obj) = payload.as_object_mut() {
if let Some(sev) = obj
.get("severity")
.and_then(|v| v.as_str())
.map(|s| s.to_lowercase())
{
obj.insert("severity".to_string(), serde_json::Value::String(sev));
}
obj.insert("confirmed".to_string(), serde_json::Value::Bool(true));
}
}
record.source = RecordSource::DeveloperManual;
record.confidence.value = ConfidenceScore::base_for_source(&RecordSource::DeveloperManual);
record.confidence.confirmation_count += 1;
record.quality = quality::analyze(&record);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
record.updated_at = now;
record.version.logical_clock += 1;
record.version.wall_clock = now;
let affected_files: Vec<String> = record
.payload_as::<GotchaRecord>()
.map(|g| g.affected_files)
.unwrap_or_default();
store.put(key, &record).await?;
if let Err(e) = crate::store::enforcement::record_event(
store,
crate::store::enforcement::EnforcementEventType::ControlChanged {
change_kind: crate::store::enforcement::ControlChangeKind::Confirmed,
},
crate::store::enforcement::SubjectKind::Control,
key.to_string(),
"developer".to_string(),
None,
"control_confirmed".to_string(),
None,
)
.await
{
tracing::warn!("confirm: enforcement event recording failed for {key}: {e}");
}
Ok((record, affected_files))
}
async fn finalize_confirm(
graph_arc: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
key: &str,
record: &Record,
affected_files: &[String],
) -> String {
let graph = graph_arc.read().await;
let store = graph.store();
for file_path in affected_files {
let file_key = format!("file:{file_path}");
if let Ok(Some(mut file_record)) = store.get(&file_key).await {
let needs_link = file_record
.payload
.as_ref()
.and_then(|p| p.get("gotcha_keys"))
.and_then(|v| v.as_array())
.map(|arr| !arr.iter().any(|v| v.as_str() == Some(key)))
.unwrap_or(true);
if needs_link {
if let Some(ref mut payload) = file_record.payload {
if let Some(obj) = payload.as_object_mut() {
let arr = obj.entry("gotcha_keys").or_insert(serde_json::json!([]));
if let Some(arr) = arr.as_array_mut() {
arr.push(serde_json::Value::String(key.to_string()));
}
}
}
let _ = store.put(&file_key, &file_record).await;
}
}
let consulted_key = format!("session:consulted:{file_key}");
let _ = store.delete(&consulted_key).await;
}
crate::store::gotcha_ops::propagate_confirmation_to_files(store, affected_files).await;
let _ = crate::store::session::log_hit(store, key).await;
let confidence_value = record.confidence.value;
let quality_value = record.quality.value;
drop(graph);
if !affected_files.is_empty() {
let mut g = graph_arc.write().await;
for file_path in affected_files {
let file_key = format!("file:{file_path}");
let _ = g.add_edge(&file_key, EdgeKind::HasGotcha, key).await;
}
}
serde_json::json!({
"ok": true,
"key": key,
"confirmed": true,
"confidence": confidence_value,
"quality": quality_value,
})
.to_string()
}
/// Implements the `delete` action of `mem_set` for a `gotcha:` key.
///
/// The key prefix is validated before any lock is taken, matching
/// `apply_mem_set_confirm`. Under the graph read lock the record is loaded and
/// tombstoned via `apply_gotcha_tombstone`; afterwards, under the write lock, the
/// `HasGotcha` edges from each affected file are removed from the in-memory graph.
/// Edge-removal failures are logged and do not fail the request.
///
/// Returns `{"ok": true, "key", "tombstoned": true}` on success, or an
/// `{"error": ...}` object when the key is not a gotcha key, the record is missing,
/// or the store read / tombstone fails.
async fn apply_mem_set_delete(
    graph_arc: &Arc<tokio::sync::RwLock<crate::graph::Graph>>,
    key: &str,
) -> String {
    if !key.starts_with("gotcha:") {
        return serde_json::json!({"error": "delete action only applies to gotcha: keys"})
            .to_string();
    }

    let affected_files = {
        let graph = graph_arc.read().await;
        let store = graph.store();
        let record = match store.get(key).await {
            Ok(Some(r)) => r,
            Ok(None) => {
                return serde_json::json!({"error": format!("record not found: {key}")}).to_string()
            }
            Err(e) => return serde_json::json!({"error": format!("store get: {e}")}).to_string(),
        };
        let affected: Vec<String> = record
            .payload_as::<GotchaRecord>()
            .map(|g| g.affected_files)
            .unwrap_or_default();
        if let Err(e) =
            crate::store::gotcha_ops::apply_gotcha_tombstone(store, key, &affected).await
        {
            return serde_json::json!({"error": format!("tombstone failed: {e}")}).to_string();
        }
        affected
    };

    // The read guard above is dropped with its block; now update the graph itself.
    {
        let mut graph = graph_arc.write().await;
        for file_path in &affected_files {
            let file_key = format!("file:{file_path}");
            if let Err(e) = graph
                .remove_edge(&file_key, &EdgeKind::HasGotcha, key)
                .await
            {
                tracing::warn!(
                    "mem_set_delete: in-memory edge cleanup failed for {file_key} → {key}: {e}"
                );
            }
        }
    }
    serde_json::json!({"ok": true, "key": key, "tombstoned": true}).to_string()
}
#[cfg(test)]
mod link_sync_tests {
    use super::*;
    use crate::store::record::{Category, FileRecord, RecordLifecycle};
    use crate::store::Store;

    /// Linking a gotcha to a file with no record stages an active `File` stub whose
    /// payload already lists the gotcha key.
    #[tokio::test]
    async fn compute_file_link_updates_creates_stub_for_unindexed_file() {
        let tmp = tempfile::TempDir::new().expect("tempdir");
        let store = Store::open(tmp.path()).await.expect("open store");

        let linked_files = ["src/new.rs".to_string()];
        let staged = compute_file_link_updates(&store, "gotcha:x", &[], &linked_files).await;

        assert_eq!(staged.len(), 1, "one file-record update staged");
        let (key, rec) = &staged[0];
        assert_eq!(key, "file:src/new.rs");
        let fr: FileRecord = rec.payload_as().expect("stub payload is a FileRecord");
        assert!(
            fr.gotcha_keys.contains(&"gotcha:x".to_string()),
            "staged stub must carry the gotcha key (got {:?})",
            fr.gotcha_keys
        );
        assert!(matches!(rec.category, Category::File));
        assert!(matches!(rec.lifecycle, RecordLifecycle::Active));

        store.close().await.expect("close");
    }

    /// Linking a gotcha to a file that already has a record stages that record with
    /// the gotcha key added.
    #[tokio::test]
    async fn compute_file_link_updates_updates_existing_record() {
        let tmp = tempfile::TempDir::new().expect("tempdir");
        let store = Store::open(tmp.path()).await.expect("open store");

        // Seed a file record with an empty FileRecord payload.
        let seed_payload = FileRecord::layer0_stub(
            "src/exists.rs",
            vec![],
            vec![],
            vec![],
            0,
            0,
            0,
            None,
            false,
            0,
            1,
        );
        let mut seed = Record::layer0_file_stub("file:src/exists.rs", uuid::Uuid::new_v4(), 1, 1);
        seed.payload = serde_json::to_value(&seed_payload).ok();
        store.put("file:src/exists.rs", &seed).await.expect("seed");

        let linked_files = ["src/exists.rs".to_string()];
        let staged = compute_file_link_updates(&store, "gotcha:y", &[], &linked_files).await;

        assert_eq!(staged.len(), 1);
        let fr: FileRecord = staged[0].1.payload_as().expect("payload");
        assert!(fr.gotcha_keys.contains(&"gotcha:y".to_string()));

        store.close().await.expect("close");
    }
}