use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::process::ExitCode;
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sha2::{Digest, Sha256};
use crate::extensions;
use crate::output::CommandReport;
use crate::paths::state::StateLayout;
use crate::profile;
use crate::repo::marker as repo_marker;
use crate::state::compiled as compiled_state;
use crate::state::escalation as escalation_state;
use crate::state::runtime as runtime_state;
use crate::state::session as session_state;
use crate::state::session_gates;
/// Discriminator written to, and required in, the capsule's `kind` field.
const CAPSULE_KIND: &str = "ccd.thread_transfer";
/// v1 capsule layout; `validate_capsule_header` rejects any other version.
const CAPSULE_SCHEMA_VERSION: u32 = 1;
/// Stability tag surfaced in reports so consumers know the format may change.
const CAPSULE_STABILITY: &str = "experimental";
/// Schema version of the embedded kernel-continuity section.
const KERNEL_CONTINUITY_SCHEMA_VERSION: u32 = 1;
/// Budget: ceiling for a single attachment's canonical-JSON content (16 KiB).
const PER_ATTACHMENT_LIMIT_BYTES: u64 = 16 * 1024;
/// Budget: ceiling for the summed content of all attachments (64 KiB).
const TOTAL_ATTACHMENT_LIMIT_BYTES: u64 = 64 * 1024;
/// Budget: maximum number of extension continuity attachments per capsule.
const MAX_ATTACHMENTS: usize = 8;
/// Top-level JSON report emitted by `ccd thread export`.
#[derive(Serialize)]
pub struct ThreadExportReport {
    // Fixed command tag ("thread-export").
    command: &'static str,
    ok: bool,
    // Workspace (repo root) path the capsule was exported from.
    path: String,
    profile: String,
    project_id: String,
    locality_id: String,
    export: ThreadExportView,
}

/// `export` section of the report: the capsule itself plus budget accounting
/// and any diagnostics produced while selecting attachments.
#[derive(Serialize)]
struct ThreadExportView {
    kind: &'static str,
    stability: &'static str,
    load_mode: &'static str,
    // True: the capsule is derived from state, not authoritative state itself.
    derived: bool,
    portable: bool,
    // Human-readable ownership notice, also printed by `render_text`.
    notice: &'static str,
    capsule: ThreadTransferCapsule,
    budget: AttachmentBudgetView,
    diagnostics: Vec<ThreadDiagnostic>,
}
/// Top-level JSON report emitted by `ccd thread import --preview`.
#[derive(Serialize)]
pub struct ThreadImportReport {
    // Fixed command tag ("thread-import-preview").
    command: &'static str,
    // False when the preview determined the import would be blocked.
    ok: bool,
    path: String,
    profile: String,
    project_id: String,
    locality_id: String,
    import: ThreadImportView,
}

/// `import` section of the report: capsule summary, budget accounting, and
/// the per-attachment decisions the preview produced. Preview never mutates
/// state, hence the fixed `dry_run` / `write_applied` / `mutation` fields.
#[derive(Serialize)]
struct ThreadImportView {
    // Absolute-ized path of the capsule file that was read.
    source: String,
    mode: &'static str,
    stability: &'static str,
    dry_run: bool,
    write_applied: bool,
    mutation: &'static str,
    // "complete", "degraded", or "blocked" (see `import_status`).
    status: &'static str,
    capsule: CapsuleSummaryView,
    budget: AttachmentBudgetView,
    attachments: Vec<AttachmentImportDecision>,
    diagnostics: Vec<ThreadDiagnostic>,
}
/// The portable capsule itself: envelope, kernel continuity state, and the
/// extension attachments that survived the export budget.
#[derive(Serialize)]
struct ThreadTransferCapsule {
    kind: &'static str,
    schema_version: u32,
    stability: &'static str,
    thread: ThreadEnvelopeView,
    kernel_continuity: KernelContinuityView,
    // Opaque extension payloads; CCD owns only the envelope, not their semantics.
    attachments: Vec<JsonValue>,
}

/// Identity envelope describing where and when the capsule was produced.
#[derive(Serialize)]
struct ThreadEnvelopeView {
    // Deterministic id derived from profile + locality (see `stable_thread_id`).
    thread_id: String,
    source_project_id: String,
    source_locality_id: String,
    source_profile: String,
    source_workspace: String,
    created_at_epoch_s: u64,
    transfer_scope: &'static str,
}

/// Kernel-owned continuity state bundled into the capsule.
#[derive(Serialize)]
struct KernelContinuityView {
    schema_version: u32,
    handoff: runtime_state::RuntimeHandoffState,
    recovery: runtime_state::RuntimeRecoveryView,
    execution_gates: session_gates::ExecutionGatesView,
    escalation: escalation_state::EscalationView,
}

/// Attachment budget limits plus the actual usage of a given attachment set.
#[derive(Serialize)]
struct AttachmentBudgetView {
    per_attachment_limit_bytes: u64,
    total_limit_bytes: u64,
    max_attachments: usize,
    attachment_count: usize,
    // Sum of each attachment's serialized `content` size (see `attachment_budget`).
    total_attachment_bytes: u64,
}
/// A single machine-readable diagnostic ("info" / "warning" / "error")
/// attached to export or import reports.
#[derive(Serialize)]
pub(crate) struct ThreadDiagnostic {
    pub(crate) severity: &'static str,
    pub(crate) code: &'static str,
    pub(crate) message: String,
}

/// Lightweight summary of an imported capsule's envelope; fields that are
/// absent from the input are omitted from serialization entirely.
#[derive(Serialize)]
struct CapsuleSummaryView {
    kind: String,
    schema_version: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    thread_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    source_profile: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    source_project_id: Option<String>,
}

