Skip to main content

kaizen/collect/tail/
cursor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Parse Cursor agent-transcript `.jsonl` files into Events.
3//! Pure parser — no notify dependency, no IO beyond file reads.
4
5use crate::collect::model_from_json;
6use crate::core::cost::estimate_tail_event_cost_usd_e6;
7use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
8use anyhow::{Context, Result};
9use serde_json::Value;
10use std::path::Path;
11
12/// Parse one `.jsonl` line. Returns `Some(Event)` for action-bearing lines;
13/// `None` for text-only or non-action lines.
14pub fn parse_cursor_line(
15    session_id: &str,
16    seq: u64,
17    base_ts: u64,
18    line: &str,
19) -> Result<Option<Event>> {
20    let v: Value = serde_json::from_str(line.trim()).context("cursor transcript: invalid JSON")?;
21    let obj = match v.as_object() {
22        Some(o) => o,
23        None => return Ok(None),
24    };
25
26    let content = obj
27        .get("message")
28        .and_then(|m| m.get("content"))
29        .and_then(|c| c.as_array());
30
31    let content = match content {
32        Some(c) => c,
33        None => return Ok(None),
34    };
35
36    let ts_ms = line_ts_ms(obj).unwrap_or(base_ts + seq * 100);
37    let ts_exact = line_ts_ms(obj).is_some();
38    let (tokens_in, tokens_out, reasoning_tokens) = line_usage_tokens(obj);
39    let line_model = model_from_json::from_object(obj);
40    let cost_usd_e6 = estimate_tail_event_cost_usd_e6(
41        line_model.as_deref(),
42        tokens_in,
43        tokens_out,
44        reasoning_tokens,
45    );
46
47    for block in content {
48        let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
49        match block_type {
50            "tool_use" => {
51                let tool_name = block
52                    .get("name")
53                    .and_then(|n| n.as_str())
54                    .unwrap_or("")
55                    .to_string();
56                if tool_name == "TodoWrite" {
57                    return Ok(Some(todo_write_lifecycle(
58                        session_id,
59                        seq,
60                        ts_ms,
61                        ts_exact,
62                        (tokens_in, tokens_out, reasoning_tokens, cost_usd_e6),
63                        block,
64                    )));
65                }
66                return Ok(Some(Event {
67                    session_id: session_id.to_string(),
68                    seq,
69                    ts_ms,
70                    ts_exact,
71                    kind: EventKind::ToolCall,
72                    source: EventSource::Tail,
73                    tool: Some(tool_name),
74                    tool_call_id: block
75                        .get("id")
76                        .and_then(|v| v.as_str())
77                        .map(ToOwned::to_owned),
78                    tokens_in,
79                    tokens_out,
80                    reasoning_tokens,
81                    cost_usd_e6,
82                    stop_reason: None,
83                    latency_ms: None,
84                    ttft_ms: None,
85                    retry_count: None,
86                    context_used_tokens: None,
87                    context_max_tokens: None,
88                    cache_creation_tokens: None,
89                    cache_read_tokens: None,
90                    system_prompt_tokens: None,
91                    payload: block.clone(),
92                }));
93            }
94            "tool_result" => {
95                return Ok(Some(Event {
96                    session_id: session_id.to_string(),
97                    seq,
98                    ts_ms,
99                    ts_exact,
100                    kind: EventKind::ToolResult,
101                    source: EventSource::Tail,
102                    tool: None,
103                    tool_call_id: block
104                        .get("tool_use_id")
105                        .and_then(|v| v.as_str())
106                        .map(ToOwned::to_owned),
107                    tokens_in: None,
108                    tokens_out: None,
109                    reasoning_tokens: None,
110                    cost_usd_e6: None,
111                    stop_reason: None,
112                    latency_ms: None,
113                    ttft_ms: None,
114                    retry_count: None,
115                    context_used_tokens: None,
116                    context_max_tokens: None,
117                    cache_creation_tokens: None,
118                    cache_read_tokens: None,
119                    system_prompt_tokens: None,
120                    payload: block.clone(),
121                }));
122            }
123            _ => {}
124        }
125    }
126    Ok(None)
127}
128
129fn todo_counts(input: &Value) -> (u32, u32, u32) {
130    let Some(arr) = input.get("todos").and_then(|t| t.as_array()) else {
131        return (0, 0, 0);
132    };
133    let mut comp = 0u32;
134    let mut canc = 0u32;
135    for t in arr {
136        match t.get("status").and_then(|s| s.as_str()).unwrap_or("") {
137            "completed" => comp += 1,
138            "cancelled" => canc += 1,
139            _ => {}
140        }
141    }
142    (arr.len() as u32, comp, canc)
143}
144
145type CursorLineUsage = (Option<u32>, Option<u32>, Option<u32>, Option<i64>);
146
147fn todo_write_lifecycle(
148    session_id: &str,
149    seq: u64,
150    ts_ms: u64,
151    ts_exact: bool,
152    usage: CursorLineUsage,
153    block: &Value,
154) -> Event {
155    let (tokens_in, tokens_out, reasoning_tokens, cost_usd_e6) = usage;
156    let input = block.get("input").unwrap_or(block);
157    let (total, comp, canc) = todo_counts(input);
158    Event {
159        session_id: session_id.to_string(),
160        seq,
161        ts_ms,
162        ts_exact,
163        kind: EventKind::Lifecycle,
164        source: EventSource::Tail,
165        tool: Some("TodoWrite".into()),
166        tool_call_id: block
167            .get("id")
168            .and_then(|v| v.as_str())
169            .map(ToOwned::to_owned),
170        tokens_in,
171        tokens_out,
172        reasoning_tokens,
173        cost_usd_e6,
174        stop_reason: None,
175        latency_ms: None,
176        ttft_ms: None,
177        retry_count: None,
178        context_used_tokens: None,
179        context_max_tokens: None,
180        cache_creation_tokens: None,
181        cache_read_tokens: None,
182        system_prompt_tokens: None,
183        payload: serde_json::json!({
184            "type": "todo_write",
185            "todos_total": total,
186            "todos_completed": comp,
187            "todos_cancelled": canc,
188        }),
189    }
190}
191
192fn line_ts_ms(obj: &serde_json::Map<String, Value>) -> Option<u64> {
193    if let Some(t) = ["timestamp_ms", "ts_ms", "created_at_ms"]
194        .iter()
195        .find_map(|k| obj.get(*k).and_then(|v| v.as_u64()))
196    {
197        return Some(t);
198    }
199    // Cursor often omits per-line times; `timestamp` may be seconds (≈1e9) or ms (≈1.7e12).
200    if let Some(t) = obj.get("timestamp").and_then(|v| v.as_u64()) {
201        return Some(if t < 1_000_000_000_000 {
202            t.saturating_mul(1000)
203        } else {
204            t
205        });
206    }
207    None
208}
209
210/// Top-level `tokens` (Cursor/Claude shape) and OpenAI-style `usage` on the line.
211fn line_usage_tokens(
212    obj: &serde_json::Map<String, Value>,
213) -> (Option<u32>, Option<u32>, Option<u32>) {
214    let mut tokens_in = None;
215    let mut tokens_out = None;
216    let mut reasoning_tokens = None;
217
218    if let Some(t) = obj.get("tokens").and_then(|x| x.as_object()) {
219        tokens_in = t
220            .get("inputTokens")
221            .and_then(|v| v.as_u64())
222            .map(|v| v as u32);
223        tokens_out = t
224            .get("outputTokens")
225            .and_then(|v| v.as_u64())
226            .map(|v| v as u32);
227        reasoning_tokens = t
228            .get("reasoningTokens")
229            .and_then(|v| v.as_u64())
230            .map(|v| v as u32);
231    }
232
233    if let Some(u) = obj.get("usage").and_then(|x| x.as_object()) {
234        tokens_in = tokens_in.or_else(|| {
235            u.get("prompt_tokens")
236                .or_else(|| u.get("input_tokens"))
237                .and_then(|v| v.as_u64())
238                .map(|v| v as u32)
239        });
240        tokens_out = tokens_out.or_else(|| {
241            u.get("completion_tokens")
242                .or_else(|| u.get("output_tokens"))
243                .and_then(|v| v.as_u64())
244                .map(|v| v as u32)
245        });
246        reasoning_tokens = reasoning_tokens.or_else(|| {
247            u.get("reasoning_tokens")
248                .and_then(|v| v.as_u64())
249                .map(|v| v as u32)
250        });
251    }
252
253    (tokens_in, tokens_out, reasoning_tokens)
254}
255
/// Last-modified time of `path` in milliseconds since the Unix epoch.
///
/// Best-effort: yields `0` when metadata or the modification time cannot be
/// read, or when the modification time lies before the epoch.
fn file_mtime_ms(path: &Path) -> u64 {
    let Ok(meta) = path.metadata() else {
        return 0;
    };
    let Ok(modified) = meta.modified() else {
        return 0;
    };
    match modified.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
267
268/// Read every `*.jsonl` directly under `dir` (sorted by name) and parse into events.
269/// Session `model` is the last model id seen on any line that [`model_from_json::from_line`] returns.
270fn scan_jsonl_in_dir(dir: &Path, session_id: &str) -> Result<(Vec<Event>, Option<String>)> {
271    // Transcript lines often omit `timestamp_ms`; align synthetic times with the session
272    // dir mtime (same as `SessionRecord.started_at_ms`) so retro windows and queries match.
273    let base_ts = super::dir_mtime_ms(dir);
274    let mut entries: Vec<_> = std::fs::read_dir(dir)
275        .with_context(|| format!("read dir: {}", dir.display()))?
276        .filter_map(|e| e.ok())
277        .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
278        .collect();
279    entries.sort_by_key(|e| e.file_name());
280
281    let mut events = Vec::new();
282    let mut seq: u64 = 0;
283    let mut model: Option<String> = None;
284    for entry in entries {
285        let content = std::fs::read_to_string(entry.path())?;
286        for line in content.lines() {
287            if line.trim().is_empty() {
288                continue;
289            }
290            if let Some(m) = model_from_json::from_line(line) {
291                model = Some(m);
292            }
293            if let Some(ev) = parse_cursor_line(session_id, seq, base_ts, line)? {
294                events.push(ev);
295                seq += 1;
296            } else {
297                seq += 1;
298            }
299        }
300    }
301    Ok((events, model))
302}
303
304/// Parse a single transcript `.jsonl` file into events.
305fn scan_jsonl_file(path: &Path, session_id: &str) -> Result<(Vec<Event>, Option<String>)> {
306    let base_ts = file_mtime_ms(path);
307    let content =
308        std::fs::read_to_string(path).with_context(|| format!("read file: {}", path.display()))?;
309    let mut events = Vec::new();
310    let mut seq: u64 = 0;
311    let mut model: Option<String> = None;
312    for line in content.lines() {
313        if line.trim().is_empty() {
314            continue;
315        }
316        if let Some(m) = model_from_json::from_line(line) {
317            model = Some(m);
318        }
319        if let Some(ev) = parse_cursor_line(session_id, seq, base_ts, line)? {
320            events.push(ev);
321            seq += 1;
322        } else {
323            seq += 1;
324        }
325    }
326    Ok((events, model))
327}
328
/// Workspace path for a session directory: the directory two levels above `dir`.
///
/// Yields an empty string when `dir` has no grandparent.
fn cursor_workspace_for_session_dir(dir: &Path) -> String {
    // ancestors() yields `dir`, its parent, then its grandparent.
    match dir.ancestors().nth(2) {
        Some(workspace) => workspace.to_string_lossy().into_owned(),
        None => String::new(),
    }
}
335
336/// Main session plus one session per `subagents/*.jsonl` (Cursor subagent transcripts).
337pub fn scan_session_dir_all(dir: &Path) -> Result<Vec<(SessionRecord, Vec<Event>)>> {
338    let session_id = dir
339        .file_name()
340        .and_then(|n| n.to_str())
341        .unwrap_or("")
342        .to_string();
343
344    let workspace = cursor_workspace_for_session_dir(dir);
345
346    let (main_events, main_model) = scan_jsonl_in_dir(dir, &session_id)?;
347
348    let main_record = SessionRecord {
349        id: session_id.clone(),
350        agent: "cursor".to_string(),
351        model: main_model,
352        workspace: workspace.clone(),
353        started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
354        ended_at_ms: None,
355        status: SessionStatus::Done,
356        trace_path: dir.to_string_lossy().to_string(),
357        start_commit: None,
358        end_commit: None,
359        branch: None,
360        dirty_start: None,
361        dirty_end: None,
362        repo_binding_source: None,
363        prompt_fingerprint: None,
364        parent_session_id: None,
365        agent_version: None,
366        os: None,
367        arch: None,
368        repo_file_count: None,
369        repo_total_loc: None,
370    };
371
372    let mut out = vec![(main_record, main_events)];
373
374    let subagents = dir.join("subagents");
375    if subagents.is_dir() {
376        let mut subs: Vec<_> = std::fs::read_dir(&subagents)
377            .with_context(|| format!("read dir: {}", subagents.display()))?
378            .filter_map(|e| e.ok())
379            .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
380            .collect();
381        subs.sort_by_key(|e| e.file_name());
382
383        for entry in subs {
384            let path = entry.path();
385            let sub_id = path
386                .file_stem()
387                .and_then(|s| s.to_str())
388                .unwrap_or("")
389                .to_string();
390            if sub_id.is_empty() {
391                continue;
392            }
393            let (events, sub_model) = scan_jsonl_file(&path, &sub_id)?;
394            let record = SessionRecord {
395                id: sub_id.clone(),
396                agent: "cursor".to_string(),
397                model: sub_model,
398                workspace: workspace.clone(),
399                started_at_ms: file_mtime_ms(&path),
400                ended_at_ms: None,
401                status: SessionStatus::Done,
402                trace_path: path.to_string_lossy().to_string(),
403                start_commit: None,
404                end_commit: None,
405                branch: None,
406                dirty_start: None,
407                dirty_end: None,
408                repo_binding_source: None,
409                prompt_fingerprint: None,
410                parent_session_id: Some(session_id.clone()),
411                agent_version: None,
412                os: None,
413                arch: None,
414                repo_file_count: None,
415                repo_total_loc: None,
416            };
417            out.push((record, events));
418        }
419    }
420
421    Ok(out)
422}
423
424/// Walk all `.jsonl` files directly under `dir`; return inferred `SessionRecord` + events.
425///
426/// Session id = dir name (last path component).
427/// Agent = "cursor". workspace = parent of parent (assuming `.../agent-transcripts/<id>`).
428/// status = Done (static dir assumed completed).
429///
430/// Does not include `subagents/*.jsonl`; use [`scan_session_dir_all`] for full ingestion.
431pub fn scan_session_dir(dir: &Path) -> Result<(SessionRecord, Vec<Event>)> {
432    let session_id = dir
433        .file_name()
434        .and_then(|n| n.to_str())
435        .unwrap_or("")
436        .to_string();
437    let workspace = cursor_workspace_for_session_dir(dir);
438    let (events, model) = scan_jsonl_in_dir(dir, &session_id)?;
439    let record = SessionRecord {
440        id: session_id.clone(),
441        agent: "cursor".to_string(),
442        model,
443        workspace,
444        started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
445        ended_at_ms: None,
446        status: SessionStatus::Done,
447        trace_path: dir.to_string_lossy().to_string(),
448        start_commit: None,
449        end_commit: None,
450        branch: None,
451        dirty_start: None,
452        dirty_end: None,
453        repo_binding_source: None,
454        prompt_fingerprint: None,
455        parent_session_id: None,
456        agent_version: None,
457        os: None,
458        arch: None,
459        repo_file_count: None,
460        repo_total_loc: None,
461    };
462    Ok((record, events))
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    // One transcript line per shape the parser distinguishes.
    const TOOL_USE_LINE: &str = r#"{"role":"assistant","message":{"content":[{"type":"tool_use","id":"toolu_01","name":"read_file","input":{"path":"src/main.rs"}}]}}"#;
    const TOOL_RESULT_LINE: &str = r#"{"role":"user","message":{"content":[{"type":"tool_result","tool_use_id":"toolu_01","content":[{"type":"text","text":"fn main() {}"}]}]}}"#;
    const TEXT_ONLY_LINE: &str =
        r#"{"role":"assistant","message":{"content":[{"type":"text","text":"hello"}]}}"#;

    /// Parse `line` for session `s1`, panicking unless it yields an event.
    fn must_parse(seq: u64, base_ts: u64, line: &str) -> Event {
        parse_cursor_line("s1", seq, base_ts, line)
            .unwrap()
            .unwrap()
    }

    /// Directory holding the Cursor transcript fixtures.
    fn cursor_fixture_dir() -> std::path::PathBuf {
        std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/cursor")
    }

    #[test]
    fn parse_tool_use() {
        let event = must_parse(0, 0, TOOL_USE_LINE);
        assert_eq!(event.kind, EventKind::ToolCall);
        assert_eq!(event.tool.as_deref(), Some("read_file"));
        assert_eq!(event.tool_call_id.as_deref(), Some("toolu_01"));
        assert_eq!(event.session_id, "s1");
    }

    #[test]
    fn parse_tool_result() {
        let event = must_parse(1, 0, TOOL_RESULT_LINE);
        assert_eq!(event.kind, EventKind::ToolResult);
        assert_eq!(event.seq, 1);
        assert_eq!(event.tool_call_id.as_deref(), Some("toolu_01"));
    }

    #[test]
    fn text_only_returns_none() {
        let parsed = parse_cursor_line("s1", 2, 0, TEXT_ONLY_LINE).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn ts_ms_synthesized() {
        // No timestamp on the line: expect base + 100 ms per sequence step.
        let event = must_parse(3, 1000, TOOL_USE_LINE);
        assert_eq!(event.ts_ms, 1000 + 3 * 100);
    }

    #[test]
    fn parse_tool_use_sets_cost_when_tokens_present() {
        let fixture = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("tests/fixtures/cursor/01_tool_with_tokens.jsonl");
        let raw = std::fs::read_to_string(fixture).unwrap();
        let event = must_parse(0, 0, raw.trim());
        assert_eq!(event.tool.as_deref(), Some("Read"));
        assert!(
            event.cost_usd_e6.is_some_and(|c| c > 0),
            "expected estimated cost_usd_e6"
        );
        assert_eq!(event.tokens_in, Some(100));
        assert_eq!(event.tokens_out, Some(50));
    }

    #[test]
    fn scan_fixture_dir() {
        let (record, events) = scan_session_dir(&cursor_fixture_dir()).unwrap();
        assert_eq!(record.agent, "cursor");
        assert_eq!(record.model.as_deref(), Some("cursor-model-with-usage"));
        assert_eq!(record.status, SessionStatus::Done);
        assert!(!events.is_empty(), "expected events from fixture files");

        let has_kind = |kind: EventKind| events.iter().any(|e| e.kind == kind);
        assert!(has_kind(EventKind::ToolCall));
        assert!(has_kind(EventKind::ToolResult));
        assert!(
            events.iter().any(|e| e.cost_usd_e6.is_some_and(|c| c > 0)),
            "expected at least one cost-bearing event from usage fixture"
        );
    }

    #[test]
    fn scan_session_dir_all_includes_subagents() {
        let sessions = scan_session_dir_all(&cursor_fixture_dir()).unwrap();
        assert!(
            sessions.len() >= 2,
            "expected main session + subagent fixture"
        );

        let sub_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
        let (sub_record, sub_events) = sessions
            .iter()
            .find(|(r, _)| r.id == sub_id)
            .expect("subagent session");
        assert_eq!(sub_record.agent, "cursor");
        assert!(
            sub_record.trace_path.ends_with(".jsonl"),
            "subagent trace_path should be file path"
        );
        assert!(
            sub_events.iter().any(|e| e.tool.as_deref() == Some("grep")),
            "subagent tool call"
        );
    }
}
565}