Skip to main content

coding_agent_search/connectors/
codex.rs

1use std::collections::HashSet;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4
5use anyhow::Result;
6use serde_json::Value;
7
8use super::{
9    Connector, DetectionResult, DiscoveredSourceFile, NormalizedConversation, NormalizedMessage,
10    ScanContext, parse_timestamp, reindex_messages,
11};
12
/// Maximum number of `char`s of a single tool output kept in indexed message content.
///
/// Longer outputs are cut at this length and followed by a note giving the number of
/// characters that were dropped.
const MAX_INDEXED_TOOL_OUTPUT_CHARS: usize = 1024 * 128;
14
15pub struct CodexConnector {
16    inner: franken_agent_detection::CodexConnector,
17}
18
19impl Default for CodexConnector {
20    fn default() -> Self {
21        Self::new()
22    }
23}
24
25impl CodexConnector {
26    #[must_use]
27    pub fn new() -> Self {
28        Self {
29            inner: franken_agent_detection::CodexConnector::new(),
30        }
31    }
32}
33
34impl Connector for CodexConnector {
35    fn detect(&self) -> DetectionResult {
36        self.inner.detect()
37    }
38
39    fn scan(&self, ctx: &ScanContext) -> Result<Vec<NormalizedConversation>> {
40        let mut conversations = self.inner.scan(ctx)?;
41        for conversation in &mut conversations {
42            augment_modern_codex_messages(conversation);
43        }
44        Ok(conversations)
45    }
46
47    fn supports_streaming_scan(&self) -> bool {
48        self.inner.supports_streaming_scan()
49    }
50
51    fn discover_source_files(&self, ctx: &ScanContext) -> Result<Vec<DiscoveredSourceFile>> {
52        self.inner.discover_source_files(ctx)
53    }
54
55    fn scan_with_callback(
56        &self,
57        ctx: &ScanContext,
58        on_conversation: &mut dyn FnMut(NormalizedConversation) -> Result<()>,
59    ) -> Result<()> {
60        self.inner.scan_with_callback(ctx, &mut |mut conversation| {
61            augment_modern_codex_messages(&mut conversation);
62            on_conversation(conversation)
63        })
64    }
65}
66
67fn augment_modern_codex_messages(conversation: &mut NormalizedConversation) {
68    if conversation
69        .source_path
70        .extension()
71        .and_then(|ext| ext.to_str())
72        .is_none_or(|ext| !ext.eq_ignore_ascii_case("jsonl"))
73    {
74        return;
75    }
76
77    let Ok(file) = File::open(&conversation.source_path) else {
78        return;
79    };
80
81    let mut seen_messages: HashSet<ModernCodexMessageSignature> = conversation
82        .messages
83        .iter()
84        .map(modern_codex_message_signature)
85        .collect();
86    let mut seen_call_ids: HashSet<String> = conversation
87        .messages
88        .iter()
89        .flat_map(modern_codex_message_call_ids)
90        .collect();
91    let mut seen_raw_entries: HashSet<[u8; 32]> = conversation
92        .messages
93        .iter()
94        .map(|message| modern_codex_raw_signature(&message.extra))
95        .collect();
96    let mut added = false;
97    for line in BufReader::new(file).lines().map_while(Result::ok) {
98        let line = line.trim();
99        if line.is_empty() {
100            continue;
101        }
102        let Ok(raw) = serde_json::from_str::<Value>(line) else {
103            continue;
104        };
105        let raw_signature = modern_codex_raw_signature(&raw);
106        if seen_raw_entries.contains(&raw_signature) {
107            continue;
108        }
109        let Some(message) = modern_codex_message(&raw) else {
110            continue;
111        };
112        if message_already_indexed(&seen_messages, &seen_call_ids, &message) {
113            seen_raw_entries.insert(raw_signature);
114            continue;
115        }
116        seen_messages.insert(modern_codex_message_signature(&message));
117        seen_call_ids.extend(modern_codex_message_call_ids(&message));
118        seen_raw_entries.insert(raw_signature);
119        conversation.messages.push(message);
120        added = true;
121    }
122
123    if added {
124        conversation.messages.sort_by(|left, right| {
125            left.created_at
126                .cmp(&right.created_at)
127                .then_with(|| left.idx.cmp(&right.idx))
128        });
129        reindex_messages(&mut conversation.messages);
130    }
131}
132
133fn modern_codex_message(raw: &Value) -> Option<NormalizedMessage> {
134    let entry_type = raw.get("type").and_then(Value::as_str)?;
135    let payload = raw.get("payload")?;
136    let created_at = raw.get("timestamp").and_then(parse_timestamp);
137
138    match entry_type {
139        "response_item" => response_item_message(payload, created_at, raw),
140        "event_msg" => event_message(payload, created_at, raw),
141        _ => None,
142    }
143}
144
145fn response_item_message(
146    payload: &Value,
147    created_at: Option<i64>,
148    raw: &Value,
149) -> Option<NormalizedMessage> {
150    match payload.get("type").and_then(Value::as_str) {
151        Some("message") | None => {
152            let content = payload.get("content").and_then(flatten_modern_content)?;
153            let role = payload
154                .get("role")
155                .and_then(Value::as_str)
156                .unwrap_or("agent")
157                .to_string();
158            Some(normalized_message(
159                role,
160                None,
161                created_at,
162                content,
163                raw.clone(),
164                payload.get("content").map_or_else(
165                    Vec::new,
166                    franken_agent_detection::extract_invocations_from_content_blocks,
167                ),
168            ))
169        }
170        Some("function_call") => {
171            let tool_name = payload
172                .get("name")
173                .and_then(Value::as_str)
174                .unwrap_or("unknown");
175            let arguments = payload.get("arguments").cloned();
176            let content = tool_call_content(tool_name, arguments.as_ref());
177            let call_id = payload
178                .get("call_id")
179                .or_else(|| payload.get("id"))
180                .and_then(Value::as_str)
181                .map(str::to_string);
182            Some(normalized_message(
183                "assistant".to_string(),
184                None,
185                created_at,
186                content,
187                raw.clone(),
188                vec![franken_agent_detection::NormalizedInvocation {
189                    kind: "tool".to_string(),
190                    name: tool_name.to_string(),
191                    raw_name: None,
192                    call_id,
193                    arguments: arguments.and_then(normalize_invocation_arguments),
194                }],
195            ))
196        }
197        Some("function_call_output") => {
198            let output = payload.get("output").and_then(Value::as_str)?;
199            let call_id = payload.get("call_id").and_then(Value::as_str);
200            Some(normalized_message(
201                "tool".to_string(),
202                None,
203                created_at,
204                tool_output_content(call_id, output),
205                raw.clone(),
206                Vec::new(),
207            ))
208        }
209        _ => None,
210    }
211}
212
213fn event_message(
214    payload: &Value,
215    created_at: Option<i64>,
216    raw: &Value,
217) -> Option<NormalizedMessage> {
218    match payload.get("type").and_then(Value::as_str) {
219        Some("agent_message") => {
220            let content = payload
221                .get("message")
222                .or_else(|| payload.get("text"))
223                .and_then(Value::as_str)?
224                .trim()
225                .to_string();
226            non_empty_message("assistant".to_string(), None, created_at, content, raw)
227        }
228        Some("tool_result") => {
229            let output = payload
230                .get("output")
231                .or_else(|| payload.get("result"))
232                .and_then(Value::as_str)?;
233            let call_id = payload
234                .get("call_id")
235                .or_else(|| payload.get("id"))
236                .and_then(Value::as_str);
237            Some(normalized_message(
238                "tool".to_string(),
239                None,
240                created_at,
241                tool_output_content(call_id, output),
242                raw.clone(),
243                Vec::new(),
244            ))
245        }
246        _ => None,
247    }
248}
249
250fn normalized_message(
251    role: String,
252    author: Option<String>,
253    created_at: Option<i64>,
254    content: String,
255    extra: Value,
256    invocations: Vec<franken_agent_detection::NormalizedInvocation>,
257) -> NormalizedMessage {
258    NormalizedMessage {
259        idx: 0,
260        role,
261        author,
262        created_at,
263        content,
264        extra,
265        invocations,
266        snippets: Vec::new(),
267    }
268}
269
270fn non_empty_message(
271    role: String,
272    author: Option<String>,
273    created_at: Option<i64>,
274    content: String,
275    raw: &Value,
276) -> Option<NormalizedMessage> {
277    (!content.trim().is_empty())
278        .then(|| normalized_message(role, author, created_at, content, raw.clone(), Vec::new()))
279}
280
281fn flatten_modern_content(content: &Value) -> Option<String> {
282    if let Some(text) = content
283        .as_str()
284        .map(str::trim)
285        .filter(|text| !text.is_empty())
286    {
287        return Some(text.to_string());
288    }
289
290    let mut parts = Vec::new();
291    for item in content.as_array()? {
292        let text = modern_content_part_text(item);
293
294        let text = text.trim();
295        if !text.is_empty() {
296            parts.push(text.to_string());
297        }
298    }
299
300    (!parts.is_empty()).then(|| parts.join("\n"))
301}
302
303fn modern_content_part_text(item: &Value) -> String {
304    if let Some(text) = item.as_str() {
305        return text.to_string();
306    }
307
308    let item_type = item.get("type").and_then(Value::as_str);
309    if matches!(
310        item_type,
311        None | Some("text") | Some("input_text") | Some("output_text")
312    ) {
313        return item
314            .get("text")
315            .and_then(Value::as_str)
316            .unwrap_or("")
317            .to_string();
318    }
319
320    if item_type == Some("tool_use") {
321        let tool_name = item
322            .get("name")
323            .and_then(Value::as_str)
324            .unwrap_or("unknown");
325        let detail = item
326            .get("input")
327            .and_then(|input| {
328                input
329                    .get("description")
330                    .or_else(|| input.get("file_path"))
331                    .or_else(|| input.get("path"))
332                    .or_else(|| input.get("command"))
333            })
334            .and_then(Value::as_str)
335            .unwrap_or("")
336            .trim();
337        return if detail.is_empty() {
338            format!("[Tool: {tool_name}]")
339        } else {
340            format!("[Tool: {tool_name} - {detail}]")
341        };
342    }
343
344    String::new()
345}
346
347fn tool_call_content(tool_name: &str, arguments: Option<&Value>) -> String {
348    let mut content = format!("[Tool: {tool_name}]");
349    if let Some(arguments) = arguments.and_then(argument_text) {
350        content.push('\n');
351        content.push_str(&arguments);
352    }
353    content
354}
355
356fn tool_output_content(call_id: Option<&str>, output: &str) -> String {
357    let label = call_id.map_or_else(
358        || "[Tool output]".to_string(),
359        |id| format!("[Tool output: {id}]"),
360    );
361    let output = truncate_tool_output(output.trim());
362    if output.is_empty() {
363        label
364    } else {
365        format!("{label}\n{output}")
366    }
367}
368
369fn argument_text(arguments: &Value) -> Option<String> {
370    let text = match arguments {
371        Value::String(text) => text.trim().to_string(),
372        other => serde_json::to_string(other).ok()?,
373    };
374    (!text.is_empty()).then_some(text)
375}
376
377fn normalize_invocation_arguments(arguments: Value) -> Option<Value> {
378    match arguments {
379        Value::String(text) => serde_json::from_str(&text)
380            .ok()
381            .or_else(|| (!text.trim().is_empty()).then_some(Value::String(text))),
382        Value::Null => None,
383        other => Some(other),
384    }
385}
386
387fn truncate_tool_output(output: &str) -> String {
388    let mut truncated = String::new();
389    let mut chars = output.chars();
390    for _ in 0..MAX_INDEXED_TOOL_OUTPUT_CHARS {
391        let Some(ch) = chars.next() else {
392            return output.to_string();
393        };
394        truncated.push(ch);
395    }
396    let omitted = chars.count();
397    truncated.push_str(&format!(
398        "\n[truncated {omitted} additional chars from tool output]"
399    ));
400    truncated
401}
402
403#[derive(Debug, Clone, PartialEq, Eq, Hash)]
404struct ModernCodexMessageSignature {
405    role: String,
406    author: Option<String>,
407    created_at: Option<i64>,
408    content_hash: [u8; 32],
409}
410
411fn modern_codex_message_signature(message: &NormalizedMessage) -> ModernCodexMessageSignature {
412    ModernCodexMessageSignature {
413        role: message.role.clone(),
414        author: message.author.clone(),
415        created_at: message.created_at,
416        content_hash: *blake3::hash(message.content.as_bytes()).as_bytes(),
417    }
418}
419
420fn modern_codex_raw_signature(raw: &Value) -> [u8; 32] {
421    let mut bytes = Vec::new();
422    if serde_json::to_writer(&mut bytes, raw).is_err() {
423        bytes.extend_from_slice(raw.to_string().as_bytes());
424    }
425    *blake3::hash(&bytes).as_bytes()
426}
427
428fn modern_codex_message_call_ids(message: &NormalizedMessage) -> impl Iterator<Item = String> + '_ {
429    message
430        .invocations
431        .iter()
432        .filter_map(|invocation| invocation.call_id.clone())
433}
434
435fn message_already_indexed(
436    seen_messages: &HashSet<ModernCodexMessageSignature>,
437    seen_call_ids: &HashSet<String>,
438    candidate: &NormalizedMessage,
439) -> bool {
440    seen_messages.contains(&modern_codex_message_signature(candidate))
441        || candidate
442            .invocations
443            .iter()
444            .filter_map(|invocation| invocation.call_id.as_deref())
445            .any(|call_id| seen_call_ids.contains(call_id))
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `assistant` message at a fixed timestamp. When `call_id` is given, the
    /// message carries a single `shell` tool invocation with that id.
    fn message(content: &str, call_id: Option<&str>) -> NormalizedMessage {
        let invocations = match call_id {
            Some(id) => vec![franken_agent_detection::NormalizedInvocation {
                kind: "tool".to_string(),
                name: "shell".to_string(),
                raw_name: None,
                call_id: Some(id.to_string()),
                arguments: None,
            }],
            None => Vec::new(),
        };
        NormalizedMessage {
            idx: 0,
            role: "assistant".to_string(),
            author: None,
            created_at: Some(1_700_000_000_000),
            content: content.to_string(),
            extra: Value::Null,
            invocations,
            snippets: Vec::new(),
        }
    }

    #[test]
    fn modern_codex_duplicate_detection_uses_precomputed_sets() {
        let existing = message("canonical response", Some("call-1"));
        let mut seen_messages = HashSet::new();
        seen_messages.insert(modern_codex_message_signature(&existing));
        let mut seen_call_ids: HashSet<String> =
            modern_codex_message_call_ids(&existing).collect();

        // Identical signature: a duplicate even without any call id.
        let same_text = message("canonical response", None);
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &same_text
        ));

        // Shared call id: a duplicate even though the wording differs.
        let same_call = message("same tool call, changed wording", Some("call-1"));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &same_call
        ));

        // Neither seen yet; once recorded, it is reported as a duplicate.
        let fresh = message("fresh response", Some("call-2"));
        assert!(!message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));
        seen_messages.insert(modern_codex_message_signature(&fresh));
        seen_call_ids.extend(modern_codex_message_call_ids(&fresh));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));
    }
}