Skip to main content

normalize_chat_sessions/formats/
codex.rs

1//! OpenAI Codex CLI JSONL format parser.
2
3use super::{LogFormat, ParseError, SessionFile, peek_lines};
4use crate::{ContentBlock, Message, Role, Session, TokenUsage, Turn};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::path::{Path, PathBuf};
10
/// OpenAI Codex CLI session log format (JSONL).
///
/// A stateless marker type: all behavior lives in its `LogFormat` implementation,
/// so it is freely copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct CodexFormat;
13
14impl LogFormat for CodexFormat {
15    fn name(&self) -> &'static str {
16        "codex"
17    }
18
19    fn sessions_dir(&self, _project: Option<&Path>) -> PathBuf {
20        let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
21        PathBuf::from(home).join(".codex/sessions")
22    }
23
24    fn list_sessions(&self, project: Option<&Path>) -> Vec<SessionFile> {
25        let dir = self.sessions_dir(project);
26        // Codex stores sessions in ~/.codex/sessions/YYYY/MM/DD/*.jsonl
27        let mut sessions = Vec::new();
28        // Walk year directories
29        if let Ok(years) = std::fs::read_dir(&dir) {
30            for year in years.filter_map(|e| e.ok()) {
31                if !year.path().is_dir() {
32                    continue;
33                }
34                // Walk month directories
35                if let Ok(months) = std::fs::read_dir(year.path()) {
36                    for month in months.filter_map(|e| e.ok()) {
37                        if !month.path().is_dir() {
38                            continue;
39                        }
40                        // Walk day directories
41                        if let Ok(days) = std::fs::read_dir(month.path()) {
42                            for day in days.filter_map(|e| e.ok()) {
43                                if !day.path().is_dir() {
44                                    continue;
45                                }
46                                // Find .jsonl files
47                                if let Ok(files) = std::fs::read_dir(day.path()) {
48                                    for file in files.filter_map(|e| e.ok()) {
49                                        let path = file.path();
50                                        if path.extension().and_then(|e| e.to_str())
51                                            == Some("jsonl")
52                                            && let Ok(meta) = path.metadata()
53                                            && let Ok(mtime) = meta.modified()
54                                        {
55                                            sessions.push(SessionFile {
56                                                path,
57                                                mtime,
58                                                parent_id: None,
59                                                agent_id: None,
60                                                subagent_type: Some("interactive".into()),
61                                            });
62                                        }
63                                    }
64                                }
65                            }
66                        }
67                    }
68                }
69            }
70        }
71        sessions
72    }
73
74    fn detect(&self, path: &Path) -> f64 {
75        // Check extension
76        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
77        if ext != "jsonl" {
78            return 0.0;
79        }
80
81        // Peek at first few lines
82        for line in peek_lines(path, 5) {
83            if let Ok(entry) = serde_json::from_str::<Value>(&line) {
84                // Codex has type field with session_meta, response_item, event_msg
85                if let Some(t) = entry.get("type").and_then(|v| v.as_str())
86                    && t == "session_meta"
87                {
88                    // Check for codex-specific originator
89                    if let Some(originator) = entry
90                        .get("payload")
91                        .and_then(|p| p.get("originator"))
92                        .and_then(|v| v.as_str())
93                        && originator.contains("codex")
94                    {
95                        return 1.0;
96                    }
97                }
98            }
99        }
100        0.0
101    }
102
103    fn parse(&self, path: &Path) -> Result<Session, ParseError> {
104        let file = File::open(path).map_err(|e| ParseError::Io {
105            path: path.to_path_buf(),
106            source: e,
107        })?;
108        let reader = BufReader::new(file);
109
110        let mut session = Session::new(path.to_path_buf(), self.name());
111        session.subagent_type = Some("interactive".into());
112        let mut current_turn = Turn::default();
113        let mut pending_tool_calls: HashMap<String, (String, Value)> = HashMap::new();
114
115        for line in reader.lines() {
116            let line = line.map_err(|e| ParseError::Io {
117                path: path.to_path_buf(),
118                source: e,
119            })?;
120            if line.trim().is_empty() {
121                continue;
122            }
123
124            let Ok(entry) = serde_json::from_str::<Value>(&line) else {
125                continue;
126            };
127
128            let entry_type = entry.get("type").and_then(|v| v.as_str()).unwrap_or("");
129
130            // Extract metadata from session_meta
131            if entry_type == "session_meta"
132                && let Some(payload) = entry.get("payload")
133            {
134                if session.metadata.session_id.is_none() {
135                    session.metadata.session_id = payload
136                        .get("session_id")
137                        .and_then(|v| v.as_str())
138                        .map(String::from);
139                }
140                if session.metadata.model.is_none() {
141                    session.metadata.model = payload
142                        .get("model")
143                        .and_then(|v| v.as_str())
144                        .map(String::from);
145                }
146            }
147
148            let Some(payload) = entry.get("payload") else {
149                continue;
150            };
151
152            let payload_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
153
154            match payload_type {
155                "user_message" => {
156                    // Flush previous turn
157                    if !current_turn.messages.is_empty() {
158                        session.turns.push(std::mem::take(&mut current_turn));
159                    }
160
161                    let text = payload
162                        .get("content")
163                        .and_then(|v| v.as_str())
164                        .unwrap_or("")
165                        .to_string();
166
167                    current_turn.messages.push(Message {
168                        role: Role::User,
169                        content: vec![ContentBlock::Text { text }],
170                        timestamp: entry
171                            .get("timestamp")
172                            .and_then(|v| v.as_str())
173                            .map(String::from),
174                    });
175                }
176                "message" => {
177                    // Assistant text response
178                    let text = payload
179                        .get("content")
180                        .and_then(|v| v.as_str())
181                        .unwrap_or("")
182                        .to_string();
183
184                    if !text.is_empty() {
185                        current_turn.messages.push(Message {
186                            role: Role::Assistant,
187                            content: vec![ContentBlock::Text { text }],
188                            timestamp: entry
189                                .get("timestamp")
190                                .and_then(|v| v.as_str())
191                                .map(String::from),
192                        });
193                    }
194                }
195                "function_call" => {
196                    let call_id = payload
197                        .get("call_id")
198                        .and_then(|v| v.as_str())
199                        .unwrap_or("")
200                        .to_string();
201                    let name = payload
202                        .get("name")
203                        .and_then(|v| v.as_str())
204                        .unwrap_or("")
205                        .to_string();
206                    let args_str = payload
207                        .get("arguments")
208                        .and_then(|v| v.as_str())
209                        .unwrap_or("{}");
210                    let input: Value =
211                        serde_json::from_str(args_str).unwrap_or(Value::Object(Default::default()));
212
213                    // Store for later pairing with result
214                    pending_tool_calls.insert(call_id.clone(), (name.clone(), input.clone()));
215
216                    current_turn.messages.push(Message {
217                        role: Role::Assistant,
218                        content: vec![ContentBlock::ToolUse {
219                            id: call_id,
220                            name,
221                            input,
222                        }],
223                        timestamp: entry
224                            .get("timestamp")
225                            .and_then(|v| v.as_str())
226                            .map(String::from),
227                    });
228                }
229                "function_call_output" => {
230                    let call_id = payload
231                        .get("call_id")
232                        .and_then(|v| v.as_str())
233                        .unwrap_or("")
234                        .to_string();
235                    let output = payload
236                        .get("output")
237                        .and_then(|v| v.as_str())
238                        .unwrap_or("")
239                        .to_string();
240                    let is_error = output.contains("Exit code: 1")
241                        || output.starts_with("Error:")
242                        || output.contains("\nError:");
243
244                    current_turn.messages.push(Message {
245                        role: Role::User,
246                        content: vec![ContentBlock::ToolResult {
247                            tool_use_id: call_id,
248                            content: output,
249                            is_error,
250                        }],
251                        timestamp: entry
252                            .get("timestamp")
253                            .and_then(|v| v.as_str())
254                            .map(String::from),
255                    });
256                }
257                "token_count" => {
258                    // Extract final token usage
259                    if let Some(info) = payload.get("info")
260                        && let Some(total) = info.get("total_token_usage")
261                    {
262                        current_turn.token_usage = Some(TokenUsage {
263                            input: total
264                                .get("input_tokens")
265                                .and_then(|v| v.as_u64())
266                                .unwrap_or(0),
267                            output: total
268                                .get("output_tokens")
269                                .and_then(|v| v.as_u64())
270                                .unwrap_or(0)
271                                + total
272                                    .get("reasoning_output_tokens")
273                                    .and_then(|v| v.as_u64())
274                                    .unwrap_or(0),
275                            cache_read: total.get("cached_input_tokens").and_then(|v| v.as_u64()),
276                            cache_create: None,
277                            model: session.metadata.model.clone(),
278                        });
279                    }
280                }
281                _ => {}
282            }
283        }
284
285        // Flush final turn
286        if !current_turn.messages.is_empty() {
287            session.turns.push(current_turn);
288        }
289
290        // Set provider
291        session.metadata.provider = Some("openai".to_string());
292
293        Ok(session)
294    }
295}