1use std::sync::Arc;
11
12use chio_http_session::SessionJournal;
13use chio_kernel::{Guard, GuardContext, KernelError, Verdict};
14
15#[derive(Clone, Debug, Default)]
21pub struct DataFlowConfig {
22 pub max_bytes_read: Option<u64>,
24 pub max_bytes_written: Option<u64>,
26 pub max_bytes_total: Option<u64>,
28}
29
30pub struct DataFlowGuard {
39 journal: Arc<SessionJournal>,
40 config: DataFlowConfig,
41}
42
43impl DataFlowGuard {
44 pub fn new(journal: Arc<SessionJournal>, config: DataFlowConfig) -> Self {
46 Self { journal, config }
47 }
48}
49
50impl Guard for DataFlowGuard {
51 fn name(&self) -> &str {
52 "data-flow"
53 }
54
55 fn evaluate(&self, _ctx: &GuardContext) -> Result<Verdict, KernelError> {
56 let flow = self.journal.data_flow().map_err(|e| {
57 KernelError::Internal(format!("data-flow guard journal error (fail-closed): {e}"))
58 })?;
59
60 if let Some(max_read) = self.config.max_bytes_read {
62 if flow.total_bytes_read >= max_read {
63 return Ok(Verdict::Deny);
64 }
65 }
66
67 if let Some(max_written) = self.config.max_bytes_written {
69 if flow.total_bytes_written >= max_written {
70 return Ok(Verdict::Deny);
71 }
72 }
73
74 if let Some(max_total) = self.config.max_bytes_total {
76 let total = flow
77 .total_bytes_read
78 .saturating_add(flow.total_bytes_written);
79 if total >= max_total {
80 return Ok(Verdict::Deny);
81 }
82 }
83
84 Ok(Verdict::Allow)
85 }
86}
87
88#[cfg(test)]
89mod tests {
90 use super::*;
91 use chio_http_session::RecordParams;
92
93 fn make_journal(session_id: &str) -> Arc<SessionJournal> {
94 Arc::new(SessionJournal::new(session_id.to_string()))
95 }
96
97 fn make_ctx() -> (
98 chio_kernel::ToolCallRequest,
99 chio_core::capability::ChioScope,
100 String,
101 String,
102 ) {
103 let kp = chio_core::crypto::Keypair::generate();
104 let scope = chio_core::capability::ChioScope::default();
105 let agent_id = kp.public_key().to_hex();
106 let server_id = "srv-test".to_string();
107
108 let cap_body = chio_core::capability::CapabilityTokenBody {
109 id: "cap-test".to_string(),
110 issuer: kp.public_key(),
111 subject: kp.public_key(),
112 scope: scope.clone(),
113 issued_at: 0,
114 expires_at: u64::MAX,
115 delegation_chain: vec![],
116 };
117 let cap = chio_core::capability::CapabilityToken::sign(cap_body, &kp).expect("sign cap");
118
119 let request = chio_kernel::ToolCallRequest {
120 request_id: "req-test".to_string(),
121 capability: cap,
122 tool_name: "read_file".to_string(),
123 server_id: server_id.clone(),
124 agent_id: agent_id.clone(),
125 arguments: serde_json::json!({"path": "/app/src/main.rs"}),
126 dpop_proof: None,
127 governed_intent: None,
128 approval_token: None,
129 model_metadata: None,
130 federated_origin_kernel_id: None,
131 };
132
133 (request, scope, agent_id, server_id)
134 }
135
136 fn guard_ctx<'a>(
137 request: &'a chio_kernel::ToolCallRequest,
138 scope: &'a chio_core::capability::ChioScope,
139 agent_id: &'a String,
140 server_id: &'a String,
141 ) -> chio_kernel::GuardContext<'a> {
142 chio_kernel::GuardContext {
143 request,
144 scope,
145 agent_id,
146 server_id,
147 session_filesystem_roots: None,
148 matched_grant_index: None,
149 }
150 }
151
152 #[test]
153 fn guard_name() {
154 let journal = make_journal("sess-1");
155 let guard = DataFlowGuard::new(journal, DataFlowConfig::default());
156 assert_eq!(guard.name(), "data-flow");
157 }
158
159 #[test]
160 fn unlimited_allows_all() {
161 let journal = make_journal("sess-1");
162 journal
164 .record(RecordParams {
165 tool_name: "read_file".to_string(),
166 server_id: "srv".to_string(),
167 agent_id: "agent".to_string(),
168 bytes_read: 1_000_000,
169 bytes_written: 500_000,
170 delegation_depth: 0,
171 allowed: true,
172 })
173 .expect("record");
174
175 let guard = DataFlowGuard::new(journal, DataFlowConfig::default());
176 let (request, scope, agent_id, server_id) = make_ctx();
177 let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
178 assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Allow);
179 }
180
181 #[test]
182 fn denies_when_bytes_read_exceeded() {
183 let journal = make_journal("sess-read");
184 journal
185 .record(RecordParams {
186 tool_name: "read_file".to_string(),
187 server_id: "srv".to_string(),
188 agent_id: "agent".to_string(),
189 bytes_read: 500,
190 bytes_written: 0,
191 delegation_depth: 0,
192 allowed: true,
193 })
194 .expect("record");
195
196 let guard = DataFlowGuard::new(
197 journal,
198 DataFlowConfig {
199 max_bytes_read: Some(500),
200 ..DataFlowConfig::default()
201 },
202 );
203
204 let (request, scope, agent_id, server_id) = make_ctx();
205 let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
206 assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
207 }
208
209 #[test]
210 fn denies_when_bytes_written_exceeded() {
211 let journal = make_journal("sess-write");
212 journal
213 .record(RecordParams {
214 tool_name: "write_file".to_string(),
215 server_id: "srv".to_string(),
216 agent_id: "agent".to_string(),
217 bytes_read: 0,
218 bytes_written: 1000,
219 delegation_depth: 0,
220 allowed: true,
221 })
222 .expect("record");
223
224 let guard = DataFlowGuard::new(
225 journal,
226 DataFlowConfig {
227 max_bytes_written: Some(999),
228 ..DataFlowConfig::default()
229 },
230 );
231
232 let (request, scope, agent_id, server_id) = make_ctx();
233 let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
234 assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
235 }
236
237 #[test]
238 fn denies_when_total_exceeded() {
239 let journal = make_journal("sess-total");
240 journal
241 .record(RecordParams {
242 tool_name: "read_file".to_string(),
243 server_id: "srv".to_string(),
244 agent_id: "agent".to_string(),
245 bytes_read: 300,
246 bytes_written: 200,
247 delegation_depth: 0,
248 allowed: true,
249 })
250 .expect("record");
251
252 let guard = DataFlowGuard::new(
253 journal,
254 DataFlowConfig {
255 max_bytes_total: Some(500),
256 ..DataFlowConfig::default()
257 },
258 );
259
260 let (request, scope, agent_id, server_id) = make_ctx();
261 let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
262 assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
263 }
264
265 #[test]
266 fn allows_when_under_limit() {
267 let journal = make_journal("sess-under");
268 journal
269 .record(RecordParams {
270 tool_name: "read_file".to_string(),
271 server_id: "srv".to_string(),
272 agent_id: "agent".to_string(),
273 bytes_read: 100,
274 bytes_written: 50,
275 delegation_depth: 0,
276 allowed: true,
277 })
278 .expect("record");
279
280 let guard = DataFlowGuard::new(
281 journal,
282 DataFlowConfig {
283 max_bytes_read: Some(1000),
284 max_bytes_written: Some(1000),
285 max_bytes_total: Some(2000),
286 },
287 );
288
289 let (request, scope, agent_id, server_id) = make_ctx();
290 let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
291 assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Allow);
292 }
293
294 #[test]
295 fn cumulative_across_multiple_entries() {
296 let journal = make_journal("sess-cumulative");
297 for _ in 0..5 {
298 journal
299 .record(RecordParams {
300 tool_name: "read_file".to_string(),
301 server_id: "srv".to_string(),
302 agent_id: "agent".to_string(),
303 bytes_read: 200,
304 bytes_written: 0,
305 delegation_depth: 0,
306 allowed: true,
307 })
308 .expect("record");
309 }
310 let guard = DataFlowGuard::new(
313 journal,
314 DataFlowConfig {
315 max_bytes_read: Some(999),
316 ..DataFlowConfig::default()
317 },
318 );
319
320 let (request, scope, agent_id, server_id) = make_ctx();
321 let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
322 assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
323 }
324}