/// Per-attachment verdict from import preview. Metadata fields are optional
/// because a decision can be produced even for attachments whose metadata
/// failed to deserialize at all.
#[derive(Serialize)]
struct AttachmentImportDecision {
    // Position of the attachment within the capsule.
    index: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    extension_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    extension_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    purpose: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    schema_version: Option<u32>,
    portability: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    trust: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    redaction: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    content_encoding: Option<String>,
    required_for_resume: bool,
    // Measured canonical-JSON size of the content actually present.
    size_bytes: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    declared_size_bytes: Option<u64>,
    // "accepted" / "skipped" / "rejected".
    decision: &'static str,
    reason: &'static str,
    // True when this decision alone should block resume (drives "blocked" status).
    blocks_resume: bool,
    message: String,
}
/// Deserialization shape for an incoming capsule. Only the header fields are
/// strictly required; `thread` and `attachments` default so a malformed or
/// partial capsule can still be summarized.
#[derive(Deserialize)]
struct ThreadTransferCapsuleInput {
    kind: String,
    schema_version: u32,
    #[serde(default)]
    thread: JsonValue,
    #[serde(default)]
    attachments: Vec<JsonValue>,
}

/// Permissive deserialization shape for one attachment's metadata: every
/// field is optional here, and `validate_attachment_metadata` decides what is
/// actually required.
#[derive(Clone, Default, Deserialize)]
struct AttachmentMetadataInput {
    id: Option<String>,
    extension_type: Option<String>,
    extension_name: Option<String>,
    purpose: Option<String>,
    schema_version: Option<u32>,
    portability: Option<String>,
    trust: Option<String>,
    redaction: Option<String>,
    content_encoding: Option<String>,
    content_digest: Option<String>,
    required_for_resume: Option<bool>,
    size_bytes: Option<u64>,
    content: Option<JsonValue>,
}
/// Context handed to extensions when they propose export attachment
/// candidates; includes the per-attachment size limit so extensions can
/// self-filter before the central budget pass.
pub(crate) struct ThreadAttachmentExportContext<'a> {
    pub(crate) layout: &'a StateLayout,
    pub(crate) per_attachment_limit_bytes: u64,
}

/// An extension exporter's output: candidate attachments plus any
/// diagnostics it wants surfaced in the export report.
// NOTE(review): not referenced in this file; presumably constructed by the
// `extensions` module — confirm before removing.
pub(crate) struct ThreadAttachmentExportResult {
    pub(crate) attachments: Vec<JsonValue>,
    pub(crate) diagnostics: Vec<ThreadDiagnostic>,
}

/// An extension importer's verdict for one attachment during import preview.
pub(crate) struct ThreadAttachmentImportPreview {
    pub(crate) decision: &'static str,
    pub(crate) reason: &'static str,
    pub(crate) blocks_resume: bool,
    pub(crate) message: String,
}

/// Attachment metadata after `validate_attachment_metadata` has succeeded:
/// required fields are unwrapped and size/digest checks have already passed.
pub(crate) struct ValidatedAttachmentMetadata {
    pub(crate) id: String,
    pub(crate) extension_type: String,
    pub(crate) extension_name: Option<String>,
    pub(crate) purpose: String,
    pub(crate) schema_version: u32,
    pub(crate) portability: String,
    pub(crate) trust: String,
    pub(crate) redaction: String,
    pub(crate) content_encoding: String,
    pub(crate) required_for_resume: bool,
    // Size the capsule claims; equals `content_size_bytes` after validation.
    pub(crate) declared_size_bytes: u64,
    // Size measured from the canonical JSON serialization of `content`.
    pub(crate) content_size_bytes: u64,
    pub(crate) content: JsonValue,
}
impl CommandReport for ThreadExportReport {
    /// Export always succeeds once a report exists; budget exclusions are
    /// surfaced as diagnostics, not failures.
    fn exit_code(&self) -> ExitCode {
        ExitCode::SUCCESS
    }

    /// Human-readable summary mirroring the JSON report.
    fn render_text(&self) {
        println!(
            "Exported thread transfer capsule for profile `{}` in linked project `{}`.",
            self.profile, self.project_id
        );
        println!(
            "Capsule `{}` contains {} extension continuity attachments ({} bytes).",
            self.export.capsule.thread.thread_id,
            self.export.budget.attachment_count,
            self.export.budget.total_attachment_bytes
        );
        println!("{}", self.export.notice);
    }
}
impl CommandReport for ThreadImportReport {
    /// Exit 1 when the preview determined the import would be blocked
    /// (`ok == false`); success otherwise.
    fn exit_code(&self) -> ExitCode {
        if self.ok {
            ExitCode::SUCCESS
        } else {
            ExitCode::from(1)
        }
    }

