1use crate::core::event::{Event, EventKind};
5use crate::metrics::types::ToolSpanSample;
6use crate::store::event_index::{
7 paths_from_event_payload, rules_from_event_json, skills_from_event_json,
8};
9use crate::store::tool_span_index::{
10 SpanBuilder, ToolSpanRecord, find_open_same_tool, find_open_without_call, hook_kind, hook_tool,
11 match_span_id, pick_i64, pick_u32, span_start, synthetic_span_id,
12};
13use std::collections::{BTreeMap, HashMap, HashSet};
14
/// One hour expressed in milliseconds (3 600 s × 1 000 ms).
///
/// NOTE(review): not referenced in this file; presumably meant as the
/// `ttl_ms` argument of `Projector::flush_expired` — confirm at call sites.
pub const DEFAULT_ORPHAN_TTL_MS: u64 = 3_600 * 1_000;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct OpenSpan {
19 pub(crate) inner: SpanBuilder,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct ClosedSpan {
24 pub record: ToolSpanRecord,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum ProjectorEvent {
29 SpanClosed(ToolSpanRecord, ToolSpanSample),
30 SpanPatched(ToolSpanRecord),
31 FileTouched { session: String, path: String },
32 SkillUsed { session: String, skill: String },
33 RuleUsed { session: String, rule: String },
34}
35
36#[derive(Debug, Default)]
37pub struct Projector {
38 open_spans: HashMap<String, SpanBuilder>,
39 open_order: HashMap<String, Vec<String>>,
40 closed_spans: HashMap<String, BTreeMap<String, ToolSpanRecord>>,
41 file_touch: HashMap<String, HashSet<String>>,
42 skill_use: HashMap<String, HashSet<String>>,
43 rule_use: HashMap<String, HashSet<String>>,
44 last_seq: HashMap<String, u64>,
45}
46
47impl Projector {
48 pub fn apply(&mut self, evt: &Event) -> Vec<ProjectorEvent> {
49 let mut out = self.apply_derived(evt);
50 if !matches!(
51 evt.kind,
52 EventKind::ToolCall | EventKind::ToolResult | EventKind::Hook
53 ) {
54 self.last_seq.insert(evt.session_id.clone(), evt.seq);
55 return out;
56 }
57 match evt.kind {
58 EventKind::ToolCall => self.apply_tool_call(evt),
59 EventKind::ToolResult => out.extend(self.apply_tool_result(evt)),
60 EventKind::Hook => out.extend(self.apply_hook(evt)),
61 _ => {}
62 }
63 self.last_seq.insert(evt.session_id.clone(), evt.seq);
64 out
65 }
66
67 pub fn flush_session(&mut self, session_id: &str, _now_ms: u64) -> Vec<ProjectorEvent> {
68 let ids = self
69 .open_order
70 .remove(session_id)
71 .unwrap_or_default()
72 .into_iter()
73 .filter(|id| self.open_spans.contains_key(id))
74 .collect::<Vec<_>>();
75 let mut out = Vec::new();
76 for id in ids {
77 if let Some(span) = self.open_spans.remove(&id) {
78 out.extend(self.close_span(span));
79 }
80 }
81 out
82 }
83
84 pub fn flush_expired(&mut self, now_ms: u64, ttl_ms: u64) -> Vec<ProjectorEvent> {
85 let expired = self
86 .open_spans
87 .iter()
88 .filter_map(|(id, span)| {
89 let started = span_start(span)?;
90 (now_ms.saturating_sub(started) > ttl_ms).then(|| id.clone())
91 })
92 .collect::<Vec<_>>();
93 let mut out = Vec::new();
94 for id in expired {
95 let Some(span) = self.open_spans.remove(&id) else {
96 continue;
97 };
98 if let Some(order) = self.open_order.get_mut(&span.session_id) {
99 order.retain(|open_id| open_id != &id);
100 }
101 out.extend(self.close_span(span));
102 }
103 out
104 }
105
106 pub fn reset_session(&mut self, session_id: &str) {
107 if let Some(ids) = self.open_order.remove(session_id) {
108 for id in ids {
109 self.open_spans.remove(&id);
110 }
111 }
112 self.closed_spans.remove(session_id);
113 self.file_touch.remove(session_id);
114 self.skill_use.remove(session_id);
115 self.rule_use.remove(session_id);
116 self.last_seq.remove(session_id);
117 }
118
119 pub fn last_seq(&self, session_id: &str) -> Option<u64> {
120 self.last_seq.get(session_id).copied()
121 }
122
123 fn apply_derived(&mut self, evt: &Event) -> Vec<ProjectorEvent> {
124 let mut out = Vec::new();
125 let session = &evt.session_id;
126 for path in paths_from_event_payload(&evt.payload) {
127 if self
128 .file_touch
129 .entry(session.clone())
130 .or_default()
131 .insert(path.clone())
132 {
133 out.push(ProjectorEvent::FileTouched {
134 session: session.clone(),
135 path,
136 });
137 }
138 }
139 for skill in skills_from_event_json(&evt.payload) {
140 if self
141 .skill_use
142 .entry(session.clone())
143 .or_default()
144 .insert(skill.clone())
145 {
146 out.push(ProjectorEvent::SkillUsed {
147 session: session.clone(),
148 skill,
149 });
150 }
151 }
152 for rule in rules_from_event_json(&evt.payload) {
153 if self
154 .rule_use
155 .entry(session.clone())
156 .or_default()
157 .insert(rule.clone())
158 {
159 out.push(ProjectorEvent::RuleUsed {
160 session: session.clone(),
161 rule,
162 });
163 }
164 }
165 out
166 }
167
168 fn apply_tool_call(&mut self, event: &Event) {
169 let tool = event.tool.clone();
170 let existing = tool
171 .as_deref()
172 .and_then(|name| find_open_without_call(&self.open_spans, self.order(event), name));
173 let span_id = event
174 .tool_call_id
175 .clone()
176 .unwrap_or_else(|| existing.unwrap_or_else(|| synthetic_span_id(event)));
177 let span = self
178 .open_spans
179 .entry(span_id.clone())
180 .or_insert_with(|| SpanBuilder {
181 span_id: span_id.clone(),
182 session_id: event.session_id.clone(),
183 tool: tool.clone(),
184 tool_call_id: event.tool_call_id.clone(),
185 ..Default::default()
186 });
187 span.tool = tool;
188 span.tool_call_id = event.tool_call_id.clone();
189 span.call_start_ms = Some(event.ts_ms);
190 span.call_start_exact = event.ts_exact;
191 span.tokens_in = pick_u32(span.tokens_in, event.tokens_in);
192 span.tokens_out = pick_u32(span.tokens_out, event.tokens_out);
193 span.reasoning_tokens = pick_u32(span.reasoning_tokens, event.reasoning_tokens);
194 span.cost_usd_e6 = pick_i64(span.cost_usd_e6, event.cost_usd_e6);
195 span.paths.extend(paths_from_event_payload(&event.payload));
196 span.has_call = true;
197 self.push_open_order(&event.session_id, &span_id);
198 }
199
200 fn apply_tool_result(&mut self, event: &Event) -> Vec<ProjectorEvent> {
201 let Some(span_id) = match_span_id(event, &self.open_spans, self.order(event)) else {
202 return Vec::new();
203 };
204 let Some(span) = self.open_spans.get_mut(&span_id) else {
205 return Vec::new();
206 };
207 span.result_end_ms = Some(event.ts_ms);
208 span.result_end_exact = event.ts_exact;
209 span.tokens_in = pick_u32(span.tokens_in, event.tokens_in);
210 span.tokens_out = pick_u32(span.tokens_out, event.tokens_out);
211 span.reasoning_tokens = pick_u32(span.reasoning_tokens, event.reasoning_tokens);
212 span.cost_usd_e6 = pick_i64(span.cost_usd_e6, event.cost_usd_e6);
213 span.paths.extend(paths_from_event_payload(&event.payload));
214 span.has_end = true;
215 self.remove_open(&event.session_id, &span_id)
216 .map(|span| self.close_span(span))
217 .unwrap_or_default()
218 }
219
220 fn apply_hook(&mut self, event: &Event) -> Vec<ProjectorEvent> {
221 let Some(kind) = hook_kind(&event.payload) else {
222 return Vec::new();
223 };
224 let tool = hook_tool(&event.payload);
225 let span_id = event
226 .tool_call_id
227 .clone()
228 .or_else(|| {
229 tool.as_deref()
230 .and_then(|name| find_open_same_tool(&self.open_spans, self.order(event), name))
231 })
232 .unwrap_or_else(|| synthetic_span_id(event));
233 let span = self
234 .open_spans
235 .entry(span_id.clone())
236 .or_insert_with(|| SpanBuilder {
237 span_id: span_id.clone(),
238 session_id: event.session_id.clone(),
239 tool: tool.clone(),
240 tool_call_id: event.tool_call_id.clone(),
241 ..Default::default()
242 });
243 span.tool = span.tool.clone().or(tool);
244 span.tool_call_id = span.tool_call_id.clone().or(event.tool_call_id.clone());
245 span.paths.extend(paths_from_event_payload(&event.payload));
246 match kind {
247 "pre" => span.hook_start_ms = Some(event.ts_ms),
248 "post" => {
249 span.hook_end_ms = Some(event.ts_ms);
250 span.has_end = true;
251 }
252 _ => {}
253 }
254 self.push_open_order(&event.session_id, &span_id);
255 if kind == "post" {
256 return self
257 .remove_open(&event.session_id, &span_id)
258 .map(|span| self.close_span(span))
259 .unwrap_or_default();
260 }
261 Vec::new()
262 }
263
264 fn close_span(&mut self, mut span: SpanBuilder) -> Vec<ProjectorEvent> {
265 let session_id = span.session_id.clone();
266 span.parent_span_id = None;
267 span.depth = 0;
268 span.subtree_cost_usd_e6 = span.cost_usd_e6;
269 span.subtree_token_count = span.tokens_in.map(|i| i + span.tokens_out.unwrap_or(0));
270 let mut record = ToolSpanRecord::from_builder(&span);
271 let before = self
272 .closed_spans
273 .get(&session_id)
274 .cloned()
275 .unwrap_or_default();
276 self.closed_spans
277 .entry(session_id.clone())
278 .or_default()
279 .insert(record.span_id.clone(), record.clone());
280 self.recompute_session_tree(&session_id);
281 record = self
282 .closed_spans
283 .get(&session_id)
284 .and_then(|spans| spans.get(&record.span_id))
285 .cloned()
286 .unwrap_or(record);
287 let sample = ToolSpanSample::from(&record);
288 let mut out = vec![ProjectorEvent::SpanClosed(record.clone(), sample)];
289 if let Some(after) = self.closed_spans.get(&session_id) {
290 for (id, span) in after {
291 if id == &record.span_id {
292 continue;
293 }
294 if before.get(id) != Some(span) {
295 out.push(ProjectorEvent::SpanPatched(span.clone()));
296 }
297 }
298 }
299 out
300 }
301
302 fn recompute_session_tree(&mut self, session_id: &str) {
303 let Some(map) = self.closed_spans.get_mut(session_id) else {
304 return;
305 };
306 let mut spans = map
307 .values()
308 .map(record_to_builder)
309 .collect::<Vec<SpanBuilder>>();
310 crate::store::tool_span_index::assign_parents(&mut spans);
311 crate::store::tool_span_index::compute_subtree_costs(&mut spans);
312 map.clear();
313 for span in spans {
314 let record = ToolSpanRecord::from_builder(&span);
315 map.insert(record.span_id.clone(), record);
316 }
317 }
318
319 fn order(&self, event: &Event) -> &[String] {
320 self.open_order
321 .get(&event.session_id)
322 .map(Vec::as_slice)
323 .unwrap_or(&[])
324 }
325
326 fn push_open_order(&mut self, session_id: &str, span_id: &str) {
327 let order = self.open_order.entry(session_id.to_string()).or_default();
328 if !order.iter().any(|id| id == span_id) {
329 order.push(span_id.to_string());
330 }
331 }
332
333 fn remove_open(&mut self, session_id: &str, span_id: &str) -> Option<SpanBuilder> {
334 if let Some(order) = self.open_order.get_mut(session_id) {
335 order.retain(|id| id != span_id);
336 }
337 self.open_spans.remove(span_id)
338 }
339}
340
341fn record_to_builder(record: &ToolSpanRecord) -> SpanBuilder {
342 let paths = record.paths.iter().cloned().collect();
343 SpanBuilder {
344 span_id: record.span_id.clone(),
345 session_id: record.session_id.clone(),
346 tool: record.tool.clone(),
347 tool_call_id: record.tool_call_id.clone(),
348 hook_start_ms: record.started_at_ms,
349 hook_end_ms: None,
350 call_start_ms: record.started_at_ms,
351 result_end_ms: record.ended_at_ms,
352 call_start_exact: record.lead_time_ms.is_some(),
353 result_end_exact: record.lead_time_ms.is_some(),
354 tokens_in: record.tokens_in,
355 tokens_out: record.tokens_out,
356 reasoning_tokens: record.reasoning_tokens,
357 cost_usd_e6: record.cost_usd_e6,
358 paths,
359 has_call: record.started_at_ms.is_some(),
360 has_end: record.ended_at_ms.is_some(),
361 parent_span_id: None,
362 depth: 0,
363 subtree_cost_usd_e6: None,
364 subtree_token_count: None,
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::event::EventSource;
    use serde_json::json;

    /// Minimal event in session `"s"`: exact timestamp, empty JSON payload,
    /// no tool-call id and every optional metric unset.
    fn event(seq: u64, ts_ms: u64, kind: EventKind, tool: Option<&str>) -> Event {
        Event {
            session_id: String::from("s"),
            seq,
            ts_ms,
            ts_exact: true,
            kind,
            source: EventSource::Tail,
            tool: tool.map(String::from),
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// Keeps only the records carried by `SpanClosed` events.
    fn span_closed(events: Vec<ProjectorEvent>) -> Vec<ToolSpanRecord> {
        let mut closed = Vec::new();
        for emitted in events {
            if let ProjectorEvent::SpanClosed(record, _) = emitted {
                closed.push(record);
            }
        }
        closed
    }

    #[test]
    fn tool_call_result_without_id_closes_span() {
        let call = event(0, 10, EventKind::ToolCall, Some("bash"));
        let result = event(1, 15, EventKind::ToolResult, Some("bash"));
        let mut projector = Projector::default();
        projector.apply(&call);
        let closed = span_closed(projector.apply(&result));
        assert_eq!(closed.len(), 1);
        assert_eq!(closed[0].status, "done");
        assert_eq!(closed[0].lead_time_ms, Some(5));
    }

    #[test]
    fn hook_pre_post_matching_closes_span() {
        let mut pre = event(0, 10, EventKind::Hook, None);
        pre.payload = json!({"event": "PreToolUse", "tool_name": "Read"});
        let mut post = event(1, 17, EventKind::Hook, None);
        post.payload = json!({"event": "PostToolUse", "tool_name": "Read"});

        let mut projector = Projector::default();
        projector.apply(&pre);
        let closed = span_closed(projector.apply(&post));
        assert_eq!(closed[0].tool.as_deref(), Some("Read"));
        assert_eq!(closed[0].lead_time_ms, Some(7));
    }

    #[test]
    fn flush_session_marks_open_span_orphaned() {
        let call = event(0, 10, EventKind::ToolCall, Some("bash"));
        let mut projector = Projector::default();
        projector.apply(&call);
        let closed = span_closed(projector.flush_session("s", 100));
        assert_eq!(closed[0].status, "orphaned");
        assert_eq!(closed[0].ended_at_ms, None);
    }

    #[test]
    fn flush_expired_marks_old_open_span_orphaned() {
        let call = event(0, 10, EventKind::ToolCall, Some("bash"));
        let mut projector = Projector::default();
        projector.apply(&call);
        let closed = span_closed(projector.flush_expired(20, 5));
        assert_eq!(closed[0].status, "orphaned");
    }

    #[test]
    fn derived_rows_dedup_per_session() {
        let mut message = event(0, 10, EventKind::Message, None);
        message.payload = json!({
            "path": "src/lib.rs",
            "text": ".cursor/skills/tdd/SKILL.md .cursor/rules/style.mdc"
        });
        let mut projector = Projector::default();
        // First pass reports the path, the skill and the rule once each.
        assert_eq!(projector.apply(&message).len(), 3);
        // Second pass sees nothing new.
        assert!(projector.apply(&message).is_empty());
    }

    #[test]
    fn parent_close_patches_existing_child() {
        let mut projector = Projector::default();
        projector.apply(&event(0, 0, EventKind::ToolCall, Some("parent")));
        projector.apply(&event(1, 10, EventKind::ToolCall, Some("child")));
        projector.apply(&event(2, 20, EventKind::ToolResult, Some("child")));
        let emitted = projector.apply(&event(3, 30, EventKind::ToolResult, Some("parent")));
        let child_patched = emitted
            .iter()
            .any(|item| matches!(item, ProjectorEvent::SpanPatched(span) if span.depth == 1));
        assert!(child_patched);
    }

    #[test]
    fn reset_session_clears_accumulators() {
        let mut message = event(0, 10, EventKind::Message, None);
        message.payload = json!({"path": "src/lib.rs"});
        let mut projector = Projector::default();
        assert_eq!(projector.apply(&message).len(), 1);
        projector.reset_session("s");
        // After the reset the same path counts as new again.
        assert_eq!(projector.apply(&message).len(), 1);
    }
}