use std::collections::HashSet;
use std::env;
use std::fs;
use std::io::Write;
use std::path::{Component, Path, PathBuf};
use chrono::Utc;
use clap::Args;
use cortex_core::{
attest, compose_policy_outcomes, AttestationPreimage, Attestor, AuditRecord, Event, EventId,
EventSource, InMemoryAttestor, LineageBinding, Outcome, PolicyContribution, PolicyDecision,
PolicyOutcome, SourceIdentity, TrustTier, SCHEMA_VERSION_ATTESTATION,
};
use cortex_ledger::{
JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID, APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
APPEND_RUNTIME_MODE_RULE_ID,
};
use crate::exit::Exit;
use crate::output::{self, Envelope};
use crate::paths::DataLayout;
/// Rule id of the ingest policy contribution that checks operator attestation
/// for `EventSource::User` events.
pub const INGEST_USER_ATTESTATION_RULE_ID: &str = "ingest.user_attestation";
/// Rule id of the ingest policy contribution that enforces the trust-tier floor.
pub const INGEST_EVENT_SOURCE_TIER_GATE_RULE_ID: &str = "ingest.event_source_tier_gate";
/// Rule id of the ingest policy contribution that records session-path
/// (symlink / parent-traversal) validation.
pub const INGEST_SYMLINK_ROOT_VALIDATION_RULE_ID: &str = "ingest.symlink_root_validation";
/// Lowest trust tier an event source may map to and still pass the ingest tier gate.
pub const INGEST_MINIMUM_TRUST_TIER: TrustTier = TrustTier::Observed;
/// File name of the audit-record JSONL file; it is joined onto the layout's data directory.
const AUDIT_RECORDS_FILENAME: &str = "audit_records.jsonl";
/// Operation label stored on the audit row written for each attested user event.
pub const OPERATION_INGEST_USER_ATTESTED: &str = "ingest_user_attested";
// Command-line arguments of the ingest command.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive
// macros turn `///` doc comments into help text, which would change the
// program's `--help` output.
#[derive(Debug, Args)]
pub struct IngestArgs {
// Positional path of the session JSON file to read and ingest.
#[arg(value_name = "SESSION")]
pub session: PathBuf,
// Optional event-log path override, forwarded to `DataLayout::resolve`.
#[arg(long = "event-log", value_name = "PATH")]
pub event_log: Option<PathBuf>,
// Optional database path override, forwarded to `DataLayout::resolve`.
#[arg(long, value_name = "PATH")]
pub db: Option<PathBuf>,
// Optional path of a key file used to attest `EventSource::User` events.
// Ingest is refused when the input contains user events and this is absent.
#[arg(long = "user-attestation", value_name = "KEY_PATH")]
pub user_attestation: Option<PathBuf>,
}
/// Failure of an ingest run: the process exit code to report plus a
/// human-readable detail string (surfaced in the JSON error payload by `run`).
#[derive(Debug)]
pub(crate) struct IngestError {
/// Exit code the command should terminate with.
pub(crate) exit: Exit,
/// Human-readable description of what went wrong.
pub(crate) detail: String,
}
impl IngestError {
    /// Builds an error from an exit code and anything convertible into the
    /// detail message.
    fn new(exit: Exit, detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        Self { exit, detail }
    }
}
/// Result of a successful ingest run.
#[derive(Debug, Clone)]
pub struct IngestOutcome {
/// Ids of the events appended to the event log by this run, in append order.
pub appended: Vec<EventId>,
/// Ids of input events that were skipped because the log already contained them.
pub skipped: Vec<EventId>,
/// Value of the log's `head()` after the run; `None` is printed as `<empty>`.
pub head: Option<String>,
/// Ids of the `EventSource::User` events for which an attestation audit row was produced.
pub attested_user_events: Vec<EventId>,
}
pub fn run(args: IngestArgs) -> Exit {
match run_inner(args) {
Ok(outcome) => {
if output::json_enabled() {
let payload = serde_json::json!({
"appended": outcome.appended.iter().map(EventId::to_string).collect::<Vec<_>>(),
"skipped": outcome.skipped.iter().map(EventId::to_string).collect::<Vec<_>>(),
"attested_user_events": outcome
.attested_user_events
.iter()
.map(EventId::to_string)
.collect::<Vec<_>>(),
"appended_count": outcome.appended.len(),
"skipped_count": outcome.skipped.len(),
"attested_user_count": outcome.attested_user_events.len(),
"chain_head": outcome.head,
});
let envelope = Envelope::new("cortex.ingest", Exit::Ok, payload);
output::emit(&envelope, Exit::Ok)
} else {
print_summary(&outcome);
Exit::Ok
}
}
Err(e) => {
if output::json_enabled() {
let payload = serde_json::json!({
"appended": Vec::<String>::new(),
"skipped": Vec::<String>::new(),
"appended_count": 0,
"skipped_count": 0,
"status": "error",
"detail": e.detail,
});
let envelope = Envelope::new("cortex.ingest", e.exit, payload);
output::emit(&envelope, e.exit)
} else {
e.exit
}
}
}
}
pub fn run_inner(args: IngestArgs) -> Result<IngestOutcome, IngestError> {
let layout = DataLayout::resolve(args.db, args.event_log)
.map_err(|e| IngestError::new(e, "failed to resolve data layout"))?;
validate_session_input_path(&args.session)
.map_err(|e| IngestError::new(e, format!("invalid session path `{}`", args.session.display())))?;
let raw = fs::read(&args.session).map_err(|io_err| {
let detail = format!(
"cannot read session file `{}`: {io_err}",
args.session.display()
);
eprintln!("ingest: {detail}");
IngestError::new(Exit::PreconditionUnmet, detail)
})?;
let events = parse_events(&raw)?;
let user_event_ids: Vec<EventId> = events
.iter()
.filter(|e| matches!(e.source, EventSource::User))
.map(|e| e.id)
.collect();
let decision =
ingest_policy_decision(&events, &user_event_ids, args.user_attestation.is_some());
if matches!(
decision.final_outcome,
PolicyOutcome::Reject | PolicyOutcome::Quarantine
) {
emit_ingest_policy_refusal(&decision, &user_event_ids);
return Err(IngestError::new(
Exit::PreconditionUnmet,
"ingest refused by ADR 0026 policy; see stderr for contributor details",
));
}
let attestor = match args.user_attestation.as_deref() {
Some(key_path) => Some(
load_attestor_from_key_file(key_path)
.map_err(|e| IngestError::new(e, "failed to load --user-attestation key"))?,
),
None => None,
};
let mut log = JsonlLog::open(&layout.event_log_path)
.map_err(|e| IngestError::new(map_jsonl_err(e), "failed to open event log"))?;
let existing_ids = collect_existing_ids(&log)
.map_err(|e| IngestError::new(e, "failed to read existing event ids from log"))?;
let chain_base: u64 = existing_ids.len() as u64;
let ledger_id = derive_ledger_id(&layout.event_log_path);
let audit_records_path = layout.data_dir.join(AUDIT_RECORDS_FILENAME);
let mut appended = Vec::new();
let mut skipped = Vec::new();
let mut attested_user_events = Vec::new();
let mut audit_rows: Vec<AuditRecord> = Vec::new();
for event in events {
if existing_ids.contains(&event.id) {
skipped.push(event.id);
continue;
}
let is_user = matches!(event.source, EventSource::User);
if is_user {
let attestor = attestor
.as_ref()
.expect("user events refused above when no attestor present");
let chain_position = chain_base + appended.len() as u64;
let preimage = build_user_preimage(&event, &ledger_id, chain_position, attestor);
let attestation = attest(&preimage, attestor);
let actor_json = serde_json::json!({
"kind": "user",
"event_id": event.id.to_string(),
"key_id": attestation.key_id,
"signature_hex": hex_lower(&attestation.signature),
"signed_at": attestation.signed_at,
});
let row = AuditRecord::new(
actor_json,
OPERATION_INGEST_USER_ATTESTED.to_string(),
event.id.to_string(),
attestation.signed_at,
Outcome::Success,
);
audit_rows.push(row);
attested_user_events.push(event.id);
}
let id = event.id;
let policy = ingest_append_policy(&event.source, is_user);
log.append(event, &policy)
.map_err(|e| IngestError::new(map_jsonl_err(e), format!("failed to append event {id}")))?;
appended.push(id);
}
if !audit_rows.is_empty() {
append_audit_rows(&audit_records_path, &audit_rows)
.map_err(|e| IngestError::new(e, "failed to write audit records"))?;
}
Ok(IngestOutcome {
appended,
skipped,
head: log.head().map(str::to_owned),
attested_user_events,
})
}
/// Composes the ingest policy decision from three contributions:
///
/// * session-path validation — always `Allow` here (the path check itself
///   runs separately, before this function is called by `run_inner`);
/// * user attestation — `Reject` when `user_event_ids` is non-empty and no
///   attestor is present, otherwise `Allow`;
/// * trust-tier gate — `Reject` for the first event whose source maps below
///   [`INGEST_MINIMUM_TRUST_TIER`], otherwise `Allow`.
///
/// # Panics
///
/// Panics if `PolicyContribution::new` rejects one of the contributions.
#[must_use]
pub fn ingest_policy_decision(
    events: &[Event],
    user_event_ids: &[EventId],
    attestor_present: bool,
) -> PolicyDecision {
    const VALID: &str = "static policy contribution is valid";

    let symlink_contribution = PolicyContribution::new(
        INGEST_SYMLINK_ROOT_VALIDATION_RULE_ID,
        PolicyOutcome::Allow,
        "session input path resolved under a permitted root with no symlink or parent-traversal escape",
    )
    .expect(VALID);

    let (attestation_outcome, attestation_reason) =
        match (user_event_ids.is_empty(), attestor_present) {
            (true, _) => (
                PolicyOutcome::Allow,
                String::from(
                    "input contains no EventSource::User events; operator attestation is not required",
                ),
            ),
            (false, true) => (
                PolicyOutcome::Allow,
                format!(
                    "operator attestation envelope supplied for {} EventSource::User event(s)",
                    user_event_ids.len()
                ),
            ),
            (false, false) => {
                let ids: Vec<String> = user_event_ids.iter().map(EventId::to_string).collect();
                (
                    PolicyOutcome::Reject,
                    format!(
                        "policy.ingest.user_attestation.missing: {} EventSource::User event(s) supplied without --user-attestation; offending ids=[{}]",
                        ids.len(),
                        ids.join(", ")
                    ),
                )
            }
        };
    let user_attestation_contribution = PolicyContribution::new(
        INGEST_USER_ATTESTATION_RULE_ID,
        attestation_outcome,
        attestation_reason,
    )
    .expect(VALID);

    let (tier_outcome, tier_reason) =
        match first_event_below_minimum_tier(events, attestor_present) {
            Some((event_id, tier)) => (
                PolicyOutcome::Reject,
                format!(
                    "policy.ingest.event_source_tier_gate.below_minimum: event {event_id} has tier {tier:?} which is below required {INGEST_MINIMUM_TRUST_TIER:?} (ADR 0019 §3)"
                ),
            ),
            None => (
                PolicyOutcome::Allow,
                format!(
                    "every event meets the ingest trust tier floor of {INGEST_MINIMUM_TRUST_TIER:?} (ADR 0019 §3)"
                ),
            ),
        };
    let tier_contribution = PolicyContribution::new(
        INGEST_EVENT_SOURCE_TIER_GATE_RULE_ID,
        tier_outcome,
        tier_reason,
    )
    .expect(VALID);

    let contributions = vec![
        symlink_contribution,
        user_attestation_contribution,
        tier_contribution,
    ];
    compose_policy_outcomes(contributions, None)
}
/// Maps an event source to the trust tier used by the ingest tier gate.
///
/// * `User` / `ManualCorrection`: `Operator` when an attestor is present,
///   otherwise `Untrusted`.
/// * `ChildAgent` / `Tool`: `Untrusted` when the model / tool name is empty
///   after trimming whitespace, otherwise `Observed`.
/// * `Runtime` / `ExternalOutcome`: always `Observed`.
#[must_use]
pub fn event_source_trust_tier(source: &EventSource, attestor_present: bool) -> TrustTier {
    // Tier of a source that carries an identifying label (model or tool name).
    let labelled = |label_is_blank: bool| {
        if label_is_blank {
            TrustTier::Untrusted
        } else {
            TrustTier::Observed
        }
    };

    match source {
        EventSource::User | EventSource::ManualCorrection => {
            if attestor_present {
                TrustTier::Operator
            } else {
                TrustTier::Untrusted
            }
        }
        EventSource::ChildAgent { model } => labelled(model.trim().is_empty()),
        EventSource::Tool { name } => labelled(name.trim().is_empty()),
        EventSource::Runtime | EventSource::ExternalOutcome => TrustTier::Observed,
    }
}
/// Returns the id and tier of the first event (in input order) whose source
/// maps below [`INGEST_MINIMUM_TRUST_TIER`], or `None` when every event meets
/// the floor.
fn first_event_below_minimum_tier(
    events: &[Event],
    attestor_present: bool,
) -> Option<(EventId, TrustTier)> {
    events.iter().find_map(|candidate| {
        let tier = event_source_trust_tier(&candidate.source, attestor_present);
        if tier < INGEST_MINIMUM_TRUST_TIER {
            Some((candidate.id, tier))
        } else {
            None
        }
    })
}
/// Prints a policy refusal to stderr: the final outcome with the contributing
/// rule ids, one line per contribution (contributing first, then discarded),
/// and — when the input held any — the ids of the `EventSource::User` events.
fn emit_ingest_policy_refusal(decision: &PolicyDecision, user_event_ids: &[EventId]) {
    let mut rule_ids: Vec<&str> = Vec::new();
    for entry in decision.contributing.iter() {
        rule_ids.push(entry.rule_id.as_str());
    }
    eprintln!(
        "ingest: refused by ADR 0026 policy outcome {:?}; contributing rules: [{}]; no state was changed",
        decision.final_outcome,
        rule_ids.join(", ")
    );

    let every_contribution = decision.contributing.iter().chain(decision.discarded.iter());
    for entry in every_contribution {
        eprintln!(
            " - {}: {}",
            entry.rule_id.as_str(),
            entry.reason
        );
    }

    if user_event_ids.is_empty() {
        return;
    }
    let listed = user_event_ids
        .iter()
        .map(EventId::to_string)
        .collect::<Vec<String>>()
        .join(", ");
    eprintln!(
        " EventSource::User event id(s) in input: [{}]",
        listed
    );
}
/// Checks that the session path is acceptable before anything is read.
///
/// The path is refused when it contains a `..` component, when either the
/// root (see `session_input_root`) or the path itself cannot be canonicalized,
/// or when the canonical path does not lie under the canonical root. Every
/// refusal prints a diagnostic to stderr and yields `Exit::PreconditionUnmet`.
fn validate_session_input_path(path: &Path) -> Result<(), Exit> {
    let has_parent_dir = path
        .components()
        .any(|part| part == Component::ParentDir);
    if has_parent_dir {
        eprintln!(
            "ingest: refusing session path `{}`: parent-directory traversal is not accepted; no state was changed.",
            path.display(),
        );
        return Err(Exit::PreconditionUnmet);
    }

    let root = session_input_root(path)?;
    let canonical_root = match fs::canonicalize(&root) {
        Ok(resolved) => resolved,
        Err(e) => {
            eprintln!(
                "ingest: cannot inspect session root `{}`: {e}; no state was changed.",
                root.display(),
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    let canonical_session = match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(e) => {
            eprintln!(
                "ingest: cannot inspect session path `{}`: {e}; no state was changed.",
                path.display(),
            );
            return Err(Exit::PreconditionUnmet);
        }
    };

    if canonical_session.starts_with(&canonical_root) {
        return Ok(());
    }
    eprintln!(
        "ingest: refusing session path `{}`: symlink resolves outside root `{}`; no state was changed.",
        path.display(),
        canonical_root.display(),
    );
    Err(Exit::PreconditionUnmet)
}
/// Chooses the directory a session path must resolve under.
///
/// Relative paths, and absolute paths lexically under the current directory,
/// use the current directory. Any other absolute path uses its own parent
/// directory; a path without a non-empty parent yields
/// `Exit::PreconditionUnmet`. Failing to read the current directory prints a
/// diagnostic and also yields `Exit::PreconditionUnmet`.
fn session_input_root(path: &Path) -> Result<PathBuf, Exit> {
    let cwd = match env::current_dir() {
        Ok(dir) => dir,
        Err(e) => {
            eprintln!("ingest: cannot inspect current directory: {e}; no state was changed.");
            return Err(Exit::PreconditionUnmet);
        }
    };

    if path.is_relative() || path.starts_with(&cwd) {
        return Ok(cwd);
    }

    match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => Ok(parent.to_path_buf()),
        _ => Err(Exit::PreconditionUnmet),
    }
}
/// Reads every event currently in the log and returns the set of their ids.
///
/// Stops at the first iteration or decode error and returns it mapped through
/// `map_jsonl_err`.
fn collect_existing_ids(log: &JsonlLog) -> Result<HashSet<EventId>, Exit> {
    let entries = log.iter().map_err(map_jsonl_err)?;
    entries
        .into_iter()
        .map(|entry| entry.map(|stored| stored.id).map_err(map_jsonl_err))
        .collect()
}
/// Builds the policy decision handed to the log for a single append.
///
/// It composes three contributions: the tier gate (`Allow`), the attestation
/// requirement (`Allow`, with a reason that depends on whether the source is
/// `EventSource::User`), and the runtime mode (`Warn`). In debug builds it
/// asserts that a user-source event is only appended after attestation.
fn ingest_append_policy(source: &EventSource, is_attested_user: bool) -> PolicyDecision {
    const VALID: &str = "static policy contribution is valid";

    let source_is_user = matches!(source, EventSource::User);
    debug_assert!(
        !source_is_user || is_attested_user,
        "ingest: User event reached append without prior attestation gate"
    );
    let attestation_reason = match source_is_user {
        true => "ingest: user-source event attested by --user-attestation preimage",
        false => "ingest: non-user event does not require user attestation",
    };

    let tier_gate = PolicyContribution::new(
        APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
        PolicyOutcome::Allow,
        "ingest: event source tier gate satisfied (user events attested upstream)",
    )
    .expect(VALID);
    let attestation_required = PolicyContribution::new(
        APPEND_ATTESTATION_REQUIRED_RULE_ID,
        PolicyOutcome::Allow,
        attestation_reason,
    )
    .expect(VALID);
    let runtime_mode = PolicyContribution::new(
        APPEND_RUNTIME_MODE_RULE_ID,
        PolicyOutcome::Warn,
        "ingest: unsigned local-development ledger row (ADR 0037 §2 DevOnly)",
    )
    .expect(VALID);

    compose_policy_outcomes(vec![tier_gate, attestation_required, runtime_mode], None)
}
fn parse_events(raw: &[u8]) -> Result<Vec<Event>, IngestError> {
let make_err = |detail: String| -> IngestError {
eprintln!("ingest: {detail}");
IngestError::new(Exit::Usage, detail)
};
let value: serde_json::Value = serde_json::from_slice(raw).map_err(|parse_err| {
make_err(format!(
"failed to parse session JSON: {parse_err}; \
expected an object with an \"events\" array, a bare array of events, \
or a single event object"
))
})?;
match value {
serde_json::Value::Object(mut map) => {
if let Some(events_val) = map.remove("events") {
let events: Vec<Event> =
serde_json::from_value(events_val).map_err(|e| {
make_err(format!(
"failed to parse session JSON: \"events\" array contains \
invalid event objects: {e}"
))
})?;
Ok(events)
} else {
let event: Event =
serde_json::from_value(serde_json::Value::Object(map)).map_err(|e| {
make_err(format!(
"failed to parse session JSON: top-level object is not a \
valid event and has no \"events\" key: {e}"
))
})?;
Ok(vec![event])
}
}
serde_json::Value::Array(_) => {
let events: Vec<Event> = serde_json::from_value(value).map_err(|e| {
make_err(format!(
"failed to parse session JSON: top-level array contains \
invalid event objects (expected array of event objects, \
e.g. [{{\"id\": ...}}], not primitives): {e}"
))
})?;
Ok(events)
}
_ => Err(make_err(
"failed to parse session JSON: wrong shape — expected object with \
\"events\" array or array of events, got a primitive (string, number, \
boolean, or null)"
.to_string(),
)),
}
}
/// Loads the attestation key from `path` and builds an in-memory attestor.
///
/// The file must hold exactly 32 raw bytes, which are used as the seed. An
/// unreadable file or a wrong length prints a diagnostic to stderr and yields
/// `Exit::PreconditionUnmet`.
fn load_attestor_from_key_file(path: &Path) -> Result<InMemoryAttestor, Exit> {
    let key_bytes = match fs::read(path) {
        Ok(contents) => contents,
        Err(e) => {
            eprintln!(
                "ingest: cannot read --user-attestation key file `{}`: {e}",
                path.display(),
            );
            return Err(Exit::PreconditionUnmet);
        }
    };

    if key_bytes.len() == 32 {
        let mut seed = [0u8; 32];
        seed.copy_from_slice(&key_bytes);
        return Ok(InMemoryAttestor::from_seed(&seed));
    }

    eprintln!(
        "ingest: --user-attestation key file `{}` must be exactly 32 raw bytes \
         (Ed25519 seed); got {} bytes",
        path.display(),
        key_bytes.len(),
    );
    Err(Exit::PreconditionUnmet)
}
/// Assembles the attestation preimage for one user event.
///
/// The preimage binds the event id, the hash of the event payload, the
/// session id (empty string when the event has none), the ledger id, the
/// chain position, the attestor's key id and the current UTC time.
pub fn build_user_preimage(
    event: &Event,
    ledger_id: &str,
    chain_position: u64,
    attestor: &InMemoryAttestor,
) -> AttestationPreimage {
    let event_id = event.id.to_string();
    let payload_hash = cortex_ledger::payload_hash(&event.payload);
    let session_id = event.session_id.clone().unwrap_or_default();
    let ledger_id = ledger_id.to_string();
    let lineage = LineageBinding::ChainPosition(chain_position);
    let signed_at = Utc::now();
    let key_id = attestor.key_id().to_string();

    AttestationPreimage {
        schema_version: SCHEMA_VERSION_ATTESTATION,
        source: SourceIdentity::User,
        event_id,
        payload_hash,
        session_id,
        ledger_id,
        lineage,
        signed_at,
        key_id,
    }
}
/// Derives the ledger id from the event-log path: the file stem when it
/// exists and is valid UTF-8, otherwise the fallback `"events"`.
pub fn derive_ledger_id(event_log_path: &Path) -> String {
    let stem = event_log_path.file_stem().and_then(|name| name.to_str());
    match stem {
        Some(name) => name.to_owned(),
        None => String::from("events"),
    }
}
/// Appends `rows` to the audit-record file at `path`, one JSON object per line.
///
/// The parent directory is created when needed and the file is opened in
/// create + append mode. All rows are serialized first and then written with a
/// single `write_all`, so a serialization failure leaves the file without a
/// partial batch. The file is synced before returning.
///
/// # Errors
///
/// Every failure prints an `ingest: ...` diagnostic to stderr. Failing to
/// create the directory or open the file yields `Exit::PreconditionUnmet`;
/// failing to serialize, write or sync yields `Exit::Internal`.
fn append_audit_rows(path: &Path, rows: &[AuditRecord]) -> Result<(), Exit> {
    if let Some(parent) = path.parent() {
        if !parent.as_os_str().is_empty() {
            // `create_dir_all` succeeds when the directory already exists, so no
            // separate existence check (and its check-then-act gap) is needed.
            fs::create_dir_all(parent).map_err(|e| {
                eprintln!(
                    "ingest: cannot create audit record directory `{}`: {e}",
                    parent.display()
                );
                Exit::PreconditionUnmet
            })?;
        }
    }

    let mut file = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .map_err(|e| {
            eprintln!(
                "ingest: cannot open audit record file `{}`: {e}",
                path.display()
            );
            Exit::PreconditionUnmet
        })?;

    let mut batch = String::new();
    for row in rows {
        let line = serde_json::to_string(row).map_err(|e| {
            eprintln!("ingest: cannot serialize audit record: {e}");
            Exit::Internal
        })?;
        batch.push_str(&line);
        batch.push('\n');
    }

    file.write_all(batch.as_bytes()).map_err(|e| {
        eprintln!(
            "ingest: cannot write audit record file `{}`: {e}",
            path.display()
        );
        Exit::Internal
    })?;
    file.sync_all().map_err(|e| {
        eprintln!(
            "ingest: cannot sync audit record file `{}`: {e}",
            path.display()
        );
        Exit::Internal
    })?;
    Ok(())
}
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(DIGITS[usize::from(byte >> 4)] as char);
        encoded.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    encoded
}
/// Translates a ledger error into the command's exit code: decode and
/// broken-chain errors mean chain corruption, validation errors mean an unmet
/// precondition, and encode or I/O errors are internal failures.
fn map_jsonl_err(e: cortex_ledger::JsonlError) -> Exit {
    use cortex_ledger::JsonlError;

    match e {
        JsonlError::Validation(_) => Exit::PreconditionUnmet,
        JsonlError::Decode { .. } => Exit::ChainCorruption,
        JsonlError::ChainBroken(_) => Exit::ChainCorruption,
        JsonlError::Encode(_) => Exit::Internal,
        JsonlError::Io { .. } => Exit::Internal,
    }
}
/// Prints the plain-text summary of a successful run to stdout: one line per
/// appended and skipped id, the counts, the attested-user count (only when
/// non-zero) and the chain head.
fn print_summary(out: &IngestOutcome) {
    out.appended
        .iter()
        .for_each(|id| println!("appended {id}"));
    out.skipped
        .iter()
        .for_each(|id| println!("skipped {id} (already in log)"));

    println!("appended_count = {}", out.appended.len());
    println!("skipped_count = {}", out.skipped.len());

    let attested_count = out.attested_user_events.len();
    if attested_count != 0 {
        println!("attested_user_count = {}", attested_count);
    }

    if let Some(h) = &out.head {
        println!("chain_head = {h}");
    } else {
        println!("chain_head = <empty>");
    }
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use cortex_core::{canonical, verify, Attestation, EventSource, EventType, SCHEMA_VERSION};
use std::io::{Read, Write};
use std::path::PathBuf;
use tempfile::tempdir;
// Test helper: writes `events` to `path` as a pretty-printed
// `{"events": [...]}` JSON envelope. Panics on any I/O or encoding failure.
fn write_session(path: &std::path::Path, events: &[Event]) {
let envelope = serde_json::json!({"events": events});
let mut f = std::fs::File::create(path).unwrap();
f.write_all(serde_json::to_string_pretty(&envelope).unwrap().as_bytes())
.unwrap();
}
// Test helper: a fresh `ChildAgent` event with a non-empty model name, which
// `event_source_trust_tier` maps to `Observed`.
fn make_event() -> Event {
Event {
id: EventId::new(),
schema_version: SCHEMA_VERSION,
observed_at: Utc::now(),
recorded_at: Utc::now(),
source: EventSource::ChildAgent {
model: "test-model".into(),
},
event_type: EventType::AgentResponse,
trace_id: None,
session_id: Some("test".into()),
domain_tags: vec![],
payload: serde_json::json!({"text": "hello"}),
// Hash fields are left empty in the fixture.
payload_hash: String::new(),
prev_event_hash: None,
event_hash: String::new(),
}
}
// Ingesting the same session twice appends the event once; the second run
// skips it and leaves the chain head unchanged.
#[test]
fn ingest_then_reingest_is_noop() {
let tmp = tempdir().unwrap();
let session_path = tmp.path().join("session.json");
let event = make_event();
write_session(&session_path, &[event]);
let args = IngestArgs {
session: session_path.clone(),
event_log: Some(tmp.path().join("events.jsonl")),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
// First run: the single event is new.
let first = run_inner(args).unwrap();
assert_eq!(first.appended.len(), 1);
assert_eq!(first.skipped.len(), 0);
let head_after_first = first.head.clone();
assert!(head_after_first.is_some());
// Second run against the same log: nothing new to append.
let args2 = IngestArgs {
session: session_path,
event_log: Some(tmp.path().join("events.jsonl")),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
let second = run_inner(args2).unwrap();
assert_eq!(second.appended.len(), 0, "re-ingest must append nothing");
assert_eq!(second.skipped.len(), 1);
assert_eq!(
second.head, head_after_first,
"chain head must be unchanged across idempotent re-ingest"
);
}
// `parse_events` accepts the three supported shapes (envelope object, bare
// array, single event object) and reports invalid JSON as `Exit::Usage`.
#[test]
fn parse_envelope_array_or_single() {
let e = make_event();
let envelope = serde_json::to_vec(&serde_json::json!({"events": [e.clone()]})).unwrap();
assert_eq!(parse_events(&envelope).unwrap().len(), 1);
let array = serde_json::to_vec(&serde_json::json!([e.clone(), e.clone()])).unwrap();
assert_eq!(parse_events(&array).unwrap().len(), 2);
let single = serde_json::to_vec(&e).unwrap();
assert_eq!(parse_events(&single).unwrap().len(), 1);
assert_eq!(parse_events(b"not json").unwrap_err().exit, Exit::Usage);
}
// Test helper: path of the `forged-user-event.json` fixture under the crate's
// `tests/fixtures` directory.
fn forged_user_fixture() -> PathBuf {
    let mut fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    fixture.push("tests");
    fixture.push("fixtures");
    fixture.push("forged-user-event.json");
    fixture
}
// Test helper: path of the `attested-user-event-key.bin` key fixture under the
// crate's `tests/fixtures` directory.
fn attested_key_fixture() -> PathBuf {
    let mut fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    fixture.push("tests");
    fixture.push("fixtures");
    fixture.push("attested-user-event-key.bin");
    fixture
}
// Ingesting the forged-user fixture without `user_attestation` is refused with
// `PreconditionUnmet`, and the event log (if it exists at all) stays empty.
#[test]
fn ingest_rejects_user_source_without_attestation() {
let tmp = tempdir().unwrap();
let log = tmp.path().join("events.jsonl");
let args = IngestArgs {
session: forged_user_fixture(),
event_log: Some(log.clone()),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
let err = run_inner(args).expect_err("must refuse User events without flag");
assert_eq!(err.exit, Exit::PreconditionUnmet);
// The log file may or may not have been created; it must not hold any data.
if log.exists() {
assert_eq!(
std::fs::metadata(&log).unwrap().len(),
0,
"no event should have been appended"
);
}
}
// A session path containing a `..` component is refused before the event log
// is created, even though the path points at an existing, valid session file.
#[test]
fn ingest_rejects_parent_traversal_session_path_without_mutation() {
let tmp = tempdir().unwrap();
let session_path = tmp.path().join("session.json");
// Same file, reached through `inputs/..`.
let traversal_path = tmp.path().join("inputs").join("..").join("session.json");
let log = tmp.path().join("events.jsonl");
let event = make_event();
write_session(&session_path, &[event]);
let args = IngestArgs {
session: traversal_path,
event_log: Some(log.clone()),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
let err = run_inner(args).expect_err("parent traversal must be rejected");
assert_eq!(err.exit, Exit::PreconditionUnmet);
assert!(
!log.exists(),
"parent traversal rejection must happen before opening the event log"
);
}
// Unix only: a session path that is a symlink to a file in a different
// temporary directory is refused before the event log is created.
#[cfg(unix)]
#[test]
fn ingest_rejects_root_escaping_session_symlink_without_mutation() {
use std::os::unix::fs::symlink;
let tmp = tempdir().unwrap();
// Second temp dir holding the real session file, outside `tmp`.
let outside = tempdir().unwrap();
let outside_session = outside.path().join("session.json");
let symlink_session = tmp.path().join("session-link.json");
let log = tmp.path().join("events.jsonl");
let event = make_event();
write_session(&outside_session, &[event]);
symlink(&outside_session, &symlink_session).unwrap();
let args = IngestArgs {
session: symlink_session,
event_log: Some(log.clone()),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
let err = run_inner(args).expect_err("root-escaping symlink must be rejected");
assert_eq!(err.exit, Exit::PreconditionUnmet);
assert!(
!log.exists(),
"symlink rejection must happen before opening the event log"
);
}
// With the key fixture supplied, the forged-user fixture is ingested, counted
// as one attested user event, and an audit row is written next to the log.
#[test]
fn ingest_accepts_user_source_with_valid_attestation() {
let tmp = tempdir().unwrap();
let log = tmp.path().join("events.jsonl");
let db = tmp.path().join("cortex.db");
let args = IngestArgs {
session: forged_user_fixture(),
event_log: Some(log.clone()),
db: Some(db),
user_attestation: Some(attested_key_fixture()),
};
let out = run_inner(args).expect("must accept with valid attestation");
assert_eq!(out.appended.len(), 1);
assert_eq!(out.attested_user_events.len(), 1);
let audit_path = tmp.path().join(AUDIT_RECORDS_FILENAME);
assert!(
audit_path.exists(),
"audit_records.jsonl must be written when an attested User event is ingested"
);
// The first audit line must describe the attested event.
let raw = std::fs::read_to_string(&audit_path).unwrap();
let row: AuditRecord = serde_json::from_str(raw.lines().next().unwrap()).unwrap();
assert_eq!(row.operation, OPERATION_INGEST_USER_ATTESTED);
assert_eq!(row.target_ref, out.attested_user_events[0].to_string());
assert!(matches!(row.outcome, Outcome::Success));
}
// The audit row's `actor_json` carries the kind, event id, key id, signing
// time and a 128-character hexadecimal signature.
#[test]
fn ingest_audit_row_has_signature_field() {
let tmp = tempdir().unwrap();
let log = tmp.path().join("events.jsonl");
let args = IngestArgs {
session: forged_user_fixture(),
event_log: Some(log),
db: Some(tmp.path().join("cortex.db")),
user_attestation: Some(attested_key_fixture()),
};
let _ = run_inner(args).expect("attested ingest");
let audit_path = tmp.path().join(AUDIT_RECORDS_FILENAME);
let mut s = String::new();
std::fs::File::open(&audit_path)
.unwrap()
.read_to_string(&mut s)
.unwrap();
let row: AuditRecord = serde_json::from_str(s.lines().next().unwrap()).unwrap();
let actor = &row.actor_json;
assert_eq!(actor["kind"], "user");
assert!(actor["event_id"].is_string(), "event_id must be present");
assert!(actor["key_id"].is_string(), "key_id must be present");
let sig_hex = actor["signature_hex"]
.as_str()
.expect("signature_hex must be a string");
assert_eq!(
sig_hex.len(),
128,
"Ed25519 signature is 64 bytes -> 128 hex chars"
);
assert!(sig_hex.chars().all(|c| c.is_ascii_hexdigit()));
assert!(actor["signed_at"].is_string(), "signed_at must be present");
}
// End-to-end check: the signature stored in the audit row verifies against a
// preimage rebuilt from the persisted event, the audit row's `signed_at` /
// `key_id`, the derived ledger id and chain position 0.
#[test]
fn ingest_signature_verifies_against_canonical_preimage() {
let tmp = tempdir().unwrap();
let log = tmp.path().join("events.jsonl");
let args = IngestArgs {
session: forged_user_fixture(),
event_log: Some(log.clone()),
db: Some(tmp.path().join("cortex.db")),
user_attestation: Some(attested_key_fixture()),
};
let out = run_inner(args).expect("attested ingest");
// Read back the event exactly as it was persisted in the log.
let raw_events = std::fs::read_to_string(&log).unwrap();
let event: Event =
serde_json::from_str(raw_events.lines().next().expect("one event line")).unwrap();
assert_eq!(event.id, out.appended[0]);
let audit_path = tmp.path().join(AUDIT_RECORDS_FILENAME);
let raw_audit = std::fs::read_to_string(&audit_path).unwrap();
let row: AuditRecord =
serde_json::from_str(raw_audit.lines().next().expect("one audit line")).unwrap();
let actor = &row.actor_json;
let key_id = actor["key_id"].as_str().unwrap().to_string();
let sig_hex = actor["signature_hex"].as_str().unwrap();
let signed_at: chrono::DateTime<chrono::Utc> =
serde_json::from_value(actor["signed_at"].clone()).unwrap();
// Rebuild the preimage from persisted data only; the log was empty before
// this run, so the event sits at chain position 0.
let preimage = AttestationPreimage {
schema_version: SCHEMA_VERSION_ATTESTATION,
source: SourceIdentity::User,
event_id: event.id.to_string(),
payload_hash: event.payload_hash.clone(),
session_id: event.session_id.clone().unwrap_or_default(),
ledger_id: derive_ledger_id(&log),
lineage: LineageBinding::ChainPosition(0),
signed_at,
key_id: key_id.clone(),
};
// Decode the 128 hex characters back into the 64 signature bytes.
let mut sig_bytes = [0u8; 64];
for (i, byte_out) in sig_bytes.iter_mut().enumerate() {
let hi = (sig_hex.as_bytes()[i * 2] as char).to_digit(16).unwrap();
let lo = (sig_hex.as_bytes()[i * 2 + 1] as char)
.to_digit(16)
.unwrap();
*byte_out = ((hi << 4) | lo) as u8;
}
let attestation = Attestation {
key_id: key_id.clone(),
signature: sig_bytes,
signed_at,
};
// Recreate the attestor from the same key fixture to obtain the verifying key.
let key_bytes = std::fs::read(attested_key_fixture()).unwrap();
let mut seed = [0u8; 32];
seed.copy_from_slice(&key_bytes);
let attestor = InMemoryAttestor::from_seed(&seed);
assert_eq!(
attestor.key_id(),
key_id,
"derived key_id must match audit row's key_id"
);
verify(&preimage, &attestation, &attestor.verifying_key(), &key_id)
.expect("persisted attestation must verify against canonical preimage");
// Result intentionally discarded; this only exercises the call.
let _ = canonical::canonical_signing_input(&preimage);
}
// A key file that is not exactly 32 bytes long is refused with `PreconditionUnmet`.
#[test]
fn ingest_rejects_wrong_size_key_file() {
let tmp = tempdir().unwrap();
let bad_key = tmp.path().join("short.bin");
std::fs::write(&bad_key, b"too-short").unwrap();
let args = IngestArgs {
session: forged_user_fixture(),
event_log: Some(tmp.path().join("events.jsonl")),
db: Some(tmp.path().join("cortex.db")),
user_attestation: Some(bad_key),
};
let err = run_inner(args).expect_err("short key file must be rejected");
assert_eq!(err.exit, Exit::PreconditionUnmet);
}
// Test helper: a fresh event whose source is `EventSource::User`.
fn user_event() -> Event {
Event {
id: EventId::new(),
schema_version: SCHEMA_VERSION,
observed_at: Utc::now(),
recorded_at: Utc::now(),
source: EventSource::User,
event_type: EventType::UserMessage,
trace_id: None,
session_id: Some("test".into()),
domain_tags: vec![],
payload: serde_json::json!({"text": "operator message"}),
payload_hash: String::new(),
prev_event_hash: None,
event_hash: String::new(),
}
}
// Test helper: a fresh `Tool` event with an empty tool name, which
// `event_source_trust_tier` maps to `Untrusted`.
fn unnamed_tool_event() -> Event {
Event {
id: EventId::new(),
schema_version: SCHEMA_VERSION,
observed_at: Utc::now(),
recorded_at: Utc::now(),
source: EventSource::Tool { name: "".into() },
event_type: EventType::ToolResult,
trace_id: None,
session_id: Some("test".into()),
domain_tags: vec![],
payload: serde_json::json!({"result": "ok"}),
payload_hash: String::new(),
prev_event_hash: None,
event_hash: String::new(),
}
}
// Test helper: a fresh event whose source is `EventSource::ManualCorrection`.
fn manual_correction_event() -> Event {
Event {
id: EventId::new(),
schema_version: SCHEMA_VERSION,
observed_at: Utc::now(),
recorded_at: Utc::now(),
source: EventSource::ManualCorrection,
event_type: EventType::UserMessage,
trace_id: None,
session_id: Some("test".into()),
domain_tags: vec![],
payload: serde_json::json!({"text": "operator correction"}),
payload_hash: String::new(),
prev_event_hash: None,
event_hash: String::new(),
}
}
// A user event without an attestor yields `Reject`, and the user-attestation
// contribution carries the `policy.ingest.user_attestation.missing` reason.
#[test]
fn policy_user_without_attestation_rejects_with_user_attestation_missing_rule() {
let event = user_event();
let decision = ingest_policy_decision(&[event.clone()], &[event.id], false);
assert_eq!(decision.final_outcome, PolicyOutcome::Reject);
// The rule may be listed as contributing or as discarded; search both.
let user_rule = decision
.contributing
.iter()
.chain(decision.discarded.iter())
.find(|c| c.rule_id.as_str() == INGEST_USER_ATTESTATION_RULE_ID)
.expect("user attestation contributor is present");
assert_eq!(user_rule.outcome, PolicyOutcome::Reject);
assert!(user_rule
.reason
.contains("policy.ingest.user_attestation.missing"));
}
// A tool event with an empty name yields `Reject`, and the tier-gate
// contribution names both the stable reason code and the offending event id.
#[test]
fn policy_tool_below_minimum_rejects_with_event_source_tier_gate_rule() {
let event = unnamed_tool_event();
let decision = ingest_policy_decision(&[event.clone()], &[], false);
assert_eq!(decision.final_outcome, PolicyOutcome::Reject);
// The rule may be listed as contributing or as discarded; search both.
let tier_rule = decision
.contributing
.iter()
.chain(decision.discarded.iter())
.find(|c| c.rule_id.as_str() == INGEST_EVENT_SOURCE_TIER_GATE_RULE_ID)
.expect("tier gate contributor is present");
assert_eq!(tier_rule.outcome, PolicyOutcome::Reject);
assert!(tier_rule
.reason
.contains("policy.ingest.event_source_tier_gate.below_minimum"));
assert!(tier_rule.reason.contains(&event.id.to_string()));
}
// A manual-correction event without an attestor is rejected by the policy.
#[test]
fn policy_manual_correction_without_attestor_is_below_tier() {
let event = manual_correction_event();
let decision = ingest_policy_decision(&[event], &[], false);
assert_eq!(decision.final_outcome, PolicyOutcome::Reject);
}
// A child-agent event with a model name is allowed, with all three
// contributions listed as contributing.
#[test]
fn policy_well_formed_child_agent_event_allows() {
let event = make_event();
let decision = ingest_policy_decision(&[event], &[], false);
assert_eq!(decision.final_outcome, PolicyOutcome::Allow);
assert_eq!(decision.contributing.len(), 3);
}
// A user event is allowed when an attestor is present.
#[test]
fn policy_attested_user_event_allows() {
let event = user_event();
let decision = ingest_policy_decision(&[event.clone()], &[event.id], true);
assert_eq!(decision.final_outcome, PolicyOutcome::Allow);
}
// Pins the source -> trust tier table implemented by `event_source_trust_tier`.
#[test]
fn event_source_trust_tier_mapping_matches_adr_0019() {
// User and ManualCorrection depend on whether an attestor is present.
assert_eq!(
event_source_trust_tier(&EventSource::User, false),
TrustTier::Untrusted
);
assert_eq!(
event_source_trust_tier(&EventSource::User, true),
TrustTier::Operator
);
assert_eq!(
event_source_trust_tier(&EventSource::ManualCorrection, false),
TrustTier::Untrusted
);
assert_eq!(
event_source_trust_tier(&EventSource::ManualCorrection, true),
TrustTier::Operator
);
// ChildAgent and Tool depend on whether their label is empty.
assert_eq!(
event_source_trust_tier(
&EventSource::ChildAgent {
model: "replay".into()
},
false
),
TrustTier::Observed
);
assert_eq!(
event_source_trust_tier(&EventSource::ChildAgent { model: "".into() }, false),
TrustTier::Untrusted
);
assert_eq!(
event_source_trust_tier(
&EventSource::Tool {
name: "auditor".into()
},
false
),
TrustTier::Observed
);
// Runtime and ExternalOutcome are always Observed.
assert_eq!(
event_source_trust_tier(&EventSource::Runtime, false),
TrustTier::Observed
);
assert_eq!(
event_source_trust_tier(&EventSource::ExternalOutcome, false),
TrustTier::Observed
);
}
// `run_inner` refuses a session holding an unnamed tool event with
// `PreconditionUnmet` and appends nothing to the event log.
#[test]
fn run_inner_refuses_tool_event_below_tier_without_mutation() {
let tmp = tempdir().unwrap();
let session_path = tmp.path().join("session.json");
let event = unnamed_tool_event();
write_session(&session_path, &[event]);
let log = tmp.path().join("events.jsonl");
let args = IngestArgs {
session: session_path,
event_log: Some(log.clone()),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
let err = run_inner(args).expect_err("below-tier tool must be refused");
assert_eq!(err.exit, Exit::PreconditionUnmet);
// The log file may or may not have been created; it must not hold any data.
if log.exists() {
assert_eq!(
std::fs::metadata(&log).unwrap().len(),
0,
"below-tier tool ingest must not append to the JSONL log"
);
}
}
// `run_inner` accepts a session holding one named child-agent event and
// appends it without skipping anything.
#[test]
fn run_inner_allows_named_child_agent_event_through_policy() {
let tmp = tempdir().unwrap();
let session_path = tmp.path().join("session.json");
let event = make_event();
write_session(&session_path, &[event]);
let args = IngestArgs {
session: session_path,
event_log: Some(tmp.path().join("events.jsonl")),
db: Some(tmp.path().join("cortex.db")),
user_attestation: None,
};
let outcome = run_inner(args).expect("policy allow path must succeed");
assert_eq!(outcome.appended.len(), 1);
assert_eq!(outcome.skipped.len(), 0);
}
}