use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

use super::event::{EventType, SessionEvent};
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct FileAccess {
15 pub file_path: String,
16 pub agent_instance_id: String,
17 pub timestamp: String,
18 #[serde(skip_serializing_if = "Option::is_none")]
19 pub digest: Option<String>,
20 #[serde(default, skip_serializing_if = "Option::is_none")]
22 pub operation: Option<String>,
23 #[serde(default, skip_serializing_if = "Option::is_none")]
24 pub additions: Option<u32>,
25 #[serde(default, skip_serializing_if = "Option::is_none")]
26 pub deletions: Option<u32>,
27 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub source: Option<String>,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct PortAccess {
49 pub port: u16,
50 pub agent_instance_id: String,
51 pub timestamp: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub protocol: Option<String>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct NetworkConnection {
59 pub destination: String,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 pub port: Option<u16>,
62 pub agent_instance_id: String,
63 pub timestamp: String,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ProcessExecution {
69 pub process_name: String,
70 pub agent_instance_id: String,
71 pub started_at: String,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub exit_code: Option<i32>,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub duration_ms: Option<u64>,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
78 pub command: Option<String>,
79 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub source: Option<String>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct ToolInvocation {
88 pub tool_name: String,
89 pub agent_instance_id: String,
90 pub timestamp: String,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub duration_ms: Option<u64>,
93}
94
95#[derive(Debug, Clone, Default, Serialize, Deserialize)]
97pub struct SideEffects {
98 pub files_read: Vec<FileAccess>,
99 pub files_written: Vec<FileAccess>,
100 pub ports_opened: Vec<PortAccess>,
101 pub network_connections: Vec<NetworkConnection>,
102 pub processes: Vec<ProcessExecution>,
103 pub tool_invocations: Vec<ToolInvocation>,
104}
105
106impl SideEffects {
107 pub fn from_events(events: &[SessionEvent]) -> Self {
109 let mut se = SideEffects::default();
110
111 let mut started_processes: BTreeMap<(String, String), usize> = BTreeMap::new();
114
115 for event in events {
116 match &event.event_type {
117 EventType::AgentReadFile { file_path, digest } => {
118 se.files_read.push(FileAccess {
119 file_path: file_path.clone(),
120 agent_instance_id: event.agent_instance_id.clone(),
121 timestamp: event.timestamp.clone(),
122 digest: digest.clone(),
123 operation: None,
124 additions: None,
125 deletions: None,
126 source: Some(source_from_meta(event, "hook")),
127 });
128 }
129
130 EventType::AgentWroteFile { file_path, digest, operation, additions, deletions } => {
131 se.files_written.push(FileAccess {
132 file_path: file_path.clone(),
133 agent_instance_id: event.agent_instance_id.clone(),
134 timestamp: event.timestamp.clone(),
135 digest: digest.clone(),
136 operation: operation.clone(),
137 additions: *additions,
138 deletions: *deletions,
139 source: Some(source_from_meta(event, "hook")),
140 });
141 }
142
143 EventType::AgentOpenedPort { port, protocol } => {
144 se.ports_opened.push(PortAccess {
145 port: *port,
146 agent_instance_id: event.agent_instance_id.clone(),
147 timestamp: event.timestamp.clone(),
148 protocol: protocol.clone(),
149 });
150 }
151
152 EventType::AgentConnectedNetwork { destination, port } => {
153 se.network_connections.push(NetworkConnection {
154 destination: destination.clone(),
155 port: *port,
156 agent_instance_id: event.agent_instance_id.clone(),
157 timestamp: event.timestamp.clone(),
158 });
159 }
160
161 EventType::AgentStartedProcess { process_name, pid: _, command } => {
162 let idx = se.processes.len();
163 se.processes.push(ProcessExecution {
164 process_name: process_name.clone(),
165 agent_instance_id: event.agent_instance_id.clone(),
166 started_at: event.timestamp.clone(),
167 exit_code: None,
168 duration_ms: None,
169 command: command.clone(),
170 source: Some(source_from_meta(event, "hook")),
171 });
172 started_processes.insert(
173 (event.agent_instance_id.clone(), process_name.clone()),
174 idx,
175 );
176 }
177
178 EventType::AgentCompletedProcess { process_name, exit_code, duration_ms, command } => {
179 let key = (event.agent_instance_id.clone(), process_name.clone());
180 if let Some(&idx) = started_processes.get(&key) {
181 if let Some(proc) = se.processes.get_mut(idx) {
182 proc.exit_code = *exit_code;
183 proc.duration_ms = *duration_ms;
184 if proc.command.is_none() {
185 proc.command = command.clone();
186 }
187 }
188 } else {
189 se.processes.push(ProcessExecution {
190 process_name: process_name.clone(),
191 agent_instance_id: event.agent_instance_id.clone(),
192 started_at: event.timestamp.clone(),
193 exit_code: *exit_code,
194 duration_ms: *duration_ms,
195 command: command.clone(),
196 source: Some(source_from_meta(event, "hook")),
197 });
198 }
199 }
200
201 EventType::AgentCalledTool { tool_name, duration_ms, .. } => {
202 se.tool_invocations.push(ToolInvocation {
203 tool_name: tool_name.clone(),
204 agent_instance_id: event.agent_instance_id.clone(),
205 timestamp: event.timestamp.clone(),
206 duration_ms: *duration_ms,
207 });
208
209 promote_mcp_called_tool(event, tool_name, &mut se);
226 }
227
228 _ => {}
229 }
230 }
231
232 se
233 }
234
235 pub fn summary(&self) -> SideEffectSummary {
237 SideEffectSummary {
238 files_read: self.files_read.len() as u32,
239 files_written: self.files_written.len() as u32,
240 ports_opened: self.ports_opened.len() as u32,
241 network_connections: self.network_connections.len() as u32,
242 processes: self.processes.len() as u32,
243 tool_invocations: self.tool_invocations.len() as u32,
244 }
245 }
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct SideEffectSummary {
251 pub files_read: u32,
252 pub files_written: u32,
253 pub ports_opened: u32,
254 pub network_connections: u32,
255 pub processes: u32,
256 pub tool_invocations: u32,
257}
258
259fn meta_string(event: &SessionEvent, dotted_path: &str) -> Option<String> {
280 let mut cur = event.meta.as_ref()?;
281 for segment in dotted_path.split('.') {
282 cur = cur.get(segment)?;
283 }
284 cur.as_str().map(|s| s.to_string())
285}
286
287fn first_meta_string(event: &SessionEvent, paths: &[&str]) -> Option<String> {
289 for path in paths {
290 if let Some(v) = meta_string(event, path) {
291 if !v.is_empty() {
292 return Some(v);
293 }
294 }
295 }
296 None
297}
298
299fn source_from_meta(event: &SessionEvent, default: &str) -> String {
322 event
323 .meta
324 .as_ref()
325 .and_then(|m| m.get("source"))
326 .and_then(|v| v.as_str())
327 .filter(|s| !s.is_empty())
328 .map(|s| s.to_string())
329 .unwrap_or_else(|| default.to_string())
330}
331
/// Coarse classification of a tool by what its name suggests it does.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToolCategory {
    /// Name looks like a file read.
    Read,
    /// Name looks like a file write, edit, delete, or rename.
    Write,
    /// Name looks like a shell or command execution.
    Process,
    /// Name matched none of the known markers.
    Unknown,
}

/// Classifies a tool by case-insensitive substring match on its name.
///
/// Marker groups are checked in priority order: process, then write, then
/// read. A name containing markers from several groups therefore gets the
/// earliest one (`"exec_write_file"` is `Process`). Matching is on substrings,
/// not whole words. Names with no marker are `Unknown`.
fn classify_tool(tool_name: &str) -> ToolCategory {
    const PROCESS_MARKERS: [&str; 5] = ["bash", "shell", "exec", "run_command", "ran_command"];
    const WRITE_MARKERS: [&str; 9] = [
        "write",
        "edit",
        "create_file",
        "modify",
        "patch",
        "save_file",
        "delete_file",
        "remove_file",
        "rename_file",
    ];
    const READ_MARKERS: [&str; 5] = [
        "read",
        "view_file",
        "cat_file",
        "open_file",
        "get_file_contents",
    ];

    let lowered = tool_name.to_lowercase();
    let mentions_any = |markers: &[&str]| markers.iter().any(|marker| lowered.contains(*marker));

    if mentions_any(&PROCESS_MARKERS) {
        ToolCategory::Process
    } else if mentions_any(&WRITE_MARKERS) {
        ToolCategory::Write
    } else if mentions_any(&READ_MARKERS) {
        ToolCategory::Read
    } else {
        ToolCategory::Unknown
    }
}
369fn promote_mcp_called_tool(
370 event: &SessionEvent,
371 tool_name: &str,
372 se: &mut SideEffects,
373) {
374 let category = classify_tool(tool_name);
375
376 let file_path = first_meta_string(event, &[
379 "tool_input.file_path",
380 "tool_input.path",
381 "tool_input.notebook_path",
382 "tool_input.target_file",
383 "file_path",
384 "path",
385 ]);
386
387 let command = first_meta_string(event, &[
389 "tool_input.command",
390 "command",
391 "tool_input.cmd",
392 "cmd",
393 ]);
394
395 match (category, file_path, command) {
396 (ToolCategory::Read, Some(p), _) => {
397 se.files_read.push(FileAccess {
398 file_path: p,
399 agent_instance_id: event.agent_instance_id.clone(),
400 timestamp: event.timestamp.clone(),
401 digest: None,
402 operation: None,
403 additions: None,
404 deletions: None,
405 source: Some("mcp".into()),
406 });
407 }
408 (ToolCategory::Write, Some(p), _) => {
409 se.files_written.push(FileAccess {
410 file_path: p,
411 agent_instance_id: event.agent_instance_id.clone(),
412 timestamp: event.timestamp.clone(),
413 digest: None,
414 operation: None,
415 additions: None,
416 deletions: None,
417 source: Some("mcp".into()),
418 });
419 }
420 (ToolCategory::Process, _, Some(cmd)) => {
421 let short = cmd.chars().take(120).collect::<String>();
424 se.processes.push(ProcessExecution {
425 process_name: short,
426 agent_instance_id: event.agent_instance_id.clone(),
427 started_at: event.timestamp.clone(),
428 exit_code: None,
429 duration_ms: None,
430 command: Some(cmd),
431 source: Some("mcp".into()),
432 });
433 }
434 (ToolCategory::Unknown, Some(p), _) => {
435 se.files_written.push(FileAccess {
441 file_path: p,
442 agent_instance_id: event.agent_instance_id.clone(),
443 timestamp: event.timestamp.clone(),
444 digest: None,
445 operation: None,
446 additions: None,
447 deletions: None,
448 source: Some("mcp".into()),
449 });
450 }
451 _ => {
452 }
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::event::*;

    /// Builds a session event of the given type with fixed identifiers and no `meta`.
    fn evt(event_type: EventType) -> SessionEvent {
        SessionEvent {
            session_id: "ssn_001".into(),
            event_id: generate_event_id(),
            timestamp: "2026-04-05T08:00:00Z".into(),
            sequence_no: 0,
            trace_id: "t".into(),
            span_id: "s".into(),
            parent_span_id: None,
            agent_id: "agent://test".into(),
            agent_instance_id: "ai_1".into(),
            agent_name: "test".into(),
            agent_role: None,
            host_id: "h".into(),
            tool_runtime_id: None,
            event_type,
            artifact_ref: None,
            meta: None,
        }
    }

    #[test]
    fn aggregates_file_and_tool_events() {
        let events = vec![
            evt(EventType::AgentReadFile { file_path: "src/main.rs".into(), digest: None }),
            evt(EventType::AgentWroteFile { file_path: "src/lib.rs".into(), digest: Some("sha256:abc".into()), operation: Some("modified".into()), additions: Some(10), deletions: Some(3) }),
            evt(EventType::AgentCalledTool { tool_name: "read_file".into(), tool_input_digest: None, tool_output_digest: None, duration_ms: Some(10) }),
            evt(EventType::AgentCalledTool { tool_name: "write_file".into(), tool_input_digest: None, tool_output_digest: None, duration_ms: None }),
        ];

        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_read.len(), 1);
        assert_eq!(se.files_written.len(), 1);
        assert_eq!(se.tool_invocations.len(), 2);
        let summary = se.summary();
        assert_eq!(summary.tool_invocations, 2);
    }

    #[test]
    fn matches_process_start_and_complete() {
        let events = vec![
            evt(EventType::AgentStartedProcess { process_name: "npm test".into(), pid: Some(1234), command: Some("npm test --runInBand".into()) }),
            evt(EventType::AgentCompletedProcess { process_name: "npm test".into(), exit_code: Some(0), duration_ms: Some(5000), command: None }),
        ];

        let se = SideEffects::from_events(&events);
        assert_eq!(se.processes.len(), 1);
        assert_eq!(se.processes[0].exit_code, Some(0));
        assert_eq!(se.processes[0].duration_ms, Some(5000));
    }

    /// Builds an `AgentCalledTool` event for `tool_name` carrying the given `meta`.
    fn called_tool_with_meta(tool_name: &str, meta: serde_json::Value) -> SessionEvent {
        let mut e = evt(EventType::AgentCalledTool {
            tool_name: tool_name.into(),
            tool_input_digest: None,
            tool_output_digest: None,
            duration_ms: None,
        });
        e.meta = Some(meta);
        e
    }

    #[test]
    fn hook_file_events_carry_source_hook() {
        let events = vec![
            evt(EventType::AgentReadFile { file_path: "src/a.rs".into(), digest: None }),
            evt(EventType::AgentWroteFile { file_path: "src/b.rs".into(), digest: None, operation: None, additions: None, deletions: None }),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_read[0].source.as_deref(), Some("hook"));
        assert_eq!(se.files_written[0].source.as_deref(), Some("hook"));
    }

    #[test]
    fn mcp_called_tool_with_file_path_promotes_to_files_written() {
        let events = vec![
            called_tool_with_meta(
                "Edit",
                serde_json::json!({
                    "source": "mcp-bridge",
                    "tool_input": { "file_path": "src/api/receipt.ts" },
                }),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_written.len(), 1, "Edit with file_path must promote to files_written");
        assert_eq!(se.files_written[0].file_path, "src/api/receipt.ts");
        assert_eq!(se.files_written[0].source.as_deref(), Some("mcp"));
        assert_eq!(se.tool_invocations.len(), 1);
    }

    #[test]
    fn mcp_read_tool_promotes_to_files_read() {
        let events = vec![
            called_tool_with_meta(
                "Read",
                serde_json::json!({ "tool_input": { "file_path": "package.json" } }),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_read.len(), 1);
        assert_eq!(se.files_read[0].file_path, "package.json");
        assert_eq!(se.files_read[0].source.as_deref(), Some("mcp"));
    }

    #[test]
    fn mcp_bash_tool_promotes_to_processes() {
        let events = vec![
            called_tool_with_meta(
                "Bash",
                serde_json::json!({ "tool_input": { "command": "bun test --run" } }),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.processes.len(), 1);
        assert_eq!(se.processes[0].command.as_deref(), Some("bun test --run"));
        assert_eq!(se.processes[0].source.as_deref(), Some("mcp"));
    }

    #[test]
    fn mcp_unknown_tool_with_path_defaults_to_files_written() {
        let events = vec![
            called_tool_with_meta(
                "mcp__weird-vendor__do_thing",
                serde_json::json!({ "tool_input": { "file_path": "config.toml" } }),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_written.len(), 1);
        assert_eq!(se.files_written[0].file_path, "config.toml");
        assert_eq!(se.files_written[0].source.as_deref(), Some("mcp"));
    }

    #[test]
    fn mcp_called_tool_without_meta_does_not_promote() {
        // `meta` carries only a source label: no path and no command to promote.
        let events = vec![
            called_tool_with_meta("ls", serde_json::json!({"source": "mcp-bridge"})),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_read.len(), 0);
        assert_eq!(se.files_written.len(), 0);
        assert_eq!(se.processes.len(), 0);
        assert_eq!(se.tool_invocations.len(), 1);
    }

    #[test]
    fn mcp_promotion_handles_alt_path_field_names() {
        // Every key that the promotion code accepts as a file path.
        for path_field in &[
            "tool_input.file_path",
            "tool_input.path",
            "tool_input.notebook_path",
            "tool_input.target_file",
            "file_path",
            "path",
        ] {
            let mut meta_obj = serde_json::Map::new();
            let parts: Vec<&str> = path_field.split('.').collect();
            if parts.len() == 1 {
                meta_obj.insert(parts[0].into(), serde_json::json!("x.txt"));
            } else {
                let inner = serde_json::json!({ parts[1]: "x.txt" });
                meta_obj.insert(parts[0].into(), inner);
            }
            let events = vec![called_tool_with_meta("Edit", serde_json::Value::Object(meta_obj))];
            let se = SideEffects::from_events(&events);
            assert_eq!(
                se.files_written.len(), 1,
                "expected promotion via {} but got nothing", path_field,
            );
        }
    }

    /// Builds an event of the given type carrying the given `meta`.
    fn evt_with_meta(et: EventType, meta: serde_json::Value) -> SessionEvent {
        let mut e = evt(et);
        e.meta = Some(meta);
        e
    }

    #[test]
    fn source_from_meta_preserves_session_event_cli() {
        let events = vec![
            evt_with_meta(
                EventType::AgentWroteFile {
                    file_path: "src/x.rs".into(),
                    digest: None, operation: None, additions: None, deletions: None,
                },
                serde_json::json!({"source": "session-event-cli"}),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_written[0].source.as_deref(), Some("session-event-cli"),
            "session-event-cli must be preserved verbatim, not downgraded to hook");
    }

    #[test]
    fn source_from_meta_preserves_daemon_atime() {
        let events = vec![
            evt_with_meta(
                EventType::AgentReadFile {
                    file_path: "src/x.rs".into(),
                    digest: None,
                },
                serde_json::json!({"source": "daemon-atime"}),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_read[0].source.as_deref(), Some("daemon-atime"),
            "daemon-atime must be preserved verbatim, not downgraded to hook");
    }

    #[test]
    fn source_from_meta_preserves_arbitrary_unknown_label() {
        let events = vec![
            evt_with_meta(
                EventType::AgentWroteFile {
                    file_path: "x".into(),
                    digest: None, operation: None, additions: None, deletions: None,
                },
                serde_json::json!({"source": "future-bridge-v2"}),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_written[0].source.as_deref(), Some("future-bridge-v2"));
    }

    #[test]
    fn source_from_meta_falls_back_when_meta_source_absent() {
        let events = vec![
            evt(EventType::AgentWroteFile {
                file_path: "x".into(),
                digest: None, operation: None, additions: None, deletions: None,
            }),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_written[0].source.as_deref(), Some("hook"));
    }

    #[test]
    fn source_from_meta_treats_empty_string_as_absent() {
        let events = vec![
            evt_with_meta(
                EventType::AgentReadFile {
                    file_path: "x".into(),
                    digest: None,
                },
                serde_json::json!({"source": ""}),
            ),
        ];
        let se = SideEffects::from_events(&events);
        assert_eq!(se.files_read[0].source.as_deref(), Some("hook"));
    }
}