1use crate::{run::is_safe_run_id, RunnerSpikeArchive};
2use assay_runner_schema::{
3 BindingWindow, CapabilitySurface, CorrelationBinding, PolicyLayerStatus,
4};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use thiserror::Error;
8
/// Schema identifier written into the `schema` field of every
/// [`PolicyLayerEvent`] built by [`PolicyLayerCapture::from_decision_ndjson`].
pub const POLICY_EVENT_SCHEMA: &str = "assay.runner.policy_event.v0";
10
11#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
12pub struct PolicyLayerEvent {
13 pub schema: String,
14 pub run_id: String,
15 pub seq: u64,
16 pub source_event_type: String,
17 pub source: String,
18 pub tool_call_id: String,
19 pub tool: String,
20 pub decision: String,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct PolicyLayerCapture {
25 pub run_id: String,
26 pub policy_layer_ndjson: Vec<u8>,
27 pub capability_surface: CapabilitySurface,
28 pub events: Vec<PolicyLayerEvent>,
29}
30
31#[derive(Debug, Error)]
32pub enum PolicyLayerError {
33 #[error("policy layer run_id must not be empty")]
34 EmptyRunId,
35 #[error("policy layer run_id may only contain ASCII letters, digits, '_' and '-'")]
36 UnsafeRunId,
37 #[error("policy layer run_id mismatch: expected {expected}, found {actual}")]
38 RunIdMismatch { expected: String, actual: String },
39 #[error("policy decision log did not contain decision events")]
40 EmptyDecisionLog,
41 #[error("invalid policy decision json at line {line}: {source}")]
42 InvalidJson {
43 line: usize,
44 source: serde_json::Error,
45 },
46 #[error(
47 "policy decision line {line} must have type assay.tool.decision, found {observed_type:?}"
48 )]
49 UnexpectedEventType {
50 line: usize,
51 observed_type: Option<String>,
52 },
53 #[error("policy decision line {line} missing required field {field}")]
54 MissingRequiredField { line: usize, field: String },
55 #[error("invalid capability surface: {0}")]
56 CapabilitySurface(#[from] assay_runner_schema::CapabilitySurfaceError),
57 #[error("policy event serialization failed: {0}")]
58 Json(#[from] serde_json::Error),
59}
60
61impl PolicyLayerCapture {
62 pub fn from_decision_ndjson(
63 run_id: impl Into<String>,
64 ndjson: &[u8],
65 ) -> Result<Self, PolicyLayerError> {
66 let run_id = run_id.into();
67 if run_id.is_empty() {
68 return Err(PolicyLayerError::EmptyRunId);
69 }
70 if !is_safe_run_id(&run_id) {
71 return Err(PolicyLayerError::UnsafeRunId);
72 }
73
74 let input = String::from_utf8_lossy(ndjson);
75 let mut policy_layer_ndjson = Vec::new();
76 let mut capability_surface = CapabilitySurface::new(run_id.clone());
77 let mut events = Vec::new();
78
79 for (idx, raw) in input.lines().enumerate() {
80 let line = idx + 1;
81 let trimmed = raw.trim();
82 if trimmed.is_empty() {
83 continue;
84 }
85 let value: Value = serde_json::from_str(trimmed)
86 .map_err(|source| PolicyLayerError::InvalidJson { line, source })?;
87 let observed_type = observed_type(&value);
88 if observed_type.as_deref() != Some("assay.tool.decision") {
89 return Err(PolicyLayerError::UnexpectedEventType {
90 line,
91 observed_type,
92 });
93 }
94 let data = value
95 .get("data")
96 .ok_or_else(|| PolicyLayerError::MissingRequiredField {
97 line,
98 field: "data".to_string(),
99 })?;
100 let source = required_string(&value, "source", line)?;
101 let tool = required_data_string(data, "tool", line)?;
102 let tool_call_id = required_data_string(data, "tool_call_id", line)?;
103 let decision = required_data_string(data, "decision", line)?;
104
105 capability_surface.add_mcp_tool(tool.clone());
106 capability_surface.add_policy_decision(format!("{decision}:{tool}"));
107
108 let event = PolicyLayerEvent {
109 schema: POLICY_EVENT_SCHEMA.to_string(),
110 run_id: run_id.clone(),
111 seq: events.len() as u64,
112 source_event_type: "assay.tool.decision".to_string(),
113 source,
114 tool_call_id,
115 tool,
116 decision,
117 };
118 serde_json::to_writer(&mut policy_layer_ndjson, &event)?;
119 policy_layer_ndjson.push(b'\n');
120 events.push(event);
121 }
122
123 if events.is_empty() {
124 return Err(PolicyLayerError::EmptyDecisionLog);
125 }
126
127 Ok(Self {
128 run_id,
129 policy_layer_ndjson,
130 capability_surface,
131 events,
132 })
133 }
134
135 pub fn apply_to_archive(
142 self,
143 archive: &mut RunnerSpikeArchive,
144 ) -> Result<(), PolicyLayerError> {
145 let PolicyLayerCapture {
146 run_id,
147 policy_layer_ndjson,
148 capability_surface,
149 events,
150 } = self;
151
152 if archive.run_id != run_id {
153 return Err(PolicyLayerError::RunIdMismatch {
154 expected: archive.run_id.clone(),
155 actual: run_id,
156 });
157 }
158
159 archive.policy_layer_ndjson = policy_layer_ndjson;
160 archive.capability_surface.merge_from(&capability_surface)?;
161 archive.observation_health.policy_layer = PolicyLayerStatus::Present;
162 archive
163 .observation_health
164 .notes
165 .retain(|note| !note.starts_with("s4_policy_capture:"));
166 archive.observation_health.notes.push(format!(
167 "s4_policy_capture: decision_events={}",
168 events.len()
169 ));
170
171 let kernel_event_count = archive
172 .kernel_layer_ndjson
173 .split(|byte| *byte == b'\n')
174 .filter(|line| !line.is_empty())
175 .count() as u64;
176 if kernel_event_count == 0 {
177 archive
178 .correlation_report
179 .mark_partial("policy_events_without_kernel_events");
180 }
181
182 for event in events {
183 archive.correlation_report.add_binding(CorrelationBinding {
184 tool_call_id: event.tool_call_id,
185 policy_decision: Some(event.decision),
186 kernel_event_count,
187 window: BindingWindow {
188 start: "run_started".to_string(),
189 end: "run_finished".to_string(),
190 },
191 });
192 }
193 Ok(())
194 }
195}
196
197fn required_string(
198 value: &Value,
199 field: &'static str,
200 line: usize,
201) -> Result<String, PolicyLayerError> {
202 value
203 .get(field)
204 .and_then(Value::as_str)
205 .map(str::trim)
206 .filter(|value| !value.is_empty())
207 .map(ToOwned::to_owned)
208 .ok_or_else(|| PolicyLayerError::MissingRequiredField {
209 line,
210 field: field.to_string(),
211 })
212}
213
214fn required_data_string(
215 data: &Value,
216 field: &'static str,
217 line: usize,
218) -> Result<String, PolicyLayerError> {
219 data.get(field)
220 .and_then(Value::as_str)
221 .map(str::trim)
222 .filter(|value| !value.is_empty())
223 .map(ToOwned::to_owned)
224 .ok_or_else(|| PolicyLayerError::MissingRequiredField {
225 line,
226 field: format!("data.{field}"),
227 })
228}
229
230fn observed_type(value: &Value) -> Option<String> {
231 value.get("type").map(|event_type| match event_type {
232 Value::String(event_type) => event_type.clone(),
233 other => other.to_string(),
234 })
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use assay_runner_schema::{CgroupCorrelationStatus, KernelLayerStatus};

    /// A single well-formed decision event followed by a newline.
    const DECISION: &[u8] = br#"{"specversion":"1.0","id":"evt_decision_001","type":"assay.tool.decision","source":"assay://runner-spike/run_001","time":"2026-05-20T00:00:00Z","data":{"tool":"read_file","decision":"allow","reason_code":"P_TOOL_ALLOWED","tool_call_id":"tc_runner_policy_001"}}
"#;

    /// Parses [`DECISION`] for `run_001`; the starting point of the happy-path tests.
    fn sample_capture() -> PolicyLayerCapture {
        PolicyLayerCapture::from_decision_ndjson("run_001", DECISION).unwrap()
    }

    #[test]
    fn decision_log_records_policy_layer_and_surface() {
        let capture = sample_capture();
        let rendered = std::str::from_utf8(&capture.policy_layer_ndjson).unwrap();

        assert!(rendered.contains(POLICY_EVENT_SCHEMA));
        assert!(rendered.contains("tc_runner_policy_001"));

        let surface = &capture.capability_surface;
        assert!(surface.mcp_tools.contains("read_file"));
        assert!(surface.policy_decisions.contains("allow:read_file"));
    }

    #[test]
    fn apply_marks_policy_present_and_adds_binding() {
        let mut archive = RunnerSpikeArchive::empty("run_001", "linux");
        archive.observation_health.kernel_layer = KernelLayerStatus::Complete;
        archive.observation_health.cgroup_correlation = CgroupCorrelationStatus::Clean;
        archive.kernel_layer_ndjson = b"{\"schema\":\"assay.runner.kernel_event.v0\"}\n".to_vec();

        sample_capture().apply_to_archive(&mut archive).unwrap();

        assert_eq!(
            archive.observation_health.policy_layer,
            PolicyLayerStatus::Present
        );
        assert_eq!(archive.correlation_report.bindings.len(), 1);

        let binding = &archive.correlation_report.bindings[0];
        assert_eq!(binding.tool_call_id, "tc_runner_policy_001");
        assert_eq!(binding.kernel_event_count, 1);
    }

    #[test]
    fn empty_decision_log_is_rejected() {
        let outcome = PolicyLayerCapture::from_decision_ndjson("run_001", b"\n");

        assert!(matches!(outcome, Err(PolicyLayerError::EmptyDecisionLog)));
    }

    #[test]
    fn unexpected_event_type_reports_observed_type() {
        let input: &[u8] = br#"{"type":"assay.other.event","source":"assay://test","data":{}}
"#;
        let err = PolicyLayerCapture::from_decision_ndjson("run_001", input).unwrap_err();

        match &err {
            PolicyLayerError::UnexpectedEventType {
                line,
                observed_type: seen,
            } => {
                assert_eq!(*line, 1);
                assert_eq!(seen.as_deref(), Some("assay.other.event"));
            }
            other => panic!("expected UnexpectedEventType, got {other:?}"),
        }
        assert!(err.to_string().contains("assay.other.event"));
    }

    #[test]
    fn missing_top_level_source_is_not_reported_as_data_source() {
        let input: &[u8] = br#"{"type":"assay.tool.decision","data":{"tool":"read_file","decision":"allow","tool_call_id":"tc_runner_policy_001"}}
"#;
        let err = PolicyLayerCapture::from_decision_ndjson("run_001", input).unwrap_err();

        match &err {
            PolicyLayerError::MissingRequiredField { line, field } => {
                assert_eq!(*line, 1);
                assert_eq!(field, "source");
            }
            other => panic!("expected MissingRequiredField, got {other:?}"),
        }

        let message = err.to_string();
        assert!(message.contains("missing required field source"));
        assert!(!message.contains("data.source"));
    }

    #[test]
    fn policy_without_kernel_events_marks_correlation_partial() {
        let mut archive = RunnerSpikeArchive::empty("run_001", "linux");

        sample_capture().apply_to_archive(&mut archive).unwrap();

        let report = &archive.correlation_report;
        assert_eq!(
            report.status,
            assay_runner_schema::CorrelationStatus::Partial
        );

        let flagged = report
            .ambiguities
            .iter()
            .any(|entry| entry == "policy_events_without_kernel_events");
        assert!(flagged);
    }
}