Skip to main content

assay_runner_core/
policy.rs

1use crate::{run::is_safe_run_id, RunnerSpikeArchive};
2use assay_runner_schema::{
3    BindingWindow, CapabilitySurface, CorrelationBinding, PolicyLayerStatus,
4};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use thiserror::Error;
8
/// Schema identifier written into the `schema` field of every
/// [`PolicyLayerEvent`] produced by this module.
pub const POLICY_EVENT_SCHEMA: &str = "assay.runner.policy_event.v0";
10
11#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
12pub struct PolicyLayerEvent {
13    pub schema: String,
14    pub run_id: String,
15    pub seq: u64,
16    pub source_event_type: String,
17    pub source: String,
18    pub tool_call_id: String,
19    pub tool: String,
20    pub decision: String,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct PolicyLayerCapture {
25    pub run_id: String,
26    pub policy_layer_ndjson: Vec<u8>,
27    pub capability_surface: CapabilitySurface,
28    pub events: Vec<PolicyLayerEvent>,
29}
30
31#[derive(Debug, Error)]
32pub enum PolicyLayerError {
33    #[error("policy layer run_id must not be empty")]
34    EmptyRunId,
35    #[error("policy layer run_id may only contain ASCII letters, digits, '_' and '-'")]
36    UnsafeRunId,
37    #[error("policy layer run_id mismatch: expected {expected}, found {actual}")]
38    RunIdMismatch { expected: String, actual: String },
39    #[error("policy decision log did not contain decision events")]
40    EmptyDecisionLog,
41    #[error("invalid policy decision json at line {line}: {source}")]
42    InvalidJson {
43        line: usize,
44        source: serde_json::Error,
45    },
46    #[error(
47        "policy decision line {line} must have type assay.tool.decision, found {observed_type:?}"
48    )]
49    UnexpectedEventType {
50        line: usize,
51        observed_type: Option<String>,
52    },
53    #[error("policy decision line {line} missing required field {field}")]
54    MissingRequiredField { line: usize, field: String },
55    #[error("invalid capability surface: {0}")]
56    CapabilitySurface(#[from] assay_runner_schema::CapabilitySurfaceError),
57    #[error("policy event serialization failed: {0}")]
58    Json(#[from] serde_json::Error),
59}
60
61impl PolicyLayerCapture {
62    pub fn from_decision_ndjson(
63        run_id: impl Into<String>,
64        ndjson: &[u8],
65    ) -> Result<Self, PolicyLayerError> {
66        let run_id = run_id.into();
67        if run_id.is_empty() {
68            return Err(PolicyLayerError::EmptyRunId);
69        }
70        if !is_safe_run_id(&run_id) {
71            return Err(PolicyLayerError::UnsafeRunId);
72        }
73
74        let input = String::from_utf8_lossy(ndjson);
75        let mut policy_layer_ndjson = Vec::new();
76        let mut capability_surface = CapabilitySurface::new(run_id.clone());
77        let mut events = Vec::new();
78
79        for (idx, raw) in input.lines().enumerate() {
80            let line = idx + 1;
81            let trimmed = raw.trim();
82            if trimmed.is_empty() {
83                continue;
84            }
85            let value: Value = serde_json::from_str(trimmed)
86                .map_err(|source| PolicyLayerError::InvalidJson { line, source })?;
87            let observed_type = observed_type(&value);
88            if observed_type.as_deref() != Some("assay.tool.decision") {
89                return Err(PolicyLayerError::UnexpectedEventType {
90                    line,
91                    observed_type,
92                });
93            }
94            let data = value
95                .get("data")
96                .ok_or_else(|| PolicyLayerError::MissingRequiredField {
97                    line,
98                    field: "data".to_string(),
99                })?;
100            let source = required_string(&value, "source", line)?;
101            let tool = required_data_string(data, "tool", line)?;
102            let tool_call_id = required_data_string(data, "tool_call_id", line)?;
103            let decision = required_data_string(data, "decision", line)?;
104
105            capability_surface.add_mcp_tool(tool.clone());
106            capability_surface.add_policy_decision(format!("{decision}:{tool}"));
107
108            let event = PolicyLayerEvent {
109                schema: POLICY_EVENT_SCHEMA.to_string(),
110                run_id: run_id.clone(),
111                seq: events.len() as u64,
112                source_event_type: "assay.tool.decision".to_string(),
113                source,
114                tool_call_id,
115                tool,
116                decision,
117            };
118            serde_json::to_writer(&mut policy_layer_ndjson, &event)?;
119            policy_layer_ndjson.push(b'\n');
120            events.push(event);
121        }
122
123        if events.is_empty() {
124            return Err(PolicyLayerError::EmptyDecisionLog);
125        }
126
127        Ok(Self {
128            run_id,
129            policy_layer_ndjson,
130            capability_surface,
131            events,
132        })
133    }
134
135    /// Apply this policy capture to the archive.
136    ///
137    /// Must be called after any kernel-layer capture has been applied:
138    /// `kernel_event_count` is read from `archive.kernel_layer_ndjson` at apply
139    /// time. Calling this on an archive with no kernel events marks the
140    /// correlation report partial with `policy_events_without_kernel_events`.
141    pub fn apply_to_archive(
142        self,
143        archive: &mut RunnerSpikeArchive,
144    ) -> Result<(), PolicyLayerError> {
145        let PolicyLayerCapture {
146            run_id,
147            policy_layer_ndjson,
148            capability_surface,
149            events,
150        } = self;
151
152        if archive.run_id != run_id {
153            return Err(PolicyLayerError::RunIdMismatch {
154                expected: archive.run_id.clone(),
155                actual: run_id,
156            });
157        }
158
159        archive.policy_layer_ndjson = policy_layer_ndjson;
160        archive.capability_surface.merge_from(&capability_surface)?;
161        archive.observation_health.policy_layer = PolicyLayerStatus::Present;
162        archive
163            .observation_health
164            .notes
165            .retain(|note| !note.starts_with("s4_policy_capture:"));
166        archive.observation_health.notes.push(format!(
167            "s4_policy_capture: decision_events={}",
168            events.len()
169        ));
170
171        let kernel_event_count = archive
172            .kernel_layer_ndjson
173            .split(|byte| *byte == b'\n')
174            .filter(|line| !line.is_empty())
175            .count() as u64;
176        if kernel_event_count == 0 {
177            archive
178                .correlation_report
179                .mark_partial("policy_events_without_kernel_events");
180        }
181
182        for event in events {
183            archive.correlation_report.add_binding(CorrelationBinding {
184                tool_call_id: event.tool_call_id,
185                policy_decision: Some(event.decision),
186                kernel_event_count,
187                window: BindingWindow {
188                    start: "run_started".to_string(),
189                    end: "run_finished".to_string(),
190                },
191            });
192        }
193        Ok(())
194    }
195}
196
197fn required_string(
198    value: &Value,
199    field: &'static str,
200    line: usize,
201) -> Result<String, PolicyLayerError> {
202    value
203        .get(field)
204        .and_then(Value::as_str)
205        .map(str::trim)
206        .filter(|value| !value.is_empty())
207        .map(ToOwned::to_owned)
208        .ok_or_else(|| PolicyLayerError::MissingRequiredField {
209            line,
210            field: field.to_string(),
211        })
212}
213
214fn required_data_string(
215    data: &Value,
216    field: &'static str,
217    line: usize,
218) -> Result<String, PolicyLayerError> {
219    data.get(field)
220        .and_then(Value::as_str)
221        .map(str::trim)
222        .filter(|value| !value.is_empty())
223        .map(ToOwned::to_owned)
224        .ok_or_else(|| PolicyLayerError::MissingRequiredField {
225            line,
226            field: format!("data.{field}"),
227        })
228}
229
230fn observed_type(value: &Value) -> Option<String> {
231    value.get("type").map(|event_type| match event_type {
232        Value::String(event_type) => event_type.clone(),
233        other => other.to_string(),
234    })
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use assay_runner_schema::{CgroupCorrelationStatus, KernelLayerStatus};

    /// A single well-formed `assay.tool.decision` event, newline-terminated.
    const DECISION: &[u8] = br#"{"specversion":"1.0","id":"evt_decision_001","type":"assay.tool.decision","source":"assay://runner-spike/run_001","time":"2026-05-20T00:00:00Z","data":{"tool":"read_file","decision":"allow","reason_code":"P_TOOL_ALLOWED","tool_call_id":"tc_runner_policy_001"}}
"#;

    /// Parsing emits the normalized NDJSON and records tool + decision.
    #[test]
    fn decision_log_records_policy_layer_and_surface() {
        let capture = PolicyLayerCapture::from_decision_ndjson("run_001", DECISION).unwrap();
        let rendered = std::str::from_utf8(&capture.policy_layer_ndjson).unwrap();
        let surface = &capture.capability_surface;

        assert!(rendered.contains(POLICY_EVENT_SCHEMA));
        assert!(rendered.contains("tc_runner_policy_001"));
        assert!(surface.mcp_tools.contains("read_file"));
        assert!(surface.policy_decisions.contains("allow:read_file"));
    }

    /// Applying to an archive with one kernel event yields one binding.
    #[test]
    fn apply_marks_policy_present_and_adds_binding() {
        let mut archive = RunnerSpikeArchive::empty("run_001", "linux");
        archive.observation_health.kernel_layer = KernelLayerStatus::Complete;
        archive.observation_health.cgroup_correlation = CgroupCorrelationStatus::Clean;
        archive.kernel_layer_ndjson = b"{\"schema\":\"assay.runner.kernel_event.v0\"}\n".to_vec();

        PolicyLayerCapture::from_decision_ndjson("run_001", DECISION)
            .unwrap()
            .apply_to_archive(&mut archive)
            .unwrap();

        assert_eq!(
            archive.observation_health.policy_layer,
            PolicyLayerStatus::Present
        );
        let bindings = &archive.correlation_report.bindings;
        assert_eq!(bindings.len(), 1);
        assert_eq!(bindings[0].tool_call_id, "tc_runner_policy_001");
        assert_eq!(bindings[0].kernel_event_count, 1);
    }

    /// A log consisting only of blank lines is an error, not an empty capture.
    #[test]
    fn empty_decision_log_is_rejected() {
        let outcome = PolicyLayerCapture::from_decision_ndjson("run_001", b"\n");

        assert!(matches!(outcome, Err(PolicyLayerError::EmptyDecisionLog)));
    }

    /// The offending `type` is carried in the error and shown in its message.
    #[test]
    fn unexpected_event_type_reports_observed_type() {
        let err = PolicyLayerCapture::from_decision_ndjson(
            "run_001",
            br#"{"type":"assay.other.event","source":"assay://test","data":{}}
"#,
        )
        .unwrap_err();
        let message = err.to_string();

        let carries_observed_type = matches!(
            &err,
            PolicyLayerError::UnexpectedEventType {
                line: 1,
                observed_type: Some(observed)
            } if observed == "assay.other.event"
        );
        assert!(carries_observed_type);
        assert!(message.contains("assay.other.event"));
    }

    /// A missing top-level `source` must be named `source`, not `data.source`.
    #[test]
    fn missing_top_level_source_is_not_reported_as_data_source() {
        let err = PolicyLayerCapture::from_decision_ndjson(
            "run_001",
            br#"{"type":"assay.tool.decision","data":{"tool":"read_file","decision":"allow","tool_call_id":"tc_runner_policy_001"}}
"#,
        )
        .unwrap_err();
        let message = err.to_string();

        let names_top_level_field = matches!(
            &err,
            PolicyLayerError::MissingRequiredField { line: 1, field } if field == "source"
        );
        assert!(names_top_level_field);
        assert!(message.contains("missing required field source"));
        assert!(!message.contains("data.source"));
    }

    /// With no kernel events the correlation report is downgraded to partial.
    #[test]
    fn policy_without_kernel_events_marks_correlation_partial() {
        let mut archive = RunnerSpikeArchive::empty("run_001", "linux");

        PolicyLayerCapture::from_decision_ndjson("run_001", DECISION)
            .unwrap()
            .apply_to_archive(&mut archive)
            .unwrap();

        let report = &archive.correlation_report;
        assert_eq!(
            report.status,
            assay_runner_schema::CorrelationStatus::Partial
        );
        assert!(report
            .ambiguities
            .iter()
            .any(|entry| entry == "policy_events_without_kernel_events"));
    }
}
345}