use anyhow::Result;
use serde_json::{json, Map, Value as JsonValue};
use crate::db;
use crate::extensions::ThreadAttachmentProvider;
use crate::state::thread_transfer::{
content_digest_for, serialized_len, sha256_hex, ThreadAttachmentExportContext,
ThreadAttachmentExportResult, ThreadAttachmentImportPreview, ThreadDiagnostic,
ValidatedAttachmentMetadata,
};
/// Attachment `extension_type` identifying Fixity content (also the filter
/// used by `preview_thread_attachment_import`).
const EXTENSION_TYPE: &str = "fixity";
/// Extension name stamped on exported attachments.
const EXTENSION_NAME: &str = "ccd-fixity";
/// Attachment purpose string; imports reject any other value.
const ATTACHMENT_PURPOSE: &str = "handoff_scoped_fixity";
/// `kind` discriminator required inside the attachment's `content` object.
const CONTENT_KIND: &str = "ccd.fixity";
/// Schema version written on export and validated (exact match) on import.
const SCHEMA_VERSION: u32 = 1;
/// Hard cap on items per attachment, enforced on both export and import.
const MAX_FIXITY_ITEMS: usize = 3;
/// Maximum summary length in characters (post-redaction on export; validated
/// on import).
const MAX_SUMMARY_CHARS: usize = 280;
/// How many recent pending memory-evidence rows export scans for candidates.
const PENDING_SCAN_LIMIT: usize = 32;
/// Fields declared as omitted from the portable payload — the v1 redaction
/// contract. Imports require the attachment to declare exactly this set.
const OMITTED_FIELDS: [&str; 4] = [
    "local_evidence_id",
    "source_ref",
    "host_reference",
    "provider_reference",
];
/// Shared zero-sized provider instance for registration with the extension host.
pub(crate) static FIXITY: Fixity = Fixity;
/// Thread-attachment provider that exports pending handoff-scoped Fixity
/// signals and validates them on import.
pub(crate) struct Fixity;
impl ThreadAttachmentProvider for Fixity {
    fn name(&self) -> &'static str {
        "fixity"
    }

    fn extension_type(&self) -> &'static str {
        EXTENSION_TYPE
    }

    /// Export is delegated to the module-level implementation.
    fn export_thread_attachments(
        &self,
        context: &ThreadAttachmentExportContext<'_>,
    ) -> Result<ThreadAttachmentExportResult> {
        export_fixity(context)
    }

    /// Preview an incoming attachment: `None` when the attachment belongs to a
    /// different extension type, otherwise an accept/reject verdict derived
    /// from the v1 validation rules.
    fn preview_thread_attachment_import(
        &self,
        index: usize,
        attachment: &ValidatedAttachmentMetadata,
    ) -> Option<ThreadAttachmentImportPreview> {
        if attachment.extension_type != EXTENSION_TYPE {
            return None;
        }
        let preview = match validate_fixity_attachment(index, attachment) {
            Ok(()) => ThreadAttachmentImportPreview {
                decision: "accepted",
                reason: "accepted",
                blocks_resume: false,
                message: format!(
                    "Attachment {} contains valid handoff-scoped Fixity signals.",
                    index
                ),
            },
            Err((reason, message)) => ThreadAttachmentImportPreview {
                decision: "rejected",
                reason,
                // A rejected attachment only blocks resume when the sender
                // marked it required_for_resume.
                blocks_resume: attachment.required_for_resume,
                message,
            },
        };
        Some(preview)
    }
}
/// Export up to [`MAX_FIXITY_ITEMS`] pending handoff-scoped Fixity signals as
/// a single portable JSON attachment, plus diagnostics describing what was
/// (or was not) included.
///
/// Read-only with respect to the state database: the most recent
/// [`PENDING_SCAN_LIMIT`] pending memory-evidence rows are scanned but never
/// mutated, so normal extraction still sees every row afterwards.
fn export_fixity(
    context: &ThreadAttachmentExportContext<'_>,
) -> Result<ThreadAttachmentExportResult> {
    let pods_root = context.layout.pods_root();
    let db = db::StateDb::open(&context.layout.state_db_path(), Some(&pods_root))?;
    let records = db::memory_evidence::list_pending_recent(db.conn(), PENDING_SCAN_LIMIT)?;
    let mut items = Vec::new();
    // Count of records that passed `is_fixity_source`, including ones that
    // were later dropped for cap or budget reasons.
    let mut eligible_seen = 0_usize;
    // Set when at least one eligible item did not fit the byte budget.
    let mut budget_limited = false;
    for record in records.iter().filter(|record| is_fixity_source(record)) {
        eligible_seen += 1;
        // Item-count cap; keep iterating so `eligible_seen` stays accurate.
        if items.len() >= MAX_FIXITY_ITEMS {
            continue;
        }
        let item = record_to_fixity_item(record);
        // Trial-serialize the would-be content to enforce the per-attachment
        // byte budget before committing the item. Items already accepted are
        // never evicted; a too-large candidate is simply skipped.
        let mut candidate_items = items.clone();
        candidate_items.push(item.clone());
        let content = fixity_content(&candidate_items);
        if serialized_len(&content)? > context.per_attachment_limit_bytes {
            budget_limited = true;
            continue;
        }
        items.push(item);
    }
    let mut diagnostics = Vec::new();
    // Nothing fit (or nothing was eligible): emit a single explanatory
    // diagnostic and no attachment at all.
    if items.is_empty() {
        diagnostics.push(ThreadDiagnostic {
            severity: if budget_limited { "warning" } else { "info" },
            code: if budget_limited {
                "fixity_budget_applied"
            } else {
                "no_fixity"
            },
            message: if budget_limited {
                "Pending handoff-scoped Fixity signals were available, but no item fit the transfer attachment budget.".to_owned()
            } else {
                "No pending handoff-scoped Fixity signals were available for transfer.".to_owned()
            },
        });
        return Ok(ThreadAttachmentExportResult {
            attachments: Vec::new(),
            diagnostics,
        });
    }
    // Summarize why the export may be partial: budget takes precedence over
    // the item-count cap in the reported code.
    if budget_limited {
        diagnostics.push(ThreadDiagnostic {
            severity: "warning",
            code: "fixity_budget_applied",
            message: format!(
                "Exported {} Fixity item(s); additional eligible pending evidence did not fit the transfer attachment budget.",
                items.len()
            ),
        });
    } else if eligible_seen > items.len() {
        diagnostics.push(ThreadDiagnostic {
            severity: "warning",
            code: "fixity_cap_applied",
            message: format!(
                "Exported {} Fixity item(s); additional eligible pending evidence was left for the normal memory-evidence flow.",
                items.len()
            ),
        });
    } else {
        diagnostics.push(ThreadDiagnostic {
            severity: "info",
            code: "fixity_included",
            message: format!(
                "Exported {} pending handoff-scoped Fixity item(s).",
                items.len()
            ),
        });
    }
    diagnostics.push(ThreadDiagnostic {
        severity: "info",
        code: "fixity_export_read_only",
        message:
            "Fixity export is read-only; pending evidence remains available to normal extraction."
                .to_owned(),
    });
    // Build the final envelope; digest and size are computed over the exact
    // content value that is shipped.
    let content = fixity_content(&items);
    let content_size = serialized_len(&content)?;
    let attachment = json!({
        "id": "fixity",
        "extension_type": EXTENSION_TYPE,
        "extension_name": EXTENSION_NAME,
        "purpose": ATTACHMENT_PURPOSE,
        "schema_version": SCHEMA_VERSION,
        "portability": "portable",
        "required_for_resume": false,
        "trust": "extension_local",
        "redaction": "contains_sensitive_omissions",
        "content_encoding": "json",
        "content_digest": content_digest_for(&content)?,
        "size_bytes": content_size,
        "content": content,
    });
    Ok(ThreadAttachmentExportResult {
        attachments: vec![attachment],
        diagnostics,
    })
}
/// True when a pending memory-evidence record is eligible for Fixity export:
/// workspace/work_stream scope and an entry type with a portable mapping.
fn is_fixity_source(record: &db::memory_evidence::MemoryEvidenceRecord) -> bool {
    let scope_ok = record.scope == "workspace" || record.scope == "work_stream";
    scope_ok && fixity_type_for_entry_type(&record.entry_type).is_some()
}
/// Convert one eligible pending record into its portable, redacted JSON form.
///
/// Panics if called with a record that `is_fixity_source` would reject — the
/// export loop filters first, so that is an invariant violation.
fn record_to_fixity_item(record: &db::memory_evidence::MemoryEvidenceRecord) -> JsonValue {
    let source_type = record.entry_type.as_str();
    let fixity_type = fixity_type_for_entry_type(source_type)
        .expect("is_fixity_source filters eligible memory evidence entry types");
    // Redact and bound the summary before anything derived from it is built.
    let (summary, summary_truncated, summary_redacted) =
        portable_summary(record.summary.as_str());
    let observed_at_epoch_s = record.observed_at_epoch_s;
    // Deterministic id derived from the portable fields only.
    let candidate_id = fixity_candidate_id(
        record.scope.as_str(),
        source_type,
        record.source_kind.as_str(),
        summary.as_str(),
        observed_at_epoch_s,
    );
    json!({
        "candidate_id": candidate_id,
        "scope": record.scope.clone(),
        "type": fixity_type,
        "source_type": source_type,
        "source_kind": record.source_kind.clone(),
        "summary": summary,
        "summary_digest": sha256_hex(summary.as_str()),
        "summary_truncated": summary_truncated,
        "summary_redacted": summary_redacted,
        "observed_at_epoch_s": observed_at_epoch_s,
    })
}
/// Produce the portable form of a summary: `(text, was_truncated, was_redacted)`.
///
/// Redaction runs before truncation, so the character bound applies to the
/// marker-substituted text.
fn portable_summary(raw: &str) -> (String, bool, bool) {
    let (redacted_text, was_redacted) = redact_sensitive_fragments(raw);
    let (bounded_text, was_truncated) = bounded_summary(&redacted_text);
    (bounded_text, was_truncated, was_redacted)
}
fn bounded_summary(summary: &str) -> (String, bool) {
let char_count = summary.chars().count();
if char_count <= MAX_SUMMARY_CHARS {
return (summary.to_owned(), false);
}
(
summary.chars().take(MAX_SUMMARY_CHARS).collect::<String>(),
true,
)
}
/// Rewrite a summary with sensitive-looking whitespace-delimited tokens
/// replaced by redaction markers. Whitespace is preserved exactly. Returns the
/// rewritten text and whether anything was replaced.
fn redact_sensitive_fragments(summary: &str) -> (String, bool) {
    let mut output = String::with_capacity(summary.len());
    let mut pending = String::new();
    let mut any_redacted = false;
    for ch in summary.chars() {
        if !ch.is_whitespace() {
            pending.push(ch);
            continue;
        }
        // Whitespace terminates the current token: flush it (possibly
        // redacted), then copy the separator through unchanged.
        any_redacted |= push_redacted_token(&mut output, pending.as_str());
        pending.clear();
        output.push(ch);
    }
    // Flush the trailing token, if any.
    any_redacted |= push_redacted_token(&mut output, pending.as_str());
    (output, any_redacted)
}
/// Append `token` to `output`, substituting a redaction marker for the token's
/// "core" (the token with wrapper punctuation stripped from both ends) when
/// the core looks sensitive. Wrapper punctuation is preserved around the
/// marker. Returns `true` iff a substitution was made.
fn push_redacted_token(output: &mut String, token: &str) -> bool {
    if token.is_empty() {
        return false;
    }
    // Byte index of the first non-wrapper character; `token.len()` when the
    // token is punctuation only.
    let start = token
        .char_indices()
        .find(|(_, ch)| !is_wrapper_punctuation(*ch))
        .map(|(index, _)| index)
        .unwrap_or(token.len());
    // Byte index one past the last non-wrapper character. Falls back to
    // `start` (empty core) when no such character exists — in that case
    // `start` is already `token.len()`, so the slice below stays valid.
    let end = token
        .char_indices()
        .rev()
        .find(|(_, ch)| !is_wrapper_punctuation(*ch))
        .map(|(index, ch)| index + ch.len_utf8())
        .unwrap_or(start);
    let core = &token[start..end];
    let replacement = redaction_marker(core);
    // Re-emit: leading punctuation, core (or its marker), trailing punctuation.
    output.push_str(&token[..start]);
    if let Some(marker) = replacement {
        output.push_str(marker);
    } else {
        output.push_str(core);
    }
    output.push_str(&token[end..]);
    replacement.is_some()
}
/// True for punctuation that commonly wraps a token (quotes, brackets,
/// trailing sentence punctuation) and should not defeat redaction matching.
fn is_wrapper_punctuation(ch: char) -> bool {
    const WRAPPERS: &str = "\"'`()[]{}<>,;:.";
    WRAPPERS.contains(ch)
}
fn redaction_marker(token: &str) -> Option<&'static str> {
if looks_like_absolute_path(token) {
return Some("[redacted-path]");
}
if looks_like_email(token) {
return Some("[redacted-email]");
}
if looks_like_sensitive_token(token) {
return Some("[redacted-token]");
}
None
}
/// Heuristic: does the token look like a home-relative, root-anchored, or
/// Windows-drive absolute path? A lone `/` does not count.
fn looks_like_absolute_path(token: &str) -> bool {
    // Unix-style: `~/...` or `/...` with something after the slash.
    if token.starts_with("~/") || (token.starts_with('/') && token.len() > 1) {
        return true;
    }
    // Windows-style drive prefix such as `C:\` or `c:/`.
    let head: Vec<char> = token.chars().take(3).collect();
    matches!(
        head.as_slice(),
        [drive, ':', '\\' | '/'] if drive.is_ascii_alphabetic()
    )
}
/// Heuristic: non-empty local part, an `@`, and a dotted domain made only of
/// ASCII alphanumerics, `-`, and `.`.
fn looks_like_email(token: &str) -> bool {
    match token.split_once('@') {
        Some((local, domain)) if !local.is_empty() => {
            let domain_charset_ok = domain
                .chars()
                .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '.');
            domain_charset_ok && domain.contains('.')
        }
        _ => false,
    }
}
/// Heuristic for secret-looking tokens: `sk-` prefixed keys, long hex digests,
/// or long mixed alphanumeric strings (base64url-ish, allowing `-_+=`).
fn looks_like_sensitive_token(token: &str) -> bool {
    // API-key style prefix with enough length behind it.
    if token.starts_with("sk-") && token.len() >= 20 {
        return true;
    }
    // Long pure-hex string (e.g. a digest).
    if token.len() >= 32 && token.chars().all(|ch| ch.is_ascii_hexdigit()) {
        return true;
    }
    // Long opaque token: restricted charset, and both letters and digits.
    if token.len() < 40 {
        return false;
    }
    let mut saw_alpha = false;
    let mut saw_digit = false;
    for ch in token.chars() {
        if ch.is_ascii_alphabetic() {
            saw_alpha = true;
        } else if ch.is_ascii_digit() {
            saw_digit = true;
        } else if !matches!(ch, '-' | '_' | '+' | '=') {
            return false;
        }
    }
    saw_alpha && saw_digit
}
fn fixity_content(items: &[JsonValue]) -> JsonValue {
json!({
"kind": CONTENT_KIND,
"schema_version": SCHEMA_VERSION,
"source": "memory_evidence",
"fixity_count": items.len(),
"max_fixity_items": MAX_FIXITY_ITEMS,
"omitted_fields": OMITTED_FIELDS,
"items": items,
})
}
/// Validate an incoming Fixity attachment against the v1 contract.
///
/// Returns `Err((reason_code, message))` on the first violation; the reason
/// code feeds the import preview's `reason` field. Checks run in order:
/// envelope metadata, content envelope, then per-item validation.
fn validate_fixity_attachment(
    index: usize,
    attachment: &ValidatedAttachmentMetadata,
) -> Result<(), (&'static str, String)> {
    // --- Envelope metadata must match exactly what this module exports. ---
    if attachment.purpose != ATTACHMENT_PURPOSE {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} uses unsupported Fixity purpose `{}`.",
                index, attachment.purpose
            ),
        ));
    }
    if attachment.schema_version != SCHEMA_VERSION {
        return Err((
            "incompatible_schema_version",
            format!(
                "Attachment {} uses Fixity schema_version {}; expected {}.",
                index, attachment.schema_version, SCHEMA_VERSION
            ),
        ));
    }
    if attachment.portability != "portable" {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} uses unsupported Fixity portability `{}`.",
                index, attachment.portability
            ),
        ));
    }
    if attachment.trust != "extension_local" {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} uses Fixity trust `{}`; expected `extension_local`.",
                index, attachment.trust
            ),
        ));
    }
    if attachment.redaction != "contains_sensitive_omissions" {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} uses Fixity redaction `{}`; expected `contains_sensitive_omissions`.",
                index, attachment.redaction
            ),
        ));
    }
    // Fixity is optional continuity; a sender must not make it blocking.
    if attachment.required_for_resume {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} marks Fixity as required_for_resume, but Fixity is optional continuity.",
                index
            ),
        ));
    }
    // --- Content envelope: closed schema, exact discriminators. ---
    let content = json_object(&attachment.content, index, "content")?;
    reject_unknown_keys(
        content,
        &[
            "fixity_count",
            "items",
            "kind",
            "max_fixity_items",
            "omitted_fields",
            "schema_version",
            "source",
        ],
        index,
        "content",
    )?;
    if string_field(content, "kind", index, "content")? != CONTENT_KIND {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} content kind must be `{}`.",
                index, CONTENT_KIND
            ),
        ));
    }
    if string_field(content, "source", index, "content")? != "memory_evidence" {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} content source must be `memory_evidence`.",
                index
            ),
        ));
    }
    // Content-level schema_version must match too (independently of the
    // envelope's schema_version checked above).
    let content_schema_version = u64_field(content, "schema_version", index, "content")?;
    if content_schema_version != u64::from(SCHEMA_VERSION) {
        return Err((
            "incompatible_schema_version",
            format!(
                "Attachment {} content schema_version {}; expected {}.",
                index, content_schema_version, SCHEMA_VERSION
            ),
        ));
    }
    // The declared cap must equal ours; a different cap implies a different
    // contract even if the item count happens to fit.
    let max_fixity_items = u64_field(content, "max_fixity_items", index, "content")?;
    if max_fixity_items != MAX_FIXITY_ITEMS as u64 {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} content max_fixity_items {}; expected {}.",
                index, max_fixity_items, MAX_FIXITY_ITEMS
            ),
        ));
    }
    validate_omitted_fields(content, index)?;
    // --- Items: array, within cap, count matches declaration. ---
    let items = content
        .get("items")
        .and_then(JsonValue::as_array)
        .ok_or_else(|| {
            (
                "extension_rejected_import",
                format!("Attachment {} content must contain an items array.", index),
            )
        })?;
    if items.len() > MAX_FIXITY_ITEMS {
        return Err((
            "excluded_size_limit",
            format!(
                "Attachment {} contains {} Fixity item(s), above the cap of {}.",
                index,
                items.len(),
                MAX_FIXITY_ITEMS
            ),
        ));
    }
    let fixity_count = u64_field(content, "fixity_count", index, "content")?;
    if fixity_count != items.len() as u64 {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} fixity_count does not match the items array.",
                index
            ),
        ));
    }
    for (item_index, item) in items.iter().enumerate() {
        validate_fixity_item(index, item_index, item)?;
    }
    Ok(())
}
/// Validate a single Fixity item inside an attachment's `items` array.
///
/// Enforces a closed field schema, enumerated scope/type/source values, the
/// type↔source_type mapping, the summary length bound, the summary digest,
/// and that `candidate_id` is reproducible from the portable fields alone.
fn validate_fixity_item(
    attachment_index: usize,
    item_index: usize,
    item: &JsonValue,
) -> Result<(), (&'static str, String)> {
    let context = format!("Fixity item {}", item_index);
    let object = json_object(item, attachment_index, context.as_str())?;
    // Closed schema: any extra key rejects the whole import.
    reject_unknown_keys(
        object,
        &[
            "candidate_id",
            "observed_at_epoch_s",
            "scope",
            "source_kind",
            "source_type",
            "summary",
            "summary_digest",
            "summary_redacted",
            "summary_truncated",
            "type",
        ],
        attachment_index,
        context.as_str(),
    )?;
    let candidate_id = string_field(object, "candidate_id", attachment_index, context.as_str())?;
    // Scope must be one of the values `is_fixity_source` exports.
    let scope = string_field(object, "scope", attachment_index, context.as_str())?;
    if !matches!(scope, "workspace" | "work_stream") {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} uses unsupported scope `{}`.",
                attachment_index, item_index, scope
            ),
        ));
    }
    let fixity_type = string_field(object, "type", attachment_index, context.as_str())?;
    if !matches!(
        fixity_type,
        "dead_end" | "worked_path" | "constraint" | "observation"
    ) {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} uses unsupported type `{}`.",
                attachment_index, item_index, fixity_type
            ),
        ));
    }
    // source_type must be a known entry type, and its mapped Fixity type must
    // agree with the declared `type`.
    let source_type = string_field(object, "source_type", attachment_index, context.as_str())?;
    let Some(expected_fixity_type) = fixity_type_for_entry_type(source_type) else {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} uses unsupported source_type `{}`.",
                attachment_index, item_index, source_type
            ),
        ));
    };
    if expected_fixity_type != fixity_type {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} type `{}` does not match source_type `{}`.",
                attachment_index, item_index, fixity_type, source_type
            ),
        ));
    }
    let source_kind = string_field(object, "source_kind", attachment_index, context.as_str())?;
    if !matches!(
        source_kind,
        "transcript" | "session" | "event_stream" | "hook_output" | "log" | "document"
    ) {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} uses unsupported source_kind `{}`.",
                attachment_index, item_index, source_kind
            ),
        ));
    }
    // The summary is read untrimmed (`raw_string_field`) because the digest
    // and candidate_id below are computed over the exact exported bytes.
    let summary = raw_string_field(object, "summary", attachment_index, context.as_str())?;
    if summary.chars().count() > MAX_SUMMARY_CHARS {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} summary exceeds {} characters.",
                attachment_index, item_index, MAX_SUMMARY_CHARS
            ),
        ));
    }
    // Digest check ties the summary text to the declared digest.
    let summary_digest =
        string_field(object, "summary_digest", attachment_index, context.as_str())?;
    let actual_summary_digest = sha256_hex(summary);
    if summary_digest != actual_summary_digest {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} summary_digest does not match summary.",
                attachment_index, item_index
            ),
        ));
    }
    let observed_at_epoch_s = u64_field(
        object,
        "observed_at_epoch_s",
        attachment_index,
        context.as_str(),
    )?;
    // The boolean flags only need to exist with the right type; their values
    // are informational.
    bool_field(
        object,
        "summary_truncated",
        attachment_index,
        context.as_str(),
    )?;
    bool_field(
        object,
        "summary_redacted",
        attachment_index,
        context.as_str(),
    )?;
    // Recompute the deterministic id from the portable fields; a mismatch
    // means the item was tampered with or produced by a different scheme.
    let expected_candidate_id = fixity_candidate_id(
        scope,
        source_type,
        source_kind,
        summary,
        observed_at_epoch_s,
    );
    if candidate_id != expected_candidate_id {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} Fixity item {} candidate_id does not match portable content.",
                attachment_index, item_index
            ),
        ));
    }
    Ok(())
}
/// Require `content.omitted_fields` to declare exactly the [`OMITTED_FIELDS`]
/// set. Order is irrelevant, but duplicates, extras, and missing entries all
/// reject (the comparison is over sorted lists, not sets).
fn validate_omitted_fields(
    content: &Map<String, JsonValue>,
    attachment_index: usize,
) -> Result<(), (&'static str, String)> {
    let Some(declared) = content.get("omitted_fields").and_then(JsonValue::as_array) else {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} content must contain an omitted_fields array.",
                attachment_index
            ),
        ));
    };
    // Every entry must be a string.
    let mut actual = Vec::with_capacity(declared.len());
    for entry in declared {
        match entry.as_str() {
            Some(name) => actual.push(name),
            None => {
                return Err((
                    "extension_rejected_import",
                    format!(
                        "Attachment {} omitted_fields must contain only strings.",
                        attachment_index
                    ),
                ));
            }
        }
    }
    actual.sort_unstable();
    let mut expected = OMITTED_FIELDS.to_vec();
    expected.sort_unstable();
    if actual == expected {
        Ok(())
    } else {
        Err((
            "extension_rejected_import",
            format!(
                "Attachment {} omitted_fields does not match the v1 redaction contract.",
                attachment_index
            ),
        ))
    }
}
/// Require `value` to be a JSON object; otherwise produce the standard
/// rejection tuple for the given attachment/context.
fn json_object<'a>(
    value: &'a JsonValue,
    attachment_index: usize,
    context: &str,
) -> Result<&'a Map<String, JsonValue>, (&'static str, String)> {
    match value.as_object() {
        Some(object) => Ok(object),
        None => Err((
            "extension_rejected_import",
            format!(
                "Attachment {} {} must be a JSON object.",
                attachment_index, context
            ),
        )),
    }
}
/// Reject `object` when it contains any key outside `allowed`; the error
/// message lists the offending keys in the object's iteration order.
///
/// Fix: the original cloned every unknown key into a `Vec<String>` just to
/// join them — borrowing `&str` from the map avoids those allocations
/// (`slice::join` works on `&[&str]` too). Behavior is unchanged.
fn reject_unknown_keys(
    object: &Map<String, JsonValue>,
    allowed: &[&str],
    attachment_index: usize,
    context: &str,
) -> Result<(), (&'static str, String)> {
    let unknown: Vec<&str> = object
        .keys()
        .map(String::as_str)
        .filter(|key| !allowed.contains(key))
        .collect();
    if unknown.is_empty() {
        return Ok(());
    }
    Err((
        "extension_rejected_import",
        format!(
            "Attachment {} {} contains unsupported field(s): {}.",
            attachment_index,
            context,
            unknown.join(", ")
        ),
    ))
}
/// Fetch `field` as a trimmed, non-empty string. Absent fields, non-string
/// values, and whitespace-only strings are all reported as "missing".
fn string_field<'a>(
    object: &'a Map<String, JsonValue>,
    field: &str,
    attachment_index: usize,
    context: &str,
) -> Result<&'a str, (&'static str, String)> {
    let trimmed = object.get(field).and_then(JsonValue::as_str).map(str::trim);
    match trimmed {
        Some(value) if !value.is_empty() => Ok(value),
        _ => Err((
            "extension_rejected_import",
            format!(
                "Attachment {} {} is missing `{}`.",
                attachment_index, context, field
            ),
        )),
    }
}
/// Fetch `field` as a string *without* trimming (needed where digests are
/// computed over the exact bytes). Still rejects absent, non-string, or
/// whitespace-only values as "missing".
fn raw_string_field<'a>(
    object: &'a Map<String, JsonValue>,
    field: &str,
    attachment_index: usize,
    context: &str,
) -> Result<&'a str, (&'static str, String)> {
    match object.get(field).and_then(JsonValue::as_str) {
        Some(raw) if !raw.trim().is_empty() => Ok(raw),
        _ => Err((
            "extension_rejected_import",
            format!(
                "Attachment {} {} is missing `{}`.",
                attachment_index, context, field
            ),
        )),
    }
}
/// Fetch `field` as a u64. Absent fields and values that are not non-negative
/// integers fitting u64 are reported as "missing".
fn u64_field(
    object: &Map<String, JsonValue>,
    field: &str,
    attachment_index: usize,
    context: &str,
) -> Result<u64, (&'static str, String)> {
    let Some(value) = object.get(field).and_then(JsonValue::as_u64) else {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} {} is missing `{}`.",
                attachment_index, context, field
            ),
        ));
    };
    Ok(value)
}
/// Fetch `field` as a bool. Absent or non-boolean values are reported as
/// "missing".
fn bool_field(
    object: &Map<String, JsonValue>,
    field: &str,
    attachment_index: usize,
    context: &str,
) -> Result<bool, (&'static str, String)> {
    let Some(value) = object.get(field).and_then(JsonValue::as_bool) else {
        return Err((
            "extension_rejected_import",
            format!(
                "Attachment {} {} is missing `{}`.",
                attachment_index, context, field
            ),
        ));
    };
    Ok(value)
}
/// Map a memory-evidence entry type to its portable Fixity type, or `None`
/// for entry types that are not exported.
fn fixity_type_for_entry_type(entry_type: &str) -> Option<&'static str> {
    const MAPPING: [(&str, &str); 4] = [
        ("attempt", "dead_end"),
        ("heuristic", "worked_path"),
        ("constraint", "constraint"),
        ("observation", "observation"),
    ];
    MAPPING
        .iter()
        .find(|(entry, _)| *entry == entry_type)
        .map(|(_, fixity)| *fixity)
}
/// Deterministic id derived solely from the portable item fields, so both
/// sides of a transfer can recompute and compare it.
fn fixity_candidate_id(
    scope: &str,
    source_type: &str,
    source_kind: &str,
    summary: &str,
    observed_at_epoch_s: u64,
) -> String {
    // NUL-separated seed keeps field boundaries unambiguous for normal text.
    let mut seed = String::from("fixity.v1");
    for part in [scope, source_type, source_kind, summary] {
        seed.push('\0');
        seed.push_str(part);
    }
    seed.push('\0');
    seed.push_str(&observed_at_epoch_s.to_string());
    // Short id: first 20 hex chars of the digest, with a stable prefix.
    format!("fixity_{}", &sha256_hex(seed.as_str())[..20])
}