    /// Human-readable summary: status line plus one bullet per diagnostic.
    fn render_text(&self) {
        println!(
            "Previewed thread transfer capsule for profile `{}` in linked project `{}`.",
            self.profile, self.project_id
        );
        println!(
            "Import status: {}; {} attachment decision(s); no state was mutated.",
            self.import.status,
            self.import.attachments.len()
        );
        for diagnostic in &self.import.diagnostics {
            println!("- {}: {}", diagnostic.code, diagnostic.message);
        }
    }
}
/// Builds the portable thread-transfer capsule for the linked repo at
/// `repo_root` and wraps it in a `thread-export` report.
///
/// Gathers kernel continuity state (handoff, recovery, execution gates,
/// escalation), collects extension attachment candidates, enforces the
/// attachment budget, and assembles the capsule. Fails if the profile has not
/// been bootstrapped or the repo is not linked. Read-only: no state mutation.
pub fn export(repo_root: &Path, explicit_profile: Option<&str>) -> Result<ThreadExportReport> {
    let profile = profile::resolve(explicit_profile)?;
    let layout = StateLayout::resolve(repo_root, profile.clone())?;
    ensure_profile_exists(&layout, repo_root, "export")?;
    let locality_id = ensure_repo_linked(repo_root)?;
    // Kernel continuity inputs for the capsule body.
    let runtime = runtime_state::load_runtime_state(repo_root, &layout, &locality_id)?;
    let escalation_entries = escalation_state::load_for_layout(&layout)?;
    let escalation = escalation_state::build_view(&layout, &escalation_entries);
    let created_at_epoch_s = session_state::now_epoch_s()?;
    // Deterministic: same profile + locality always yields the same thread id.
    let thread_id = stable_thread_id(profile.as_str(), &locality_id);
    // Extensions propose candidates; the per-attachment limit is passed so
    // they can self-filter before the central budget pass below.
    let (attachment_candidates, mut diagnostics) =
        extensions::thread_transfer_attachment_candidates(&ThreadAttachmentExportContext {
            layout: &layout,
            per_attachment_limit_bytes: PER_ATTACHMENT_LIMIT_BYTES,
        });
    let (attachments, budget_diagnostics) =
        enforce_export_attachment_budget(attachment_candidates)?;
    diagnostics.extend(budget_diagnostics);
    // Budget view reflects only the attachments that survived the budget pass.
    let budget = attachment_budget(&attachments)?;
    Ok(ThreadExportReport {
        command: "thread-export",
        ok: true,
        path: repo_root.display().to_string(),
        profile: profile.to_string(),
        // project_id and locality_id currently carry the same identifier.
        project_id: locality_id.clone(),
        locality_id: locality_id.clone(),
        export: ThreadExportView {
            kind: "thread_transfer_capsule",
            stability: CAPSULE_STABILITY,
            load_mode: "native_canonical",
            derived: true,
            portable: true,
            notice: "Portable resume context only. CCD owns the transfer envelope; extensions own payload semantics.",
            capsule: ThreadTransferCapsule {
                kind: CAPSULE_KIND,
                schema_version: CAPSULE_SCHEMA_VERSION,
                stability: CAPSULE_STABILITY,
                thread: ThreadEnvelopeView {
                    thread_id,
                    source_project_id: locality_id.clone(),
                    source_locality_id: locality_id,
                    source_profile: profile.to_string(),
                    source_workspace: repo_root.display().to_string(),
                    created_at_epoch_s,
                    transfer_scope: "portable_resume_context",
                },
                kernel_continuity: KernelContinuityView {
                    schema_version: KERNEL_CONTINUITY_SCHEMA_VERSION,
                    handoff: runtime.state.handoff,
                    recovery: runtime_state::recovery_view(&runtime.recovery),
                    execution_gates: runtime.execution_gates.view,
                    escalation,
                },
                attachments,
            },
            budget,
            diagnostics,
        },
    })
}
/// Previews a thread-transfer capsule import without mutating any state.
///
/// Flag gating is strict: `--preview` and `--write` are mutually exclusive,
/// `--write` is not implemented yet, and `--preview` is mandatory. The
/// capsule at `from` is read and header-validated, every attachment is
/// evaluated, and the resulting decisions are rolled up into a status
/// ("complete" / "degraded" / "blocked"); `ok` is false only when blocked.
pub fn import_preview(
    repo_root: &Path,
    explicit_profile: Option<&str>,
    from: &Path,
    preview_requested: bool,
    write_requested: bool,
) -> Result<ThreadImportReport> {
    if preview_requested && write_requested {
        bail!("`ccd thread import` accepts either `--preview` or `--write`, not both");
    }
    if write_requested {
        bail!("`ccd thread import --write` is not implemented yet; run `ccd thread import --preview` to inspect the capsule without mutating state");
    }
    if !preview_requested {
        bail!("`ccd thread import` is preview-first; pass `--preview` to inspect the capsule without mutating state");
    }
    let profile = profile::resolve(explicit_profile)?;
    let layout = StateLayout::resolve(repo_root, profile.clone())?;
    ensure_profile_exists(&layout, repo_root, "import")?;
    let locality_id = ensure_repo_linked(repo_root)?;
    let source_path = display_source_path(from);
    let capsule = read_capsule(from)?;
    // Header problems (wrong kind / schema_version) are hard errors, not
    // per-attachment decisions.
    validate_capsule_header(&capsule)?;
    let attachment_decisions = evaluate_attachments(&capsule.attachments)?;
    // Budget is computed over ALL capsule attachments, including rejected ones.
    let budget = attachment_budget(&capsule.attachments)?;
    let diagnostics = import_diagnostics(&attachment_decisions, &budget);
    let status = import_status(&attachment_decisions);
    let ok = status != "blocked";
    Ok(ThreadImportReport {
        command: "thread-import-preview",
        ok,
        path: repo_root.display().to_string(),
        profile: profile.to_string(),
        project_id: locality_id.clone(),
        locality_id,
        import: ThreadImportView {
            source: source_path,
            mode: "preview",
            stability: CAPSULE_STABILITY,
            dry_run: true,
            write_applied: false,
            mutation: "none",
            status,
            capsule: capsule_summary(&capsule),
            budget,
            attachments: attachment_decisions,
            diagnostics,
        },
    })
}
/// Reads a thread-transfer capsule from `from`.
///
/// Accepts either a bare capsule object (detected by a top-level `kind`
/// field) or a full `thread export` report, in which case the capsule is
/// extracted from `export.capsule`. Anything else is rejected with a hint
/// about the two accepted shapes.
fn read_capsule(from: &Path) -> Result<ThreadTransferCapsuleInput> {
    let raw = std::fs::read_to_string(from).with_context(|| {
        format!(
            "failed to read thread transfer capsule from {}",
            from.display()
        )
    })?;
    let parsed: JsonValue =
        serde_json::from_str(&raw).context("failed to parse thread transfer JSON")?;
    let capsule_value = match parsed.get("kind") {
        // A top-level `kind` means the document is the capsule itself.
        Some(_) => parsed,
        None => match parsed.pointer("/export/capsule") {
            Some(capsule) => capsule.clone(),
            None => bail!(
                "thread transfer input must be a `{}` capsule or a `thread export` report containing `export.capsule`",
                CAPSULE_KIND
            ),
        },
    };
    serde_json::from_value(capsule_value).context("failed to parse thread transfer capsule")
}
/// Validates the capsule header: `kind` must match `CAPSULE_KIND` and
/// `schema_version` must match `CAPSULE_SCHEMA_VERSION`. Either mismatch is
/// a hard error — header problems are never downgraded to diagnostics.
fn validate_capsule_header(capsule: &ThreadTransferCapsuleInput) -> Result<()> {
    let kind_matches = capsule.kind == CAPSULE_KIND;
    if !kind_matches {
        bail!(
            "unsupported thread transfer kind `{}`; expected `{}`",
            capsule.kind,
            CAPSULE_KIND
        );
    }
    if capsule.schema_version == CAPSULE_SCHEMA_VERSION {
        Ok(())
    } else {
        bail!(
            "unsupported thread transfer schema_version {}; expected {}",
            capsule.schema_version,
            CAPSULE_SCHEMA_VERSION
        )
    }
}
/// Evaluates every attachment in an imported capsule, producing one
/// `AttachmentImportDecision` per entry in capsule order.
///
/// Checks apply in priority order: metadata deserialization, field-level
/// validation, position within the attachment-count limit, tuple uniqueness,
/// per-attachment size, cumulative size, `local_only` exclusion, and finally
/// the extension-specific preview (or rejection when no importer exists).
/// Read-only: no state is mutated.
fn evaluate_attachments(attachments: &[JsonValue]) -> Result<Vec<AttachmentImportDecision>> {
    let mut decisions = Vec::with_capacity(attachments.len());
    // Tracks `(extension_type, extension_name, purpose)` tuples already seen.
    let mut seen_tuples = BTreeSet::new();
    let mut cumulative_content_bytes = 0_u64;
    for (index, attachment) in attachments.iter().enumerate() {
        // Stage 1: the raw JSON value must deserialize into the metadata shape.
        let metadata: AttachmentMetadataInput = match serde_json::from_value(attachment.clone()) {
            Ok(metadata) => metadata,
            Err(error) => {
                decisions.push(invalid_metadata_decision(
                    index,
                    None,
                    0,
                    None,
                    format!("Attachment {index} has invalid metadata types: {error}."),
                ));
                continue;
            }
        };
        // Stage 2: required fields, allowed values, encoding, size, digest.
        let validated = match validate_attachment_metadata(index, &metadata) {
            Ok(validated) => validated,
            Err(decision) => {
                decisions.push(decision);
                continue;
            }
        };
        // NOTE(review): the cumulative total is bumped before the duplicate /
        // size checks, so attachments rejected below still count toward the
        // capsule total — presumably intentional (their bytes are physically
        // in the capsule), but it differs from the export side, which counts
        // only accepted candidates; confirm.
        cumulative_content_bytes =
            cumulative_content_bytes.saturating_add(validated.content_size_bytes);
        let tuple = (
            validated.extension_type.clone(),
            validated.extension_name.clone().unwrap_or_default(),
            validated.purpose.clone(),
        );
        let (decision, reason, blocks_resume, message) = if index >= MAX_ATTACHMENTS {
            (
                "rejected",
                "excluded_size_limit",
                validated.required_for_resume,
                format!(
                    "Attachment {} exceeds the maximum of {} extension continuity attachments.",
                    index, MAX_ATTACHMENTS
                ),
            )
        } else if !seen_tuples.insert(tuple) {
            (
                "rejected",
                "duplicate_attachment",
                validated.required_for_resume,
                format!(
                    "Attachment {} duplicates an earlier `(extension_type, extension_name, purpose)` tuple.",
                    index
                ),
            )
        } else if validated.content_size_bytes > PER_ATTACHMENT_LIMIT_BYTES {
            (
                "rejected",
                "excluded_size_limit",
                validated.required_for_resume,
                format!(
                    "Attachment {} content is {} bytes, above the {} byte per-attachment limit.",
                    index, validated.content_size_bytes, PER_ATTACHMENT_LIMIT_BYTES
                ),
            )
        } else if cumulative_content_bytes > TOTAL_ATTACHMENT_LIMIT_BYTES {
            (
                "rejected",
                "excluded_size_limit",
                validated.required_for_resume,
                format!(
                    "Attachment {} would raise total attachment content to {} bytes, above the {} byte capsule limit.",
                    index, cumulative_content_bytes, TOTAL_ATTACHMENT_LIMIT_BYTES
                ),
            )
        } else if validated.portability == "local_only" {
            // local_only payloads are skipped, never imported across machines.
            (
                "skipped",
                "excluded_local_only",
                validated.required_for_resume,
                format!(
                    "Attachment {} is local_only and is excluded from portable resume import.",
                    index
                ),
            )
        } else if let Some(import_preview) =
            extensions::preview_thread_attachment_import(index, &validated)
        {
            // A registered extension importer gets the final say.
            (
                import_preview.decision,
                import_preview.reason,
                import_preview.blocks_resume,
                import_preview.message,
            )
        } else {
            (
                "rejected",
                "unknown_extension",
                validated.required_for_resume,
                format!(
                    "Attachment {} targets extension `{}` but no extension importer is registered in this build.",
                    index, validated.extension_type
                ),
            )
        };
        decisions.push(AttachmentImportDecision {
            index,
            id: Some(validated.id),
            extension_type: Some(validated.extension_type),
            extension_name: validated.extension_name,
            purpose: Some(validated.purpose),
            schema_version: Some(validated.schema_version),
            portability: validated.portability,
            trust: Some(validated.trust),
            redaction: Some(validated.redaction),
            content_encoding: Some(validated.content_encoding),
            required_for_resume: validated.required_for_resume,
            size_bytes: validated.content_size_bytes,
            declared_size_bytes: Some(validated.declared_size_bytes),
            decision,
            reason,
            blocks_resume,
            message,
        });
    }
    Ok(decisions)
}
/// Validates one attachment's metadata, returning the unwrapped
/// `ValidatedAttachmentMetadata` on success or a ready-made rejection
/// decision on failure.
///
/// Validation is staged: first all missing/invalid required fields are
/// collected and reported together; then content_encoding (v1 accepts only
/// `json`), declared-vs-actual size, and the sha256 content digest are
/// checked individually, each producing its own rejection reason.
fn validate_attachment_metadata(
    index: usize,
    metadata: &AttachmentMetadataInput,
) -> Result<ValidatedAttachmentMetadata, AttachmentImportDecision> {
    // A missing required_for_resume is treated as blocking (fail safe) for
    // decision purposes, even though its absence is also reported below.
    let required_for_resume = metadata.required_for_resume.unwrap_or(true);
    // Measured size of the canonical JSON content; 0 when content is absent
    // or unserializable.
    let content_size_bytes = metadata
        .content
        .as_ref()
        .and_then(|content| serialized_len(content).ok())
        .unwrap_or(0);
    let mut missing_or_invalid = Vec::new();
    let id = required_non_empty(metadata.id.as_deref(), "id", &mut missing_or_invalid);
    let extension_type = required_non_empty(
        metadata.extension_type.as_deref(),
        "extension_type",
        &mut missing_or_invalid,
    );
    let purpose = required_non_empty(
        metadata.purpose.as_deref(),
        "purpose",
        &mut missing_or_invalid,
    );
    // schema_version must be present and strictly positive.
    let schema_version = match metadata.schema_version {
        Some(version) if version > 0 => Some(version),
        Some(_) => {
            missing_or_invalid.push("schema_version");
            None
        }
        None => {
            missing_or_invalid.push("schema_version");
            None
        }
    };
    if metadata.required_for_resume.is_none() {
        missing_or_invalid.push("required_for_resume");
    }
    let portability = required_allowed_value(
        metadata.portability.as_deref(),
        "portability",
        &["portable", "reference", "local_only"],
        &mut missing_or_invalid,
    );
    let trust = required_allowed_value(
        metadata.trust.as_deref(),
        "trust",
        &[
            "kernel_local",
            "extension_local",
            "external_reference",
            "untrusted_external",
        ],
        &mut missing_or_invalid,
    );
    let redaction = required_allowed_value(
        metadata.redaction.as_deref(),
        "redaction",
        &["none", "redacted", "contains_sensitive_omissions"],
        &mut missing_or_invalid,
    );
    let content_encoding = required_non_empty(
        metadata.content_encoding.as_deref(),
        "content_encoding",
        &mut missing_or_invalid,
    );
    let declared_size_bytes = match metadata.size_bytes {
        Some(size_bytes) => Some(size_bytes),
        None => {
            missing_or_invalid.push("size_bytes");
            None
        }
    };
    let content_digest = required_non_empty(
        metadata.content_digest.as_deref(),
        "content_digest",
        &mut missing_or_invalid,
    );
    if metadata.content.is_none() {
        missing_or_invalid.push("content");
    }
    // Stage 1 gate: report every missing/invalid field in one decision.
    if !missing_or_invalid.is_empty() {
        return Err(invalid_metadata_decision(
            index,
            Some(metadata),
            content_size_bytes,
            declared_size_bytes,
            format!(
                "Attachment {} is missing or has invalid required metadata: {}.",
                index,
                missing_or_invalid.join(", ")
            ),
        ));
    }
    // Past the gate, every required field is Some; the expects are invariants.
    let content_encoding = content_encoding.expect("validated content_encoding");
    if content_encoding != "json" {
        return Err(attachment_decision(
            index,
            metadata,
            content_size_bytes,
            declared_size_bytes,
            "rejected",
            "unsupported_content_encoding",
            required_for_resume,
            format!(
                "Attachment {} uses unsupported content_encoding `{}`; v1 accepts only `json`.",
                index, content_encoding
            ),
        ));
    }
    let declared_size_bytes = declared_size_bytes.expect("validated size_bytes");
    // Declared size must match the measured canonical-JSON size exactly.
    if declared_size_bytes != content_size_bytes {
        return Err(invalid_metadata_decision(
            index,
            Some(metadata),
            content_size_bytes,
            Some(declared_size_bytes),
            format!(
                "Attachment {} declares size_bytes {} but canonical JSON content is {} bytes.",
                index, declared_size_bytes, content_size_bytes
            ),
        ));
    }
    let expected_digest = content_digest.expect("validated content_digest");
    let actual_digest = content_digest_for(metadata.content.as_ref().expect("validated content"))
        .map_err(|error| {
            invalid_metadata_decision(
                index,
                Some(metadata),
                content_size_bytes,
                Some(declared_size_bytes),
                format!("Attachment {index} content could not be digested: {error}."),
            )
        })?;
    if expected_digest != actual_digest {
        return Err(attachment_decision(
            index,
            metadata,
            content_size_bytes,
            Some(declared_size_bytes),
            "rejected",
            "invalid_content_digest",
            required_for_resume,
            format!(
                "Attachment {} content_digest does not match canonical JSON content.",
                index
            ),
        ));
    }
    Ok(ValidatedAttachmentMetadata {
        id: id.expect("validated id"),
        extension_type: extension_type.expect("validated extension_type"),
        // extension_name is optional; blank values are normalized to None.
        extension_name: metadata
            .extension_name
            .as_deref()
            .filter(|value| !value.trim().is_empty())
            .map(str::to_owned),
        purpose: purpose.expect("validated purpose"),
        schema_version: schema_version.expect("validated schema_version"),
        portability: portability.expect("validated portability"),
        trust: trust.expect("validated trust"),
        redaction: redaction.expect("validated redaction"),
        content_encoding,
        required_for_resume,
        declared_size_bytes,
        content_size_bytes,
        content: metadata
            .content
            .as_ref()
            .expect("validated content")
            .clone(),
    })
}
/// Filters export attachment candidates down to the set that fits the
/// capsule budget, emitting one diagnostic per omitted candidate.
///
/// Check order per candidate: metadata deserialization, field validation,
/// local_only exclusion (info), attachment-count cap, tuple duplication,
/// per-attachment size, and cumulative size. Unlike import evaluation, only
/// candidates that pass every check contribute to the cumulative total.
fn enforce_export_attachment_budget(
    attachments: Vec<JsonValue>,
) -> Result<(Vec<JsonValue>, Vec<ThreadDiagnostic>)> {
    let mut included = Vec::new();
    let mut diagnostics = Vec::new();
    let mut seen_tuples = BTreeSet::new();
    let mut cumulative_content_bytes = 0_u64;
    for (index, attachment) in attachments.into_iter().enumerate() {
        let metadata: AttachmentMetadataInput = match serde_json::from_value(attachment.clone()) {
            Ok(metadata) => metadata,
            Err(error) => {
                diagnostics.push(ThreadDiagnostic {
                    severity: "warning",
                    code: "extension_rejected_export",
                    message: format!(
                        "Attachment candidate {} has invalid metadata types and was omitted: {}.",
                        index, error
                    ),
                });
                continue;
            }
        };
        // Reuses the import-side validator; the rejection reason is recycled
        // as the diagnostic code.
        let validated = match validate_attachment_metadata(index, &metadata) {
            Ok(validated) => validated,
            Err(decision) => {
                diagnostics.push(ThreadDiagnostic {
                    severity: "warning",
                    code: decision.reason,
                    message: format!(
                        "Attachment candidate {} was omitted during export: {}",
                        index, decision.message
                    ),
                });
                continue;
            }
        };
        let tuple = (
            validated.extension_type.clone(),
            validated.extension_name.clone().unwrap_or_default(),
            validated.purpose.clone(),
        );
        // local_only is an expected exclusion, hence "info" severity.
        if validated.portability == "local_only" {
            diagnostics.push(ThreadDiagnostic {
                severity: "info",
                code: "excluded_local_only",
                message: format!(
                    "Attachment candidate {} is local_only and was omitted from the portable transfer capsule.",
                    index
                ),
            });
            continue;
        }
        // Count cap is against already-included attachments, not input index.
        if included.len() >= MAX_ATTACHMENTS {
            diagnostics.push(ThreadDiagnostic {
                severity: "warning",
                code: "excluded_size_limit",
                message: format!(
                    "Attachment candidate {} exceeds the maximum of {} extension continuity attachments and was omitted.",
                    index, MAX_ATTACHMENTS
                ),
            });
            continue;
        }
        if !seen_tuples.insert(tuple) {
            diagnostics.push(ThreadDiagnostic {
                severity: "warning",
                code: "duplicate_attachment",
                message: format!(
                    "Attachment candidate {} duplicates an earlier `(extension_type, extension_name, purpose)` tuple and was omitted.",
                    index
                ),
            });
            continue;
        }
        if validated.content_size_bytes > PER_ATTACHMENT_LIMIT_BYTES {
            diagnostics.push(ThreadDiagnostic {
                severity: "warning",
                code: "excluded_size_limit",
                message: format!(
                    "Attachment candidate {} content is {} bytes, above the {} byte per-attachment limit, and was omitted.",
                    index, validated.content_size_bytes, PER_ATTACHMENT_LIMIT_BYTES
                ),
            });
            continue;
        }
        // Cumulative total is only committed when the candidate is included.
        let next_cumulative = cumulative_content_bytes.saturating_add(validated.content_size_bytes);
        if next_cumulative > TOTAL_ATTACHMENT_LIMIT_BYTES {
            diagnostics.push(ThreadDiagnostic {
                severity: "warning",
                code: "excluded_size_limit",
                message: format!(
                    "Attachment candidate {} would raise total attachment content to {} bytes, above the {} byte capsule limit, and was omitted.",
                    index, next_cumulative, TOTAL_ATTACHMENT_LIMIT_BYTES
                ),
            });
            continue;
        }
        cumulative_content_bytes = next_cumulative;
        included.push(attachment);
    }
    Ok((included, diagnostics))
}
/// Returns `value` trimmed and owned when it is present and non-blank;
/// otherwise records `field` in `missing_or_invalid` and returns `None`.
fn required_non_empty(
    value: Option<&str>,
    field: &'static str,
    missing_or_invalid: &mut Vec<&'static str>,
) -> Option<String> {
    let trimmed = value.map(str::trim).filter(|candidate| !candidate.is_empty());
    if let Some(present) = trimmed {
        Some(present.to_owned())
    } else {
        missing_or_invalid.push(field);
        None
    }
}
/// Like `required_non_empty`, but additionally restricts the trimmed value
/// to the `allowed` set; a present-but-disallowed value records `field` and
/// yields `None`.
fn required_allowed_value(
    value: Option<&str>,
    field: &'static str,
    allowed: &[&str],
    missing_or_invalid: &mut Vec<&'static str>,
) -> Option<String> {
    let candidate = required_non_empty(value, field, missing_or_invalid)?;
    let is_allowed = allowed.iter().any(|entry| *entry == candidate.as_str());
    if !is_allowed {
        missing_or_invalid.push(field);
        return None;
    }
    Some(candidate)
}
/// Builds a "rejected / invalid_attachment_metadata" decision.
///
/// When metadata is available it is threaded through `attachment_decision`
/// so the decision still carries whatever fields were parseable; when the
/// attachment failed to deserialize at all (`metadata == None`) a bare
/// decision is synthesized with `portability: "invalid"`. A missing
/// `required_for_resume` defaults to blocking (fail safe).
fn invalid_metadata_decision(
    index: usize,
    metadata: Option<&AttachmentMetadataInput>,
    size_bytes: u64,
    declared_size_bytes: Option<u64>,
    message: String,
) -> AttachmentImportDecision {
    let blocks_resume = metadata
        .and_then(|metadata| metadata.required_for_resume)
        .unwrap_or(true);
    match metadata {
        Some(metadata) => attachment_decision(
            index,
            metadata,
            size_bytes,
            // Prefer the explicitly passed declared size; fall back to the
            // metadata's own size_bytes claim.
            declared_size_bytes.or(metadata.size_bytes),
            "rejected",
            "invalid_attachment_metadata",
            blocks_resume,
            message,
        ),
        None => AttachmentImportDecision {
            index,
            id: None,
            extension_type: None,
            extension_name: None,
            purpose: None,
            schema_version: None,
            portability: "invalid".to_owned(),
            trust: None,
            redaction: None,
            content_encoding: None,
            required_for_resume: true,
            size_bytes,
            declared_size_bytes,
            decision: "rejected",
            reason: "invalid_attachment_metadata",
            blocks_resume,
            message,
        },
    }
}
/// Builds an `AttachmentImportDecision` from raw (possibly incomplete)
/// metadata plus the computed verdict fields.
fn attachment_decision(
    index: usize,
    metadata: &AttachmentMetadataInput,
    size_bytes: u64,
    declared_size_bytes: Option<u64>,
    decision: &'static str,
    reason: &'static str,
    blocks_resume: bool,
    message: String,
) -> AttachmentImportDecision {
    AttachmentImportDecision {
        index,
        id: metadata.id.clone(),
        extension_type: metadata.extension_type.clone(),
        extension_name: metadata.extension_name.clone(),
        purpose: metadata.purpose.clone(),
        schema_version: metadata.schema_version,
        // Portability is always serialized, so an absent value gets a marker.
        portability: metadata
            .portability
            .clone()
            .unwrap_or_else(|| "unspecified".to_owned()),
        trust: metadata.trust.clone(),
        redaction: metadata.redaction.clone(),
        content_encoding: metadata.content_encoding.clone(),
        // NOTE(review): missing required_for_resume defaults to the caller's
        // blocks_resume here, while other sites default it to `true`; in
        // current call paths these coincide — confirm before relying on it.
        required_for_resume: metadata.required_for_resume.unwrap_or(blocks_resume),
        size_bytes,
        declared_size_bytes,
        decision,
        reason,
        blocks_resume,
        message,
    }
}
/// Collapses per-attachment decisions into a capsule-level status:
/// "blocked" when any decision blocks resume, "degraded" when anything was
/// not accepted, otherwise "complete" (including the empty case).
fn import_status(decisions: &[AttachmentImportDecision]) -> &'static str {
    let any_blocking = decisions.iter().any(|entry| entry.blocks_resume);
    let all_accepted = decisions.iter().all(|entry| entry.decision == "accepted");
    match (any_blocking, all_accepted) {
        (true, _) => "blocked",
        (false, false) => "degraded",
        (false, true) => "complete",
    }
}
/// Derives report diagnostics from the attachment decisions and the
/// capsule-level budget view.
///
/// Emits: hard budget violations (count / total bytes) as errors, an info
/// when the capsule has no attachments at all, and one entry per
/// non-accepted decision (error when it was required for resume, warning
/// otherwise).
fn import_diagnostics(
    decisions: &[AttachmentImportDecision],
    budget: &AttachmentBudgetView,
) -> Vec<ThreadDiagnostic> {
    let mut diagnostics = Vec::new();
    if budget.attachment_count > MAX_ATTACHMENTS {
        diagnostics.push(ThreadDiagnostic {
            severity: "error",
            code: "excluded_size_limit",
            message: format!(
                "Capsule contains {} attachments, above the maximum of {}.",
                budget.attachment_count, MAX_ATTACHMENTS
            ),
        });
    }
    if budget.total_attachment_bytes > TOTAL_ATTACHMENT_LIMIT_BYTES {
        diagnostics.push(ThreadDiagnostic {
            severity: "error",
            code: "excluded_size_limit",
            message: format!(
                "Capsule attachments use {} bytes, above the {} byte total limit.",
                budget.total_attachment_bytes, TOTAL_ATTACHMENT_LIMIT_BYTES
            ),
        });
    }
    if decisions.is_empty() {
        diagnostics.push(ThreadDiagnostic {
            severity: "info",
            code: "no_extension_attachments",
            message: "Capsule does not contain extension continuity attachments.".to_owned(),
        });
    }
    for decision in decisions {
        if decision.decision != "accepted" {
            diagnostics.push(ThreadDiagnostic {
                // Required-for-resume failures are errors; optional ones warn.
                severity: if decision.required_for_resume {
                    "error"
                } else {
                    "warning"
                },
                code: decision.reason,
                message: decision.message.clone(),
            });
        }
    }
    diagnostics
}
/// Builds the lightweight capsule summary shown in import reports, tolerating
/// missing envelope fields. `/id` and `/source_locality_id` are accepted as
/// fallback spellings for the thread id and source project id.
fn capsule_summary(capsule: &ThreadTransferCapsuleInput) -> CapsuleSummaryView {
    let envelope = &capsule.thread;
    let thread_id =
        json_string_at(envelope, "/thread_id").or_else(|| json_string_at(envelope, "/id"));
    let source_project_id = json_string_at(envelope, "/source_project_id")
        .or_else(|| json_string_at(envelope, "/source_locality_id"));
    CapsuleSummaryView {
        kind: capsule.kind.clone(),
        schema_version: capsule.schema_version,
        thread_id,
        source_profile: json_string_at(envelope, "/source_profile"),
        source_project_id,
    }
}
/// Computes the budget view for a set of attachments: the configured limits
/// plus the actual count and the summed serialized size of each attachment's
/// `content` field (attachments without `content` contribute zero).
fn attachment_budget(attachments: &[JsonValue]) -> Result<AttachmentBudgetView> {
    let total_attachment_bytes = attachments
        .iter()
        .filter_map(|attachment| attachment.get("content"))
        .try_fold(0_u64, |total, content| {
            serialized_len(content).map(|len| total.saturating_add(len))
        })?;
    Ok(AttachmentBudgetView {
        per_attachment_limit_bytes: PER_ATTACHMENT_LIMIT_BYTES,
        total_limit_bytes: TOTAL_ATTACHMENT_LIMIT_BYTES,
        max_attachments: MAX_ATTACHMENTS,
        attachment_count: attachments.len(),
        total_attachment_bytes,
    })
}
/// Length in bytes of `value`'s compact (canonical) JSON serialization.
pub(crate) fn serialized_len(value: &JsonValue) -> Result<u64> {
    let bytes = serde_json::to_vec(value)?;
    Ok(bytes.len() as u64)
}
/// SHA-256 digest of `value`'s compact JSON serialization, rendered as a
/// `sha256:<hex>` string (the format stored in attachment `content_digest`).
pub(crate) fn content_digest_for(value: &JsonValue) -> Result<String> {
    let mut hasher = Sha256::new();
    hasher.update(serde_json::to_vec(value)?);
    let digest = hasher.finalize();
    Ok(format!("sha256:{digest:x}"))
}
/// Lowercase hex SHA-256 of `value`'s UTF-8 bytes (no `sha256:` prefix).
pub(crate) fn sha256_hex(value: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(value.as_bytes());
    let digest = hasher.finalize();
    format!("{digest:x}")
}
/// Looks up `pointer` (JSON Pointer syntax) in `value` and returns the
/// target as an owned string, or `None` when absent or not a string.
fn json_string_at(value: &JsonValue, pointer: &str) -> Option<String> {
    match value.pointer(pointer)? {
        JsonValue::String(text) => Some(text.clone()),
        _ => None,
    }
}
/// Derives a deterministic thread id from profile + locality, so repeated
/// exports of the same workspace yield the same id. A NUL byte separates the
/// two inputs unambiguously; the id keeps the first 26 hex chars of the hash.
fn stable_thread_id(profile: &str, locality_id: &str) -> String {
    let digest = compiled_state::sha256_string(&format!("{profile}\0{locality_id}"));
    format!("thread_{}", &digest[..26])
}
/// Renders `path` for display, absolutized against the current directory
/// when possible; falls back to the path as given.
fn display_source_path(path: &Path) -> String {
    match absolutize_path(path) {
        Some(absolute) => absolute.display().to_string(),
        None => path.display().to_string(),
    }
}
/// Joins a relative `path` onto the current working directory; absolute
/// paths are returned unchanged. Yields `None` only when the cwd cannot be
/// determined.
fn absolutize_path(path: &Path) -> Option<PathBuf> {
    if path.is_absolute() {
        Some(path.to_path_buf())
    } else {
        let cwd = std::env::current_dir().ok()?;
        Some(cwd.join(path))
    }
}
/// Fails with a bootstrap hint unless the profile's state directory exists.
/// `command` is interpolated into the error message ("export" / "import").
fn ensure_profile_exists(layout: &StateLayout, repo_root: &Path, command: &str) -> Result<()> {
    let profile_root = layout.profile_root();
    if profile_root.is_dir() {
        return Ok(());
    }
    bail!(
        "profile `{}` does not exist at {}; bootstrap it with `ccd attach --path {}` before using `ccd thread {}`",
        layout.profile(),
        profile_root.display(),
        repo_root.display(),
        command
    )
}
/// Requires the repo's link marker file to exist and returns its
/// locality id; fails with attach/link instructions when missing.
fn ensure_repo_linked(repo_root: &Path) -> Result<String> {
    let Some(marker) = repo_marker::load(repo_root)? else {
        bail!(
            "repo is not linked: {} is missing; run `ccd attach --path {}` or `ccd link --path {}` first",
            repo_root.join(repo_marker::MARKER_FILE).display(),
            repo_root.display(),
            repo_root.display()
        )
    };
    Ok(marker.locality_id)
}