Skip to main content

kaizen/collect/tail/
cursor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Parse Cursor agent-transcript `.jsonl` files into Events.
3//! Pure parser — no notify dependency, no IO beyond file reads.
4
5use crate::collect::model_from_json;
6use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
7use anyhow::{Context, Result};
8use serde_json::Value;
9use std::path::Path;
10
11/// Parse one `.jsonl` line. Returns `Some(Event)` for action-bearing lines;
12/// `None` for text-only or non-action lines.
13pub fn parse_cursor_line(
14    session_id: &str,
15    seq: u64,
16    base_ts: u64,
17    line: &str,
18) -> Result<Option<Event>> {
19    let v: Value = serde_json::from_str(line.trim()).context("cursor transcript: invalid JSON")?;
20    let obj = match v.as_object() {
21        Some(o) => o,
22        None => return Ok(None),
23    };
24
25    let content = obj
26        .get("message")
27        .and_then(|m| m.get("content"))
28        .and_then(|c| c.as_array());
29
30    let content = match content {
31        Some(c) => c,
32        None => return Ok(None),
33    };
34
35    let ts_ms = line_ts_ms(obj).unwrap_or(base_ts + seq * 100);
36    let ts_exact = line_ts_ms(obj).is_some();
37    let reasoning_tokens = reasoning_tokens(obj);
38
39    for block in content {
40        let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
41        match block_type {
42            "tool_use" => {
43                let tool_name = block
44                    .get("name")
45                    .and_then(|n| n.as_str())
46                    .unwrap_or("")
47                    .to_string();
48                if tool_name == "TodoWrite" {
49                    return Ok(Some(todo_write_lifecycle(
50                        session_id,
51                        seq,
52                        ts_ms,
53                        ts_exact,
54                        reasoning_tokens,
55                        block,
56                    )));
57                }
58                return Ok(Some(Event {
59                    session_id: session_id.to_string(),
60                    seq,
61                    ts_ms,
62                    ts_exact,
63                    kind: EventKind::ToolCall,
64                    source: EventSource::Tail,
65                    tool: Some(tool_name),
66                    tool_call_id: block
67                        .get("id")
68                        .and_then(|v| v.as_str())
69                        .map(ToOwned::to_owned),
70                    tokens_in: None,
71                    tokens_out: None,
72                    reasoning_tokens,
73                    cost_usd_e6: None,
74                    stop_reason: None,
75                    latency_ms: None,
76                    ttft_ms: None,
77                    retry_count: None,
78                    context_used_tokens: None,
79                    context_max_tokens: None,
80                    cache_creation_tokens: None,
81                    cache_read_tokens: None,
82                    system_prompt_tokens: None,
83                    payload: block.clone(),
84                }));
85            }
86            "tool_result" => {
87                return Ok(Some(Event {
88                    session_id: session_id.to_string(),
89                    seq,
90                    ts_ms,
91                    ts_exact,
92                    kind: EventKind::ToolResult,
93                    source: EventSource::Tail,
94                    tool: None,
95                    tool_call_id: block
96                        .get("tool_use_id")
97                        .and_then(|v| v.as_str())
98                        .map(ToOwned::to_owned),
99                    tokens_in: None,
100                    tokens_out: None,
101                    reasoning_tokens,
102                    cost_usd_e6: None,
103                    stop_reason: None,
104                    latency_ms: None,
105                    ttft_ms: None,
106                    retry_count: None,
107                    context_used_tokens: None,
108                    context_max_tokens: None,
109                    cache_creation_tokens: None,
110                    cache_read_tokens: None,
111                    system_prompt_tokens: None,
112                    payload: block.clone(),
113                }));
114            }
115            _ => {}
116        }
117    }
118    Ok(None)
119}
120
121fn todo_counts(input: &Value) -> (u32, u32, u32) {
122    let Some(arr) = input.get("todos").and_then(|t| t.as_array()) else {
123        return (0, 0, 0);
124    };
125    let mut comp = 0u32;
126    let mut canc = 0u32;
127    for t in arr {
128        match t.get("status").and_then(|s| s.as_str()).unwrap_or("") {
129            "completed" => comp += 1,
130            "cancelled" => canc += 1,
131            _ => {}
132        }
133    }
134    (arr.len() as u32, comp, canc)
135}
136
137fn todo_write_lifecycle(
138    session_id: &str,
139    seq: u64,
140    ts_ms: u64,
141    ts_exact: bool,
142    reasoning_tokens: Option<u32>,
143    block: &Value,
144) -> Event {
145    let input = block.get("input").unwrap_or(block);
146    let (total, comp, canc) = todo_counts(input);
147    Event {
148        session_id: session_id.to_string(),
149        seq,
150        ts_ms,
151        ts_exact,
152        kind: EventKind::Lifecycle,
153        source: EventSource::Tail,
154        tool: Some("TodoWrite".into()),
155        tool_call_id: block
156            .get("id")
157            .and_then(|v| v.as_str())
158            .map(ToOwned::to_owned),
159        tokens_in: None,
160        tokens_out: None,
161        reasoning_tokens,
162        cost_usd_e6: None,
163        stop_reason: None,
164        latency_ms: None,
165        ttft_ms: None,
166        retry_count: None,
167        context_used_tokens: None,
168        context_max_tokens: None,
169        cache_creation_tokens: None,
170        cache_read_tokens: None,
171        system_prompt_tokens: None,
172        payload: serde_json::json!({
173            "type": "todo_write",
174            "todos_total": total,
175            "todos_completed": comp,
176            "todos_cancelled": canc,
177        }),
178    }
179}
180
181fn line_ts_ms(obj: &serde_json::Map<String, Value>) -> Option<u64> {
182    if let Some(t) = ["timestamp_ms", "ts_ms", "created_at_ms"]
183        .iter()
184        .find_map(|k| obj.get(*k).and_then(|v| v.as_u64()))
185    {
186        return Some(t);
187    }
188    // Cursor often omits per-line times; `timestamp` may be seconds (≈1e9) or ms (≈1.7e12).
189    if let Some(t) = obj.get("timestamp").and_then(|v| v.as_u64()) {
190        return Some(if t < 1_000_000_000_000 {
191            t.saturating_mul(1000)
192        } else {
193            t
194        });
195    }
196    None
197}
198
199fn reasoning_tokens(obj: &serde_json::Map<String, Value>) -> Option<u32> {
200    obj.get("usage")
201        .and_then(|u| u.get("reasoning_tokens"))
202        .and_then(|v| v.as_u64())
203        .map(|v| v as u32)
204        .or_else(|| {
205            obj.get("tokens")
206                .and_then(|u| u.get("reasoningTokens"))
207                .and_then(|v| v.as_u64())
208                .map(|v| v as u32)
209        })
210}
211
/// Modification time of `path` in milliseconds since the Unix epoch.
///
/// Best-effort: returns `0` when the metadata or mtime cannot be read, and a pre-epoch
/// mtime also maps to `0`.
fn file_mtime_ms(path: &Path) -> u64 {
    let Ok(meta) = path.metadata() else {
        return 0;
    };
    let Ok(modified) = meta.modified() else {
        return 0;
    };
    let since_epoch = modified
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
223
224/// Read every `*.jsonl` directly under `dir` (sorted by name) and parse into events.
225/// First `model` (or supported nested field) found in any line is returned for the session.
226fn scan_jsonl_in_dir(dir: &Path, session_id: &str) -> Result<(Vec<Event>, Option<String>)> {
227    // Transcript lines often omit `timestamp_ms`; align synthetic times with the session
228    // dir mtime (same as `SessionRecord.started_at_ms`) so retro windows and queries match.
229    let base_ts = super::dir_mtime_ms(dir);
230    let mut entries: Vec<_> = std::fs::read_dir(dir)
231        .with_context(|| format!("read dir: {}", dir.display()))?
232        .filter_map(|e| e.ok())
233        .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
234        .collect();
235    entries.sort_by_key(|e| e.file_name());
236
237    let mut events = Vec::new();
238    let mut seq: u64 = 0;
239    let mut model: Option<String> = None;
240    for entry in entries {
241        let content = std::fs::read_to_string(entry.path())?;
242        for line in content.lines() {
243            if line.trim().is_empty() {
244                continue;
245            }
246            if model.is_none() {
247                model = model_from_json::from_line(line);
248            }
249            if let Some(ev) = parse_cursor_line(session_id, seq, base_ts, line)? {
250                events.push(ev);
251                seq += 1;
252            } else {
253                seq += 1;
254            }
255        }
256    }
257    Ok((events, model))
258}
259
260/// Parse a single transcript `.jsonl` file into events.
261fn scan_jsonl_file(path: &Path, session_id: &str) -> Result<(Vec<Event>, Option<String>)> {
262    let base_ts = file_mtime_ms(path);
263    let content =
264        std::fs::read_to_string(path).with_context(|| format!("read file: {}", path.display()))?;
265    let mut events = Vec::new();
266    let mut seq: u64 = 0;
267    let mut model: Option<String> = None;
268    for line in content.lines() {
269        if line.trim().is_empty() {
270            continue;
271        }
272        if model.is_none() {
273            model = model_from_json::from_line(line);
274        }
275        if let Some(ev) = parse_cursor_line(session_id, seq, base_ts, line)? {
276            events.push(ev);
277            seq += 1;
278        } else {
279            seq += 1;
280        }
281    }
282    Ok((events, model))
283}
284
/// Workspace path for a Cursor session dir: its grandparent, as a lossy UTF-8 string.
///
/// Assumes the layout `<workspace>/agent-transcripts/<session-id>`. Returns an empty string
/// when `dir` has no grandparent.
fn cursor_workspace_for_session_dir(dir: &Path) -> String {
    // ancestors() yields `dir`, then its parent, then its grandparent, ...
    match dir.ancestors().nth(2) {
        Some(workspace) => workspace.to_string_lossy().into_owned(),
        None => String::new(),
    }
}
291
292/// Main session plus one session per `subagents/*.jsonl` (Cursor subagent transcripts).
293pub fn scan_session_dir_all(dir: &Path) -> Result<Vec<(SessionRecord, Vec<Event>)>> {
294    let session_id = dir
295        .file_name()
296        .and_then(|n| n.to_str())
297        .unwrap_or("")
298        .to_string();
299
300    let workspace = cursor_workspace_for_session_dir(dir);
301
302    let (main_events, main_model) = scan_jsonl_in_dir(dir, &session_id)?;
303
304    let main_record = SessionRecord {
305        id: session_id.clone(),
306        agent: "cursor".to_string(),
307        model: main_model,
308        workspace: workspace.clone(),
309        started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
310        ended_at_ms: None,
311        status: SessionStatus::Done,
312        trace_path: dir.to_string_lossy().to_string(),
313        start_commit: None,
314        end_commit: None,
315        branch: None,
316        dirty_start: None,
317        dirty_end: None,
318        repo_binding_source: None,
319        prompt_fingerprint: None,
320        parent_session_id: None,
321        agent_version: None,
322        os: None,
323        arch: None,
324        repo_file_count: None,
325        repo_total_loc: None,
326    };
327
328    let mut out = vec![(main_record, main_events)];
329
330    let subagents = dir.join("subagents");
331    if subagents.is_dir() {
332        let mut subs: Vec<_> = std::fs::read_dir(&subagents)
333            .with_context(|| format!("read dir: {}", subagents.display()))?
334            .filter_map(|e| e.ok())
335            .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
336            .collect();
337        subs.sort_by_key(|e| e.file_name());
338
339        for entry in subs {
340            let path = entry.path();
341            let sub_id = path
342                .file_stem()
343                .and_then(|s| s.to_str())
344                .unwrap_or("")
345                .to_string();
346            if sub_id.is_empty() {
347                continue;
348            }
349            let (events, sub_model) = scan_jsonl_file(&path, &sub_id)?;
350            let record = SessionRecord {
351                id: sub_id.clone(),
352                agent: "cursor".to_string(),
353                model: sub_model,
354                workspace: workspace.clone(),
355                started_at_ms: file_mtime_ms(&path),
356                ended_at_ms: None,
357                status: SessionStatus::Done,
358                trace_path: path.to_string_lossy().to_string(),
359                start_commit: None,
360                end_commit: None,
361                branch: None,
362                dirty_start: None,
363                dirty_end: None,
364                repo_binding_source: None,
365                prompt_fingerprint: None,
366                parent_session_id: Some(session_id.clone()),
367                agent_version: None,
368                os: None,
369                arch: None,
370                repo_file_count: None,
371                repo_total_loc: None,
372            };
373            out.push((record, events));
374        }
375    }
376
377    Ok(out)
378}
379
380/// Walk all `.jsonl` files directly under `dir`; return inferred `SessionRecord` + events.
381///
382/// Session id = dir name (last path component).
383/// Agent = "cursor". workspace = parent of parent (assuming `.../agent-transcripts/<id>`).
384/// status = Done (static dir assumed completed).
385///
386/// Does not include `subagents/*.jsonl`; use [`scan_session_dir_all`] for full ingestion.
387pub fn scan_session_dir(dir: &Path) -> Result<(SessionRecord, Vec<Event>)> {
388    let session_id = dir
389        .file_name()
390        .and_then(|n| n.to_str())
391        .unwrap_or("")
392        .to_string();
393    let workspace = cursor_workspace_for_session_dir(dir);
394    let (events, model) = scan_jsonl_in_dir(dir, &session_id)?;
395    let record = SessionRecord {
396        id: session_id.clone(),
397        agent: "cursor".to_string(),
398        model,
399        workspace,
400        started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
401        ended_at_ms: None,
402        status: SessionStatus::Done,
403        trace_path: dir.to_string_lossy().to_string(),
404        start_commit: None,
405        end_commit: None,
406        branch: None,
407        dirty_start: None,
408        dirty_end: None,
409        repo_binding_source: None,
410        prompt_fingerprint: None,
411        parent_session_id: None,
412        agent_version: None,
413        os: None,
414        arch: None,
415        repo_file_count: None,
416        repo_total_loc: None,
417    };
418    Ok((record, events))
419}
420
#[cfg(test)]
mod tests {
    use super::*;

    const TOOL_USE_LINE: &str = r#"{"role":"assistant","message":{"content":[{"type":"tool_use","id":"toolu_01","name":"read_file","input":{"path":"src/main.rs"}}]}}"#;
    const TOOL_RESULT_LINE: &str = r#"{"role":"user","message":{"content":[{"type":"tool_result","tool_use_id":"toolu_01","content":[{"type":"text","text":"fn main() {}"}]}]}}"#;
    const TEXT_ONLY_LINE: &str =
        r#"{"role":"assistant","message":{"content":[{"type":"text","text":"hello"}]}}"#;
    const TODO_WRITE_LINE: &str = r#"{"role":"assistant","message":{"content":[{"type":"tool_use","id":"toolu_02","name":"TodoWrite","input":{"todos":[{"content":"a","status":"completed"},{"content":"b","status":"pending"},{"content":"c","status":"cancelled"}]}}]}}"#;

    #[test]
    fn parse_tool_use() {
        let ev = parse_cursor_line("s1", 0, 0, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.kind, EventKind::ToolCall);
        assert_eq!(ev.tool.as_deref(), Some("read_file"));
        assert_eq!(ev.tool_call_id.as_deref(), Some("toolu_01"));
        assert_eq!(ev.session_id, "s1");
    }

    #[test]
    fn parse_tool_result() {
        let ev = parse_cursor_line("s1", 1, 0, TOOL_RESULT_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.kind, EventKind::ToolResult);
        assert_eq!(ev.seq, 1);
        assert_eq!(ev.tool_call_id.as_deref(), Some("toolu_01"));
    }

    #[test]
    fn text_only_returns_none() {
        let result = parse_cursor_line("s1", 2, 0, TEXT_ONLY_LINE).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn non_object_json_returns_none() {
        assert!(parse_cursor_line("s1", 0, 0, "[1,2,3]").unwrap().is_none());
    }

    #[test]
    fn invalid_json_is_error() {
        assert!(parse_cursor_line("s1", 0, 0, "{not json").is_err());
    }

    #[test]
    fn ts_ms_synthesized() {
        let ev = parse_cursor_line("s1", 3, 1000, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.ts_ms, 1000 + 3 * 100);
        assert!(!ev.ts_exact, "synthetic timestamps must not be flagged exact");
    }

    #[test]
    fn timestamp_seconds_scaled_to_ms() {
        let line = r#"{"timestamp":1700000000,"message":{"content":[{"type":"tool_use","id":"t","name":"read_file","input":{}}]}}"#;
        let ev = parse_cursor_line("s1", 5, 1000, line).unwrap().unwrap();
        assert!(ev.ts_exact);
        assert_eq!(ev.ts_ms, 1_700_000_000_000);
    }

    #[test]
    fn timestamp_ms_field_takes_precedence() {
        let line = r#"{"timestamp_ms":1700000000123,"timestamp":5,"message":{"content":[{"type":"tool_use","id":"t","name":"read_file","input":{}}]}}"#;
        let ev = parse_cursor_line("s1", 0, 0, line).unwrap().unwrap();
        assert!(ev.ts_exact);
        assert_eq!(ev.ts_ms, 1_700_000_000_123);
    }

    #[test]
    fn reasoning_tokens_from_either_shape() {
        let usage = r#"{"usage":{"reasoning_tokens":42},"message":{"content":[{"type":"tool_use","id":"t","name":"read_file","input":{}}]}}"#;
        let ev = parse_cursor_line("s1", 0, 0, usage).unwrap().unwrap();
        assert_eq!(ev.reasoning_tokens, Some(42));

        let camel = r#"{"tokens":{"reasoningTokens":7},"message":{"content":[{"type":"tool_use","id":"t","name":"read_file","input":{}}]}}"#;
        let ev = parse_cursor_line("s1", 0, 0, camel).unwrap().unwrap();
        assert_eq!(ev.reasoning_tokens, Some(7));
    }

    #[test]
    fn todo_write_becomes_lifecycle() {
        let ev = parse_cursor_line("s1", 0, 0, TODO_WRITE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.kind, EventKind::Lifecycle);
        assert_eq!(ev.tool.as_deref(), Some("TodoWrite"));
        assert_eq!(ev.tool_call_id.as_deref(), Some("toolu_02"));
        assert_eq!(ev.payload["type"], "todo_write");
        assert_eq!(ev.payload["todos_total"], 3);
        assert_eq!(ev.payload["todos_completed"], 1);
        assert_eq!(ev.payload["todos_cancelled"], 1);
    }

    #[test]
    fn scan_fixture_dir() {
        let fixture_dir =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/cursor");
        let (record, events) = scan_session_dir(&fixture_dir).unwrap();
        assert_eq!(record.agent, "cursor");
        assert_eq!(record.model.as_deref(), Some("Test Fixture Model"));
        assert_eq!(record.status, SessionStatus::Done);
        assert!(!events.is_empty(), "expected events from fixture files");
        assert!(events.iter().any(|e| e.kind == EventKind::ToolCall));
        assert!(events.iter().any(|e| e.kind == EventKind::ToolResult));
    }

    #[test]
    fn scan_session_dir_all_includes_subagents() {
        let fixture_dir =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/cursor");
        let sessions = scan_session_dir_all(&fixture_dir).unwrap();
        assert!(
            sessions.len() >= 2,
            "expected main session + subagent fixture"
        );
        let sub_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
        let sub = sessions
            .iter()
            .find(|(r, _)| r.id == sub_id)
            .expect("subagent session");
        assert_eq!(sub.0.agent, "cursor");
        assert!(
            sub.0.trace_path.ends_with(".jsonl"),
            "subagent trace_path should be file path"
        );
        assert!(
            sub.1.iter().any(|e| e.tool.as_deref() == Some("grep")),
            "subagent tool call"
        );
    }
}