// codex_recall/parser.rs

use crate::redact::redact_secrets;
use anyhow::{Context, Result};
use serde_json::Value;
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::str::FromStr;
9
/// Maximum number of bytes of command output kept in a command event's text.
const COMMAND_OUTPUT_LIMIT: usize = 4_000;
/// Number of bytes of raw command output handed to secret redaction before the final cap.
/// NOTE(review): the extra 512 bytes presumably let the redactor see a secret that straddles
/// the final limit in full — confirm against `redact_secrets`.
const COMMAND_OUTPUT_REDACTION_WINDOW: usize = COMMAND_OUTPUT_LIMIT + 512;
/// Maximum number of bytes of message text kept in a user/assistant message event.
const MESSAGE_TEXT_LIMIT: usize = 20_000;
/// Number of bytes of raw message text handed to secret redaction before the final cap
/// (same 512-byte margin as for command output).
const MESSAGE_TEXT_REDACTION_WINDOW: usize = MESSAGE_TEXT_LIMIT + 512;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct ParsedSession {
17    pub session: SessionMetadata,
18    pub events: Vec<ParsedEvent>,
19}
20
/// Metadata describing a session, read from the payload of a `session_meta` record.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMetadata {
    /// Session identifier (the payload's `id`); a record without it yields no metadata.
    pub id: String,
    /// The payload's `timestamp` string, or empty when absent.
    pub timestamp: String,
    /// The payload's `cwd` string, or empty when absent.
    pub cwd: String,
    /// The payload's `cli_version`, when present.
    pub cli_version: Option<String>,
    /// Path of the transcript file the metadata was read from.
    pub source_file_path: PathBuf,
}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct ParsedEvent {
32    pub session_id: String,
33    pub kind: EventKind,
34    pub role: Option<String>,
35    pub text: String,
36    pub command: Option<String>,
37    pub cwd: Option<String>,
38    pub exit_code: Option<i64>,
39    pub source_timestamp: Option<String>,
40    pub source_file_path: PathBuf,
41    pub source_line_number: usize,
42}
43
/// The kinds of transcript events that are kept for indexing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    /// A message written by the user.
    UserMessage,
    /// A message written by the assistant.
    AssistantMessage,
    /// A finished command together with its output.
    Command,
}

impl EventKind {
    /// Returns the stable snake_case name of this kind (the inverse of [`FromStr`]).
    pub fn as_str(self) -> &'static str {
        match self {
            EventKind::UserMessage => "user_message",
            EventKind::AssistantMessage => "assistant_message",
            EventKind::Command => "command",
        }
    }

    /// Maps a name produced by [`EventKind::as_str`] back to its kind; any other input
    /// yields `None`.
    fn parse_kind(value: &str) -> Option<Self> {
        match value {
            "user_message" => Some(EventKind::UserMessage),
            "assistant_message" => Some(EventKind::AssistantMessage),
            "command" => Some(EventKind::Command),
            _ => None,
        }
    }
}

impl FromStr for EventKind {
    /// Unrecognized names carry no further detail.
    type Err = ();

