Skip to main content

coding_agent_search/connectors/
codex.rs

1use std::collections::HashSet;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4
5use anyhow::Result;
6use serde_json::Value;
7use tracing::warn;
8
9use super::{
10    Connector, DetectionResult, DiscoveredSourceFile, NormalizedConversation, NormalizedMessage,
11    ScanContext, parse_timestamp, reindex_messages,
12};
13
/// Maximum number of characters (Unicode scalar values, not bytes) of a single
/// tool output that is kept in indexed message content; `truncate_tool_output`
/// replaces anything beyond this with a short truncation note.
const MAX_INDEXED_TOOL_OUTPUT_CHARS: usize = 131_072; // 128 * 1024
15
16pub struct CodexConnector {
17    inner: franken_agent_detection::CodexConnector,
18}
19
20impl Default for CodexConnector {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl CodexConnector {
27    #[must_use]
28    pub fn new() -> Self {
29        Self {
30            inner: franken_agent_detection::CodexConnector::new(),
31        }
32    }
33}
34
35impl Connector for CodexConnector {
36    fn detect(&self) -> DetectionResult {
37        self.inner.detect()
38    }
39
40    fn scan(&self, ctx: &ScanContext) -> Result<Vec<NormalizedConversation>> {
41        let mut conversations = self.inner.scan(ctx)?;
42        for conversation in &mut conversations {
43            augment_modern_codex_messages(conversation);
44        }
45        Ok(conversations)
46    }
47
48    fn supports_streaming_scan(&self) -> bool {
49        self.inner.supports_streaming_scan()
50    }
51
52    fn discover_source_files(&self, ctx: &ScanContext) -> Result<Vec<DiscoveredSourceFile>> {
53        self.inner.discover_source_files(ctx)
54    }
55
56    fn scan_with_callback(
57        &self,
58        ctx: &ScanContext,
59        on_conversation: &mut dyn FnMut(NormalizedConversation) -> Result<()>,
60    ) -> Result<()> {
61        self.inner.scan_with_callback(ctx, &mut |mut conversation| {
62            augment_modern_codex_messages(&mut conversation);
63            on_conversation(conversation)
64        })
65    }
66}
67
68fn augment_modern_codex_messages(conversation: &mut NormalizedConversation) {
69    if conversation
70        .source_path
71        .extension()
72        .and_then(|ext| ext.to_str())
73        .is_none_or(|ext| !ext.eq_ignore_ascii_case("jsonl"))
74    {
75        return;
76    }
77
78    let Ok(file) = File::open(&conversation.source_path) else {
79        return;
80    };
81
82    let mut seen_messages: HashSet<ModernCodexMessageSignature> = conversation
83        .messages
84        .iter()
85        .map(modern_codex_message_signature)
86        .collect();
87    let mut seen_call_ids: HashSet<String> = conversation
88        .messages
89        .iter()
90        .flat_map(modern_codex_message_call_ids)
91        .collect();
92    let mut seen_raw_entries: HashSet<[u8; 32]> = conversation
93        .messages
94        .iter()
95        .map(|message| modern_codex_raw_signature(&message.extra))
96        .collect();
97    let mut added = false;
98    for (line_no_zero, line) in BufReader::new(file)
99        .lines()
100        .map_while(Result::ok)
101        .enumerate()
102    {
103        let line_no = line_no_zero + 1;
104        let line = line.trim();
105        if line.is_empty() {
106            continue;
107        }
108        let raw = match serde_json::from_str::<Value>(line) {
109            Ok(value) => value,
110            Err(parse_err) => {
111                // Per gauntlet finding CONF-cass-003: surface malformed JSONL lines
112                // to tracing so operators can correlate `cass diag` reports against
113                // unreadable Codex rollout entries. The line is still dropped to
114                // preserve resilience; the warning is purely diagnostic.
115                warn!(
116                    source_path = %conversation.source_path.display(),
117                    line_no = line_no,
118                    error = %parse_err,
119                    "codex rollout JSONL line failed to parse; skipping",
120                );
121                continue;
122            }
123        };
124        let raw_signature = modern_codex_raw_signature(&raw);
125        if seen_raw_entries.contains(&raw_signature) {
126            continue;
127        }
128        let Some(message) = modern_codex_message(&raw) else {
129            continue;
130        };
131        if message_already_indexed(&seen_messages, &seen_call_ids, &message) {
132            seen_raw_entries.insert(raw_signature);
133            continue;
134        }
135        seen_messages.insert(modern_codex_message_signature(&message));
136        seen_call_ids.extend(modern_codex_message_call_ids(&message));
137        seen_raw_entries.insert(raw_signature);
138        conversation.messages.push(message);
139        added = true;
140    }
141
142    if added {
143        conversation.messages.sort_by(|left, right| {
144            left.created_at
145                .cmp(&right.created_at)
146                .then_with(|| left.idx.cmp(&right.idx))
147        });
148        reindex_messages(&mut conversation.messages);
149    }
150}
151
152fn modern_codex_message(raw: &Value) -> Option<NormalizedMessage> {
153    let entry_type = raw.get("type").and_then(Value::as_str)?;
154    let payload = raw.get("payload")?;
155    let created_at = raw.get("timestamp").and_then(parse_timestamp);
156
157    match entry_type {
158        "response_item" => response_item_message(payload, created_at, raw),
159        "event_msg" => event_message(payload, created_at, raw),
160        _ => None,
161    }
162}
163
164fn response_item_message(
165    payload: &Value,
166    created_at: Option<i64>,
167    raw: &Value,
168) -> Option<NormalizedMessage> {
169    match payload.get("type").and_then(Value::as_str) {
170        Some("message") | None => {
171            let content = payload.get("content").and_then(flatten_modern_content)?;
172            let role = payload
173                .get("role")
174                .and_then(Value::as_str)
175                .unwrap_or("agent")
176                .to_string();
177            Some(normalized_message(
178                role,
179                None,
180                created_at,
181                content,
182                raw.clone(),
183                payload.get("content").map_or_else(
184                    Vec::new,
185                    franken_agent_detection::extract_invocations_from_content_blocks,
186                ),
187            ))
188        }
189        Some("function_call") => {
190            let tool_name = payload
191                .get("name")
192                .and_then(Value::as_str)
193                .unwrap_or("unknown");
194            let arguments = payload.get("arguments").cloned();
195            let content = tool_call_content(tool_name, arguments.as_ref());
196            let call_id = payload
197                .get("call_id")
198                .or_else(|| payload.get("id"))
199                .and_then(Value::as_str)
200                .map(str::to_string);
201            Some(normalized_message(
202                "assistant".to_string(),
203                None,
204                created_at,
205                content,
206                raw.clone(),
207                vec![franken_agent_detection::NormalizedInvocation {
208                    kind: "tool".to_string(),
209                    name: tool_name.to_string(),
210                    raw_name: None,
211                    call_id,
212                    arguments: arguments.and_then(normalize_invocation_arguments),
213                }],
214            ))
215        }
216        Some("function_call_output") => {
217            let output = payload.get("output").and_then(Value::as_str)?;
218            let call_id = payload.get("call_id").and_then(Value::as_str);
219            Some(normalized_message(
220                "tool".to_string(),
221                None,
222                created_at,
223                tool_output_content(call_id, output),
224                raw.clone(),
225                Vec::new(),
226            ))
227        }
228        _ => None,
229    }
230}
231
232fn event_message(
233    payload: &Value,
234    created_at: Option<i64>,
235    raw: &Value,
236) -> Option<NormalizedMessage> {
237    match payload.get("type").and_then(Value::as_str) {
238        Some("agent_message") => {
239            let content = payload
240                .get("message")
241                .or_else(|| payload.get("text"))
242                .and_then(Value::as_str)?
243                .trim()
244                .to_string();
245            non_empty_message("assistant".to_string(), None, created_at, content, raw)
246        }
247        Some("tool_result") => {
248            let output = payload
249                .get("output")
250                .or_else(|| payload.get("result"))
251                .and_then(Value::as_str)?;
252            let call_id = payload
253                .get("call_id")
254                .or_else(|| payload.get("id"))
255                .and_then(Value::as_str);
256            Some(normalized_message(
257                "tool".to_string(),
258                None,
259                created_at,
260                tool_output_content(call_id, output),
261                raw.clone(),
262                Vec::new(),
263            ))
264        }
265        _ => None,
266    }
267}
268
269fn normalized_message(
270    role: String,
271    author: Option<String>,
272    created_at: Option<i64>,
273    content: String,
274    extra: Value,
275    invocations: Vec<franken_agent_detection::NormalizedInvocation>,
276) -> NormalizedMessage {
277    NormalizedMessage {
278        idx: 0,
279        role,
280        author,
281        created_at,
282        content,
283        extra,
284        invocations,
285        snippets: Vec::new(),
286    }
287}
288
289fn non_empty_message(
290    role: String,
291    author: Option<String>,
292    created_at: Option<i64>,
293    content: String,
294    raw: &Value,
295) -> Option<NormalizedMessage> {
296    (!content.trim().is_empty())
297        .then(|| normalized_message(role, author, created_at, content, raw.clone(), Vec::new()))
298}
299
300fn flatten_modern_content(content: &Value) -> Option<String> {
301    if let Some(text) = content
302        .as_str()
303        .map(str::trim)
304        .filter(|text| !text.is_empty())
305    {
306        return Some(text.to_string());
307    }
308
309    let mut parts = Vec::new();
310    for item in content.as_array()? {
311        let text = modern_content_part_text(item);
312
313        let text = text.trim();
314        if !text.is_empty() {
315            parts.push(text.to_string());
316        }
317    }
318
319    (!parts.is_empty()).then(|| parts.join("\n"))
320}
321
322fn modern_content_part_text(item: &Value) -> String {
323    if let Some(text) = item.as_str() {
324        return text.to_string();
325    }
326
327    let item_type = item.get("type").and_then(Value::as_str);
328    if matches!(
329        item_type,
330        None | Some("text") | Some("input_text") | Some("output_text")
331    ) {
332        return item
333            .get("text")
334            .and_then(Value::as_str)
335            .unwrap_or("")
336            .to_string();
337    }
338
339    if item_type == Some("tool_use") {
340        let tool_name = item
341            .get("name")
342            .and_then(Value::as_str)
343            .unwrap_or("unknown");
344        let detail = item
345            .get("input")
346            .and_then(|input| {
347                input
348                    .get("description")
349                    .or_else(|| input.get("file_path"))
350                    .or_else(|| input.get("path"))
351                    .or_else(|| input.get("command"))
352            })
353            .and_then(Value::as_str)
354            .unwrap_or("")
355            .trim();
356        return if detail.is_empty() {
357            format!("[Tool: {tool_name}]")
358        } else {
359            format!("[Tool: {tool_name} - {detail}]")
360        };
361    }
362
363    String::new()
364}
365
366fn tool_call_content(tool_name: &str, arguments: Option<&Value>) -> String {
367    let mut content = format!("[Tool: {tool_name}]");
368    if let Some(arguments) = arguments.and_then(argument_text) {
369        content.push('\n');
370        content.push_str(&arguments);
371    }
372    content
373}
374
375fn tool_output_content(call_id: Option<&str>, output: &str) -> String {
376    let label = call_id.map_or_else(
377        || "[Tool output]".to_string(),
378        |id| format!("[Tool output: {id}]"),
379    );
380    let output = truncate_tool_output(output.trim());
381    if output.is_empty() {
382        label
383    } else {
384        format!("{label}\n{output}")
385    }
386}
387
388fn argument_text(arguments: &Value) -> Option<String> {
389    let text = match arguments {
390        Value::String(text) => text.trim().to_string(),
391        other => serde_json::to_string(other).ok()?,
392    };
393    (!text.is_empty()).then_some(text)
394}
395
396fn normalize_invocation_arguments(arguments: Value) -> Option<Value> {
397    match arguments {
398        Value::String(text) => serde_json::from_str(&text)
399            .ok()
400            .or_else(|| (!text.trim().is_empty()).then_some(Value::String(text))),
401        Value::Null => None,
402        other => Some(other),
403    }
404}
405
406fn truncate_tool_output(output: &str) -> String {
407    let mut truncated = String::new();
408    let mut chars = output.chars();
409    for _ in 0..MAX_INDEXED_TOOL_OUTPUT_CHARS {
410        let Some(ch) = chars.next() else {
411            return output.to_string();
412        };
413        truncated.push(ch);
414    }
415    let omitted = chars.count();
416    truncated.push_str(&format!(
417        "\n[truncated {omitted} additional chars from tool output]"
418    ));
419    truncated
420}
421
/// Duplicate-detection key for a message.
///
/// It combines role, author and timestamp with a 32-byte digest of the content,
/// so the full text does not need to be stored in the lookup set.
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq, Hash)]
struct ModernCodexMessageSignature {
    /// Message role, e.g. `"assistant"` or `"tool"`.
    role: String,
    /// Optional author label.
    author: Option<String>,
    /// Message timestamp, if any.
    created_at: Option<i64>,
    /// Digest of the message content bytes.
    content_hash: [u8; 32],
}
429
430fn modern_codex_message_signature(message: &NormalizedMessage) -> ModernCodexMessageSignature {
431    ModernCodexMessageSignature {
432        role: message.role.clone(),
433        author: message.author.clone(),
434        created_at: message.created_at,
435        content_hash: *blake3::hash(message.content.as_bytes()).as_bytes(),
436    }
437}
438
439fn modern_codex_raw_signature(raw: &Value) -> [u8; 32] {
440    let mut bytes = Vec::new();
441    if serde_json::to_writer(&mut bytes, raw).is_err() {
442        bytes.extend_from_slice(raw.to_string().as_bytes());
443    }
444    *blake3::hash(&bytes).as_bytes()
445}
446
447fn modern_codex_message_call_ids(message: &NormalizedMessage) -> impl Iterator<Item = String> + '_ {
448    message
449        .invocations
450        .iter()
451        .filter_map(|invocation| invocation.call_id.clone())
452}
453
454fn message_already_indexed(
455    seen_messages: &HashSet<ModernCodexMessageSignature>,
456    seen_call_ids: &HashSet<String>,
457    candidate: &NormalizedMessage,
458) -> bool {
459    seen_messages.contains(&modern_codex_message_signature(candidate))
460        || candidate
461            .invocations
462            .iter()
463            .filter_map(|invocation| invocation.call_id.as_deref())
464            .any(|call_id| seen_call_ids.contains(call_id))
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an assistant message at a fixed timestamp. When `call_id` is
    /// given, the message carries a single `shell` tool invocation with that id.
    fn sample_message(content: &str, call_id: Option<&str>) -> NormalizedMessage {
        let invocations = match call_id {
            Some(id) => vec![franken_agent_detection::NormalizedInvocation {
                kind: "tool".to_string(),
                name: "shell".to_string(),
                raw_name: None,
                call_id: Some(id.to_string()),
                arguments: None,
            }],
            None => Vec::new(),
        };
        NormalizedMessage {
            idx: 0,
            role: "assistant".to_string(),
            author: None,
            created_at: Some(1_700_000_000_000),
            content: content.to_string(),
            extra: Value::Null,
            invocations,
            snippets: Vec::new(),
        }
    }

    #[test]
    fn modern_codex_duplicate_detection_uses_precomputed_sets() {
        let existing = sample_message("canonical response", Some("call-1"));
        let mut seen_messages: HashSet<ModernCodexMessageSignature> =
            std::iter::once(modern_codex_message_signature(&existing)).collect();
        let mut seen_call_ids: HashSet<String> =
            modern_codex_message_call_ids(&existing).collect();

        // Same role/timestamp/content without an invocation: caught by the signature.
        let same_content = sample_message("canonical response", None);
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &same_content
        ));

        // Different wording but a reused call id: caught by the call-id set.
        let same_call = sample_message("same tool call, changed wording", Some("call-1"));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &same_call
        ));

        // Neither the signature nor the call id has been seen yet.
        let fresh = sample_message("fresh response", Some("call-2"));
        assert!(!message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));

        // Once recorded, the same message is reported as already indexed.
        seen_messages.insert(modern_codex_message_signature(&fresh));
        seen_call_ids.extend(modern_codex_message_call_ids(&fresh));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));
    }
}