Skip to main content

chio_guards/
data_flow.rs

1//! Data flow guard -- enforces cumulative bytes-read/written limits via session journal.
2//!
3//! This guard reads cumulative data flow statistics from the session journal
4//! and denies requests that would cause the session to exceed configured
5//! byte limits for reads, writes, or combined I/O.
6//!
7//! The guard fails closed: if the session journal is unavailable or returns
8//! an error, the request is denied.
9
10use std::sync::Arc;
11
12use chio_http_session::SessionJournal;
13use chio_kernel::{Guard, GuardContext, KernelError, Verdict};
14
15// ---------------------------------------------------------------------------
16// DataFlowConfig
17// ---------------------------------------------------------------------------
18
19/// Configuration for cumulative data flow limits.
20#[derive(Clone, Debug, Default)]
21pub struct DataFlowConfig {
22    /// Maximum cumulative bytes read per session. None means unlimited.
23    pub max_bytes_read: Option<u64>,
24    /// Maximum cumulative bytes written per session. None means unlimited.
25    pub max_bytes_written: Option<u64>,
26    /// Maximum cumulative bytes (read + written) per session. None means unlimited.
27    pub max_bytes_total: Option<u64>,
28}
29
30// ---------------------------------------------------------------------------
31// DataFlowGuard
32// ---------------------------------------------------------------------------
33
34/// Guard that enforces cumulative data flow limits using the session journal.
35///
36/// Reads the journal's cumulative data flow statistics and denies requests
37/// if any configured limit has been reached.
38pub struct DataFlowGuard {
39    journal: Arc<SessionJournal>,
40    config: DataFlowConfig,
41}
42
43impl DataFlowGuard {
44    /// Create a new guard with the given journal and configuration.
45    pub fn new(journal: Arc<SessionJournal>, config: DataFlowConfig) -> Self {
46        Self { journal, config }
47    }
48}
49
50impl Guard for DataFlowGuard {
51    fn name(&self) -> &str {
52        "data-flow"
53    }
54
55    fn evaluate(&self, _ctx: &GuardContext) -> Result<Verdict, KernelError> {
56        let flow = self.journal.data_flow().map_err(|e| {
57            KernelError::Internal(format!("data-flow guard journal error (fail-closed): {e}"))
58        })?;
59
60        // Check bytes read limit.
61        if let Some(max_read) = self.config.max_bytes_read {
62            if flow.total_bytes_read >= max_read {
63                return Ok(Verdict::Deny);
64            }
65        }
66
67        // Check bytes written limit.
68        if let Some(max_written) = self.config.max_bytes_written {
69            if flow.total_bytes_written >= max_written {
70                return Ok(Verdict::Deny);
71            }
72        }
73
74        // Check total I/O limit.
75        if let Some(max_total) = self.config.max_bytes_total {
76            let total = flow
77                .total_bytes_read
78                .saturating_add(flow.total_bytes_written);
79            if total >= max_total {
80                return Ok(Verdict::Deny);
81            }
82        }
83
84        Ok(Verdict::Allow)
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91    use chio_http_session::RecordParams;
92
93    fn make_journal(session_id: &str) -> Arc<SessionJournal> {
94        Arc::new(SessionJournal::new(session_id.to_string()))
95    }
96
97    fn make_ctx() -> (
98        chio_kernel::ToolCallRequest,
99        chio_core::capability::ChioScope,
100        String,
101        String,
102    ) {
103        let kp = chio_core::crypto::Keypair::generate();
104        let scope = chio_core::capability::ChioScope::default();
105        let agent_id = kp.public_key().to_hex();
106        let server_id = "srv-test".to_string();
107
108        let cap_body = chio_core::capability::CapabilityTokenBody {
109            id: "cap-test".to_string(),
110            issuer: kp.public_key(),
111            subject: kp.public_key(),
112            scope: scope.clone(),
113            issued_at: 0,
114            expires_at: u64::MAX,
115            delegation_chain: vec![],
116        };
117        let cap = chio_core::capability::CapabilityToken::sign(cap_body, &kp).expect("sign cap");
118
119        let request = chio_kernel::ToolCallRequest {
120            request_id: "req-test".to_string(),
121            capability: cap,
122            tool_name: "read_file".to_string(),
123            server_id: server_id.clone(),
124            agent_id: agent_id.clone(),
125            arguments: serde_json::json!({"path": "/app/src/main.rs"}),
126            dpop_proof: None,
127            governed_intent: None,
128            approval_token: None,
129            model_metadata: None,
130            federated_origin_kernel_id: None,
131        };
132
133        (request, scope, agent_id, server_id)
134    }
135
136    fn guard_ctx<'a>(
137        request: &'a chio_kernel::ToolCallRequest,
138        scope: &'a chio_core::capability::ChioScope,
139        agent_id: &'a String,
140        server_id: &'a String,
141    ) -> chio_kernel::GuardContext<'a> {
142        chio_kernel::GuardContext {
143            request,
144            scope,
145            agent_id,
146            server_id,
147            session_filesystem_roots: None,
148            matched_grant_index: None,
149        }
150    }
151
152    #[test]
153    fn guard_name() {
154        let journal = make_journal("sess-1");
155        let guard = DataFlowGuard::new(journal, DataFlowConfig::default());
156        assert_eq!(guard.name(), "data-flow");
157    }
158
159    #[test]
160    fn unlimited_allows_all() {
161        let journal = make_journal("sess-1");
162        // Add some data flow.
163        journal
164            .record(RecordParams {
165                tool_name: "read_file".to_string(),
166                server_id: "srv".to_string(),
167                agent_id: "agent".to_string(),
168                bytes_read: 1_000_000,
169                bytes_written: 500_000,
170                delegation_depth: 0,
171                allowed: true,
172            })
173            .expect("record");
174
175        let guard = DataFlowGuard::new(journal, DataFlowConfig::default());
176        let (request, scope, agent_id, server_id) = make_ctx();
177        let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
178        assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Allow);
179    }
180
181    #[test]
182    fn denies_when_bytes_read_exceeded() {
183        let journal = make_journal("sess-read");
184        journal
185            .record(RecordParams {
186                tool_name: "read_file".to_string(),
187                server_id: "srv".to_string(),
188                agent_id: "agent".to_string(),
189                bytes_read: 500,
190                bytes_written: 0,
191                delegation_depth: 0,
192                allowed: true,
193            })
194            .expect("record");
195
196        let guard = DataFlowGuard::new(
197            journal,
198            DataFlowConfig {
199                max_bytes_read: Some(500),
200                ..DataFlowConfig::default()
201            },
202        );
203
204        let (request, scope, agent_id, server_id) = make_ctx();
205        let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
206        assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
207    }
208
209    #[test]
210    fn denies_when_bytes_written_exceeded() {
211        let journal = make_journal("sess-write");
212        journal
213            .record(RecordParams {
214                tool_name: "write_file".to_string(),
215                server_id: "srv".to_string(),
216                agent_id: "agent".to_string(),
217                bytes_read: 0,
218                bytes_written: 1000,
219                delegation_depth: 0,
220                allowed: true,
221            })
222            .expect("record");
223
224        let guard = DataFlowGuard::new(
225            journal,
226            DataFlowConfig {
227                max_bytes_written: Some(999),
228                ..DataFlowConfig::default()
229            },
230        );
231
232        let (request, scope, agent_id, server_id) = make_ctx();
233        let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
234        assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
235    }
236
237    #[test]
238    fn denies_when_total_exceeded() {
239        let journal = make_journal("sess-total");
240        journal
241            .record(RecordParams {
242                tool_name: "read_file".to_string(),
243                server_id: "srv".to_string(),
244                agent_id: "agent".to_string(),
245                bytes_read: 300,
246                bytes_written: 200,
247                delegation_depth: 0,
248                allowed: true,
249            })
250            .expect("record");
251
252        let guard = DataFlowGuard::new(
253            journal,
254            DataFlowConfig {
255                max_bytes_total: Some(500),
256                ..DataFlowConfig::default()
257            },
258        );
259
260        let (request, scope, agent_id, server_id) = make_ctx();
261        let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
262        assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
263    }
264
265    #[test]
266    fn allows_when_under_limit() {
267        let journal = make_journal("sess-under");
268        journal
269            .record(RecordParams {
270                tool_name: "read_file".to_string(),
271                server_id: "srv".to_string(),
272                agent_id: "agent".to_string(),
273                bytes_read: 100,
274                bytes_written: 50,
275                delegation_depth: 0,
276                allowed: true,
277            })
278            .expect("record");
279
280        let guard = DataFlowGuard::new(
281            journal,
282            DataFlowConfig {
283                max_bytes_read: Some(1000),
284                max_bytes_written: Some(1000),
285                max_bytes_total: Some(2000),
286            },
287        );
288
289        let (request, scope, agent_id, server_id) = make_ctx();
290        let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
291        assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Allow);
292    }
293
294    #[test]
295    fn cumulative_across_multiple_entries() {
296        let journal = make_journal("sess-cumulative");
297        for _ in 0..5 {
298            journal
299                .record(RecordParams {
300                    tool_name: "read_file".to_string(),
301                    server_id: "srv".to_string(),
302                    agent_id: "agent".to_string(),
303                    bytes_read: 200,
304                    bytes_written: 0,
305                    delegation_depth: 0,
306                    allowed: true,
307                })
308                .expect("record");
309        }
310        // Total bytes_read = 1000.
311
312        let guard = DataFlowGuard::new(
313            journal,
314            DataFlowConfig {
315                max_bytes_read: Some(999),
316                ..DataFlowConfig::default()
317            },
318        );
319
320        let (request, scope, agent_id, server_id) = make_ctx();
321        let ctx = guard_ctx(&request, &scope, &agent_id, &server_id);
322        assert_eq!(guard.evaluate(&ctx).expect("ok"), Verdict::Deny);
323    }
324}