    /// Parses the snake_case name of an event kind, e.g. `"command"`.
    fn from_str(value: &str) -> std::result::Result<Self, Self::Err> {
        Self::parse_kind(value).ok_or(())
    }
}
77
78pub fn parse_session_file(path: &Path) -> Result<Option<ParsedSession>> {
79    let file = File::open(path).with_context(|| format!("open {}", path.display()))?;
80    let reader = BufReader::new(file);
81    let mut session = None;
82    let mut pending_events = Vec::new();
83
84    for (index, line) in reader.lines().enumerate() {
85        let line_number = index + 1;
86        let line = line.with_context(|| format!("read {}:{line_number}", path.display()))?;
87        if line.trim().is_empty() {
88            continue;
89        }
90
91        let record: Value = serde_json::from_str(&line)
92            .with_context(|| format!("parse json {}:{line_number}", path.display()))?;
93        let top_type = record
94            .get("type")
95            .and_then(Value::as_str)
96            .unwrap_or_default();
97        let source_timestamp = record
98            .get("timestamp")
99            .and_then(Value::as_str)
100            .map(str::to_owned);
101        let payload = record.get("payload").unwrap_or(&Value::Null);
102
103        if top_type == "session_meta" {
104            session = parse_session_meta(payload, path);
105            continue;
106        }
107
108        if let Some(event) = parse_event(
109            top_type,
110            payload,
111            path,
112            line_number,
113            source_timestamp.as_deref(),
114        ) {
115            pending_events.push(event);
116        }
117    }
118
119    let Some(session) = session else {
120        return Ok(None);
121    };
122
123    let mut seen = HashSet::new();
124    let mut events = Vec::new();
125    for mut event in pending_events {
126        let dedupe_key = format!(
127            "{}\u{1f}{}\u{1f}{}",
128            event.kind.as_str(),
129            event.role.as_deref().unwrap_or_default(),
130            event.text
131        );
132        if seen.insert(dedupe_key) {
133            event.session_id = session.id.clone();
134            events.push(event);
135        }
136    }
137
138    Ok(Some(ParsedSession { session, events }))
139}
140
141fn parse_session_meta(payload: &Value, path: &Path) -> Option<SessionMetadata> {
142    let id = payload.get("id")?.as_str()?.to_owned();
143    let timestamp = payload
144        .get("timestamp")
145        .and_then(Value::as_str)
146        .unwrap_or_default()
147        .to_owned();
148    let cwd = payload
149        .get("cwd")
150        .and_then(Value::as_str)
151        .unwrap_or_default()
152        .to_owned();
153    let cli_version = payload
154        .get("cli_version")
155        .and_then(Value::as_str)
156        .map(str::to_owned);
157
158    Some(SessionMetadata {
159        id,
160        timestamp,
161        cwd,
162        cli_version,
163        source_file_path: path.to_path_buf(),
164    })
165}
166
167fn parse_event(
168    top_type: &str,
169    payload: &Value,
170    path: &Path,
171    source_line_number: usize,
172    source_timestamp: Option<&str>,
173) -> Option<ParsedEvent> {
174    let payload_type = payload
175        .get("type")
176        .and_then(Value::as_str)
177        .unwrap_or_default();
178
179    match (top_type, payload_type) {
180        ("event_msg", "user_message") => {
181            let text = payload.get("message").and_then(Value::as_str)?.trim();
182            non_empty_text_event(
183                EventKind::UserMessage,
184                Some("user"),
185                text,
186                path,
187                source_line_number,
188                source_timestamp,
189            )
190        }
191        ("event_msg", "agent_message") => {
192            let text = payload.get("message").and_then(Value::as_str)?.trim();
193            non_empty_text_event(
194                EventKind::AssistantMessage,
195                Some("assistant"),
196                text,
197                path,
198                source_line_number,
199                source_timestamp,
200            )
201        }
202        ("event_msg", "exec_command_end") => {
203            parse_command_event(payload, path, source_line_number, source_timestamp)
204        }
205        ("response_item", "message") => {
206            let role = payload.get("role").and_then(Value::as_str)?;
207            let kind = match role {
208                "user" => EventKind::UserMessage,
209                "assistant" => EventKind::AssistantMessage,
210                _ => return None,
211            };
212            let text = extract_content_text(payload.get("content")?)?;
213            non_empty_text_event(
214                kind,
215                Some(role),
216                text.trim(),
217                path,
218                source_line_number,
219                source_timestamp,
220            )
221        }
222        _ => None,
223    }
224}
225
226fn non_empty_text_event(
227    kind: EventKind,
228    role: Option<&str>,
229    text: &str,
230    path: &Path,
231    source_line_number: usize,
232    source_timestamp: Option<&str>,
233) -> Option<ParsedEvent> {
234    if text.is_empty() {
235        return None;
236    }
237    if is_codex_preamble(text) {
238        return None;
239    }
240
241    let capped_text = cap_text(text, MESSAGE_TEXT_REDACTION_WINDOW);
242    let redacted_text = cap_text(&redact_secrets(&capped_text), MESSAGE_TEXT_LIMIT);
243
244    Some(ParsedEvent {
245        session_id: String::new(),
246        kind,
247        role: role.map(str::to_owned),
248        text: redacted_text,
249        command: None,
250        cwd: None,
251        exit_code: None,
252        source_timestamp: source_timestamp.map(str::to_owned),
253        source_file_path: path.to_path_buf(),
254        source_line_number,
255    })
256}
257
/// Reports whether `text` is injected preamble rather than a real message.
///
/// A message counts as preamble when, after leading whitespace is skipped, it starts with
/// `# AGENTS.md instructions`, or when it contains `<environment_context>` anywhere.
fn is_codex_preamble(text: &str) -> bool {
    let trimmed = text.trim_start();
    trimmed.starts_with("# AGENTS.md instructions") || trimmed.contains("<environment_context>")
}
262
263fn parse_command_event(
264    payload: &Value,
265    path: &Path,
266    source_line_number: usize,
267    source_timestamp: Option<&str>,
268) -> Option<ParsedEvent> {
269    let command = extract_command(payload.get("command")?)?;
270    let command = command.trim();
271    if command.is_empty() {
272        return None;
273    }
274    let redacted_command = redact_secrets(command);
275
276    let stdout = payload.get("stdout").and_then(Value::as_str).unwrap_or("");
277    let stderr = payload.get("stderr").and_then(Value::as_str).unwrap_or("");
278    let mut text = format!("$ {redacted_command}");
279    let output = payload
280        .get("aggregated_output")
281        .and_then(Value::as_str)
282        .filter(|value| !value.trim().is_empty())
283        .map(str::to_owned)
284        .unwrap_or_else(|| join_command_output(stdout, stderr));
285    if !output.is_empty() {
286        text.push('\n');
287        let capped_output = cap_text(&output, COMMAND_OUTPUT_REDACTION_WINDOW);
288        text.push_str(&cap_text(
289            &redact_secrets(&capped_output),
290            COMMAND_OUTPUT_LIMIT,
291        ));
292    }
293
294    Some(ParsedEvent {
295        session_id: String::new(),
296        kind: EventKind::Command,
297        role: None,
298        text,
299        command: Some(redacted_command),
300        cwd: payload
301            .get("cwd")
302            .and_then(Value::as_str)
303            .map(str::to_owned),
304        exit_code: payload.get("exit_code").and_then(Value::as_i64),
305        source_timestamp: source_timestamp.map(str::to_owned),
306        source_file_path: path.to_path_buf(),
307        source_line_number,
308    })
309}
310
311fn extract_command(value: &Value) -> Option<String> {
312    if let Some(command) = value.as_str() {
313        return Some(command.to_owned());
314    }
315
316    let argv = value.as_array()?;
317    let args = argv.iter().filter_map(Value::as_str).collect::<Vec<_>>();
318    if args.len() >= 3 && (args[1] == "-lc" || args[1] == "-c") {
319        return Some(args[2].to_owned());
320    }
321
322    if args.is_empty() {
323        None
324    } else {
325        Some(args.join(" "))
326    }
327}
328
329fn extract_content_text(content: &Value) -> Option<String> {
330    let parts = content.as_array()?;
331    let text = parts
332        .iter()
333        .filter_map(|part| part.get("text").and_then(Value::as_str))
334        .collect::<Vec<_>>()
335        .join("\n");
336
337    if text.trim().is_empty() {
338        None
339    } else {
340        Some(text)
341    }
342}
343
/// Combines a command's `stdout` and `stderr` into one block of output.
///
/// A stream that is blank (empty or whitespace only) is left out. When both streams have
/// content they are joined as `stdout`, a newline, then `stderr`. When both are blank the
/// result is empty. Streams that are kept are copied verbatim, without trimming.
fn join_command_output(stdout: &str, stderr: &str) -> String {
    match (stdout.trim().is_empty(), stderr.trim().is_empty()) {
        (true, true) => String::new(),
        (false, true) => stdout.to_owned(),
        (true, false) => stderr.to_owned(),
        (false, false) => format!("{stdout}\n{stderr}"),
    }
}
352
/// Truncates `text` to at most `limit` bytes and marks the cut.
///
/// Text that already fits is returned unchanged. Longer text is cut at the last UTF-8
/// character boundary at or below `limit`, so a multi-byte character that would straddle
/// the limit is dropped rather than pushing the kept part past `limit`. The marker
/// `"\n[truncated]"` is then appended, so the returned string may be up to
/// `limit + "\n[truncated]".len()` bytes long.
fn cap_text(text: &str, limit: usize) -> String {
    const TRUNCATION_MARKER: &str = "\n[truncated]";

    if text.len() <= limit {
        return text.to_owned();
    }

    // `limit < text.len()` here. Step back to a character boundary so the slice below
    // cannot split a code point; index 0 is always a boundary, so the loop terminates.
    let mut end = limit;
    while !text.is_char_boundary(end) {
        end -= 1;
    }

    let mut capped = String::with_capacity(end + TRUNCATION_MARKER.len());
    capped.push_str(&text[..end]);
    capped.push_str(TRUNCATION_MARKER);
    capped
}
366
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::PathBuf;

    /// Writes `contents` to `session.jsonl` inside a temp directory named after the current
    /// process id and `name`, and returns the file's path.
    fn temp_jsonl(name: &str, contents: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "codex-recall-parser-test-{}-{}",
            std::process::id(),
            name
        ));
        fs::create_dir_all(&dir).unwrap();
        let path = dir.join("session.jsonl");
        fs::write(&path, contents).unwrap();
        path
    }

    // One user message, one assistant message, and one command are extracted in file order.
    #[test]
    fn parses_session_metadata_and_high_signal_events() {
        let path = temp_jsonl(
            "basic",
            r#"{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{"id":"session-1","timestamp":"2026-04-13T01:00:00Z","cwd":"/Users/me/project","cli_version":"0.1.0"}}
{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{"type":"user_message","message":"Find the Sentry issue","text_elements":[],"images":[],"local_images":[]}}
{"timestamp":"2026-04-13T01:00:02Z","type":"response_item","payload":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"I found the Sentry root cause."}]}}
{"timestamp":"2026-04-13T01:00:03Z","type":"event_msg","payload":{"type":"exec_command_end","command":"rg SENTRY","cwd":"/Users/me/project","exit_code":0,"stdout":"SENTRY_DSN=redacted\n","stderr":""}}
"#,
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.session.id, "session-1");
        assert_eq!(parsed.session.cwd, "/Users/me/project");
        assert_eq!(parsed.session.source_file_path, path);
        assert_eq!(parsed.events.len(), 3);
        assert_eq!(parsed.events[0].kind, EventKind::UserMessage);
        assert_eq!(parsed.events[0].text, "Find the Sentry issue");
        assert_eq!(parsed.events[0].source_line_number, 2);
        assert_eq!(parsed.events[1].kind, EventKind::AssistantMessage);
        assert_eq!(parsed.events[1].text, "I found the Sentry root cause.");
        assert_eq!(parsed.events[2].kind, EventKind::Command);
        assert_eq!(parsed.events[2].command.as_deref(), Some("rg SENTRY"));
        assert!(parsed.events[2].text.contains("rg SENTRY"));
        assert!(parsed.events[2].text.contains("SENTRY_DSN"));
    }

    // Token counts and function calls carry no indexable text and produce no events.
    #[test]
    fn skips_events_without_indexable_text() {
        let path = temp_jsonl(
            "noise",
            r#"{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{"id":"session-2","timestamp":"2026-04-13T01:00:00Z","cwd":"/tmp"}}
{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":10}}}}
{"timestamp":"2026-04-13T01:00:02Z","type":"response_item","payload":{"type":"function_call","name":"exec_command","arguments":"{}","call_id":"call-1"}}
"#,
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.events.len(), 0);
    }

    // The same answer arriving as an `event_msg` and as a `response_item` is kept once,
    // at its first occurrence.
    #[test]
    fn removes_exact_duplicate_transcript_events() {
        let path = temp_jsonl(
            "duplicates",
            r#"{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{"id":"session-3","timestamp":"2026-04-13T01:00:00Z","cwd":"/tmp"}}
{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{"type":"agent_message","message":"Same assistant answer."}}
{"timestamp":"2026-04-13T01:00:01Z","type":"response_item","payload":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"Same assistant answer."}]}}
"#,
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.events.len(), 1);
        assert_eq!(parsed.events[0].source_line_number, 2);
    }

    // Injected instruction/environment preamble is dropped; the real question is kept.
    #[test]
    fn skips_codex_instruction_preamble_messages() {
        let path = temp_jsonl(
            "preamble",
            r##"{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{"id":"session-4","timestamp":"2026-04-13T01:00:00Z","cwd":"/tmp"}}
{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{"type":"user_message","message":"# AGENTS.md instructions for /tmp\n\n<environment_context>\n  <cwd>/tmp</cwd>\n</environment_context>"}}
{"timestamp":"2026-04-13T01:00:02Z","type":"event_msg","payload":{"type":"user_message","message":"What did we decide about Sentry?"}}
"##,
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.events.len(), 1);
        assert_eq!(parsed.events[0].text, "What did we decide about Sentry?");
    }

    // An argv of the form [shell, "-lc", script] is reduced to the script, and
    // `aggregated_output` is used as the command output.
    #[test]
    fn parses_exec_command_end_with_argv_and_aggregated_output() {
        let path = temp_jsonl(
            "argv-command",
            r#"{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{"id":"session-5","timestamp":"2026-04-13T01:00:00Z","cwd":"/Users/me/notes-vault"}}
{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{"type":"exec_command_end","command":["/bin/zsh","-lc","cargo test"],"cwd":"/Users/me/projects/codex-recall","exit_code":0,"aggregated_output":"test result: ok"}}
"#,
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.events.len(), 1);
        assert_eq!(parsed.events[0].kind, EventKind::Command);
        assert_eq!(parsed.events[0].command.as_deref(), Some("cargo test"));
        assert_eq!(
            parsed.events[0].cwd.as_deref(),
            Some("/Users/me/projects/codex-recall")
        );
        assert!(parsed.events[0].text.contains("test result: ok"));
    }

    // Secrets in message text, command lines, and command output never reach the events.
    #[test]
    fn redacts_secrets_before_events_are_indexed() {
        // Assembled at runtime so the literal token never appears in this source file.
        let github_pat = ["github", "_pat_", "1234567890abcdefghijklmnop"].concat();
        let path = temp_jsonl(
            "redaction",
            &format!(
                r#"{{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{{"id":"session-6","timestamp":"2026-04-13T01:00:00Z","cwd":"/Users/me/project"}}}}
{{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{{"type":"user_message","message":"Use API_KEY=abc123456789 and Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ"}}}}
{{"timestamp":"2026-04-13T01:00:02Z","type":"event_msg","payload":{{"type":"exec_command_end","command":"curl -H 'Authorization: Bearer supersecrettoken123456' https://example.com","cwd":"/Users/me/project","exit_code":0,"stdout":"{github_pat}\n","stderr":""}}}}
"#
            ),
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.events.len(), 2);
        assert!(parsed.events[0].text.contains("API_KEY=[REDACTED]"));
        assert!(parsed.events[0]
            .text
            .contains("Authorization: Bearer [REDACTED]"));
        assert!(parsed.events[1]
            .command
            .as_deref()
            .unwrap()
            .contains("Authorization: Bearer [REDACTED]"));
        assert!(parsed.events[1].text.contains("[REDACTED]"));
        assert!(!parsed.events.iter().any(|event| {
            event.text.contains("abc123456789")
                || event.text.contains("supersecrettoken123456")
                || event.text.contains(&github_pat)
        }));
    }

    // A 60,000-byte message is cut down to the message limit plus the truncation marker.
    #[test]
    fn caps_large_message_events_before_indexing() {
        let long_text = "alpha ".repeat(10_000);
        let escaped = serde_json::to_string(&long_text).unwrap();
        let path = temp_jsonl(
            "large-message",
            &format!(
                r#"{{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{{"id":"session-7","timestamp":"2026-04-13T01:00:00Z","cwd":"/Users/me/project"}}}}
{{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{{"type":"user_message","message":{escaped}}}}}
"#
            ),
        );

        let parsed = parse_session_file(&path).unwrap().unwrap();

        assert_eq!(parsed.events.len(), 1);
        assert!(parsed.events[0].text.len() <= MESSAGE_TEXT_LIMIT + "[truncated]".len() + 1);
        assert!(parsed.events[0].text.contains("[truncated]"));
    }
}