coding_agent_search/connectors/
codex.rs1use std::collections::HashSet;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4
5use anyhow::Result;
6use serde_json::Value;
7use tracing::warn;
8
9use super::{
10 Connector, DetectionResult, DiscoveredSourceFile, NormalizedConversation, NormalizedMessage,
11 ScanContext, parse_timestamp, reindex_messages,
12};
13
/// Limit, counted in `char`s, applied by `truncate_tool_output` to the text of
/// a single tool output before it is placed in an indexed message.
const MAX_INDEXED_TOOL_OUTPUT_CHARS: usize = 128 * 1024;
15
16pub struct CodexConnector {
17 inner: franken_agent_detection::CodexConnector,
18}
19
20impl Default for CodexConnector {
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl CodexConnector {
27 #[must_use]
28 pub fn new() -> Self {
29 Self {
30 inner: franken_agent_detection::CodexConnector::new(),
31 }
32 }
33}
34
35impl Connector for CodexConnector {
36 fn detect(&self) -> DetectionResult {
37 self.inner.detect()
38 }
39
40 fn scan(&self, ctx: &ScanContext) -> Result<Vec<NormalizedConversation>> {
41 let mut conversations = self.inner.scan(ctx)?;
42 for conversation in &mut conversations {
43 augment_modern_codex_messages(conversation);
44 }
45 Ok(conversations)
46 }
47
48 fn supports_streaming_scan(&self) -> bool {
49 self.inner.supports_streaming_scan()
50 }
51
52 fn discover_source_files(&self, ctx: &ScanContext) -> Result<Vec<DiscoveredSourceFile>> {
53 self.inner.discover_source_files(ctx)
54 }
55
56 fn scan_with_callback(
57 &self,
58 ctx: &ScanContext,
59 on_conversation: &mut dyn FnMut(NormalizedConversation) -> Result<()>,
60 ) -> Result<()> {
61 self.inner.scan_with_callback(ctx, &mut |mut conversation| {
62 augment_modern_codex_messages(&mut conversation);
63 on_conversation(conversation)
64 })
65 }
66}
67
68fn augment_modern_codex_messages(conversation: &mut NormalizedConversation) {
69 if conversation
70 .source_path
71 .extension()
72 .and_then(|ext| ext.to_str())
73 .is_none_or(|ext| !ext.eq_ignore_ascii_case("jsonl"))
74 {
75 return;
76 }
77
78 let Ok(file) = File::open(&conversation.source_path) else {
79 return;
80 };
81
82 let mut seen_messages: HashSet<ModernCodexMessageSignature> = conversation
83 .messages
84 .iter()
85 .map(modern_codex_message_signature)
86 .collect();
87 let mut seen_call_ids: HashSet<String> = conversation
88 .messages
89 .iter()
90 .flat_map(modern_codex_message_call_ids)
91 .collect();
92 let mut seen_raw_entries: HashSet<[u8; 32]> = conversation
93 .messages
94 .iter()
95 .map(|message| modern_codex_raw_signature(&message.extra))
96 .collect();
97 let mut added = false;
98 for (line_no_zero, line) in BufReader::new(file)
99 .lines()
100 .map_while(Result::ok)
101 .enumerate()
102 {
103 let line_no = line_no_zero + 1;
104 let line = line.trim();
105 if line.is_empty() {
106 continue;
107 }
108 let raw = match serde_json::from_str::<Value>(line) {
109 Ok(value) => value,
110 Err(parse_err) => {
111 warn!(
116 source_path = %conversation.source_path.display(),
117 line_no = line_no,
118 error = %parse_err,
119 "codex rollout JSONL line failed to parse; skipping",
120 );
121 continue;
122 }
123 };
124 let raw_signature = modern_codex_raw_signature(&raw);
125 if seen_raw_entries.contains(&raw_signature) {
126 continue;
127 }
128 let Some(message) = modern_codex_message(&raw) else {
129 continue;
130 };
131 if message_already_indexed(&seen_messages, &seen_call_ids, &message) {
132 seen_raw_entries.insert(raw_signature);
133 continue;
134 }
135 seen_messages.insert(modern_codex_message_signature(&message));
136 seen_call_ids.extend(modern_codex_message_call_ids(&message));
137 seen_raw_entries.insert(raw_signature);
138 conversation.messages.push(message);
139 added = true;
140 }
141
142 if added {
143 conversation.messages.sort_by(|left, right| {
144 left.created_at
145 .cmp(&right.created_at)
146 .then_with(|| left.idx.cmp(&right.idx))
147 });
148 reindex_messages(&mut conversation.messages);
149 }
150}
151
152fn modern_codex_message(raw: &Value) -> Option<NormalizedMessage> {
153 let entry_type = raw.get("type").and_then(Value::as_str)?;
154 let payload = raw.get("payload")?;
155 let created_at = raw.get("timestamp").and_then(parse_timestamp);
156
157 match entry_type {
158 "response_item" => response_item_message(payload, created_at, raw),
159 "event_msg" => event_message(payload, created_at, raw),
160 _ => None,
161 }
162}
163
164fn response_item_message(
165 payload: &Value,
166 created_at: Option<i64>,
167 raw: &Value,
168) -> Option<NormalizedMessage> {
169 match payload.get("type").and_then(Value::as_str) {
170 Some("message") | None => {
171 let content = payload.get("content").and_then(flatten_modern_content)?;
172 let role = payload
173 .get("role")
174 .and_then(Value::as_str)
175 .unwrap_or("agent")
176 .to_string();
177 Some(normalized_message(
178 role,
179 None,
180 created_at,
181 content,
182 raw.clone(),
183 payload.get("content").map_or_else(
184 Vec::new,
185 franken_agent_detection::extract_invocations_from_content_blocks,
186 ),
187 ))
188 }
189 Some("function_call") => {
190 let tool_name = payload
191 .get("name")
192 .and_then(Value::as_str)
193 .unwrap_or("unknown");
194 let arguments = payload.get("arguments").cloned();
195 let content = tool_call_content(tool_name, arguments.as_ref());
196 let call_id = payload
197 .get("call_id")
198 .or_else(|| payload.get("id"))
199 .and_then(Value::as_str)
200 .map(str::to_string);
201 Some(normalized_message(
202 "assistant".to_string(),
203 None,
204 created_at,
205 content,
206 raw.clone(),
207 vec![franken_agent_detection::NormalizedInvocation {
208 kind: "tool".to_string(),
209 name: tool_name.to_string(),
210 raw_name: None,
211 call_id,
212 arguments: arguments.and_then(normalize_invocation_arguments),
213 }],
214 ))
215 }
216 Some("function_call_output") => {
217 let output = payload.get("output").and_then(Value::as_str)?;
218 let call_id = payload.get("call_id").and_then(Value::as_str);
219 Some(normalized_message(
220 "tool".to_string(),
221 None,
222 created_at,
223 tool_output_content(call_id, output),
224 raw.clone(),
225 Vec::new(),
226 ))
227 }
228 _ => None,
229 }
230}
231
232fn event_message(
233 payload: &Value,
234 created_at: Option<i64>,
235 raw: &Value,
236) -> Option<NormalizedMessage> {
237 match payload.get("type").and_then(Value::as_str) {
238 Some("agent_message") => {
239 let content = payload
240 .get("message")
241 .or_else(|| payload.get("text"))
242 .and_then(Value::as_str)?
243 .trim()
244 .to_string();
245 non_empty_message("assistant".to_string(), None, created_at, content, raw)
246 }
247 Some("tool_result") => {
248 let output = payload
249 .get("output")
250 .or_else(|| payload.get("result"))
251 .and_then(Value::as_str)?;
252 let call_id = payload
253 .get("call_id")
254 .or_else(|| payload.get("id"))
255 .and_then(Value::as_str);
256 Some(normalized_message(
257 "tool".to_string(),
258 None,
259 created_at,
260 tool_output_content(call_id, output),
261 raw.clone(),
262 Vec::new(),
263 ))
264 }
265 _ => None,
266 }
267}
268
269fn normalized_message(
270 role: String,
271 author: Option<String>,
272 created_at: Option<i64>,
273 content: String,
274 extra: Value,
275 invocations: Vec<franken_agent_detection::NormalizedInvocation>,
276) -> NormalizedMessage {
277 NormalizedMessage {
278 idx: 0,
279 role,
280 author,
281 created_at,
282 content,
283 extra,
284 invocations,
285 snippets: Vec::new(),
286 }
287}
288
289fn non_empty_message(
290 role: String,
291 author: Option<String>,
292 created_at: Option<i64>,
293 content: String,
294 raw: &Value,
295) -> Option<NormalizedMessage> {
296 (!content.trim().is_empty())
297 .then(|| normalized_message(role, author, created_at, content, raw.clone(), Vec::new()))
298}
299
300fn flatten_modern_content(content: &Value) -> Option<String> {
301 if let Some(text) = content
302 .as_str()
303 .map(str::trim)
304 .filter(|text| !text.is_empty())
305 {
306 return Some(text.to_string());
307 }
308
309 let mut parts = Vec::new();
310 for item in content.as_array()? {
311 let text = modern_content_part_text(item);
312
313 let text = text.trim();
314 if !text.is_empty() {
315 parts.push(text.to_string());
316 }
317 }
318
319 (!parts.is_empty()).then(|| parts.join("\n"))
320}
321
322fn modern_content_part_text(item: &Value) -> String {
323 if let Some(text) = item.as_str() {
324 return text.to_string();
325 }
326
327 let item_type = item.get("type").and_then(Value::as_str);
328 if matches!(
329 item_type,
330 None | Some("text") | Some("input_text") | Some("output_text")
331 ) {
332 return item
333 .get("text")
334 .and_then(Value::as_str)
335 .unwrap_or("")
336 .to_string();
337 }
338
339 if item_type == Some("tool_use") {
340 let tool_name = item
341 .get("name")
342 .and_then(Value::as_str)
343 .unwrap_or("unknown");
344 let detail = item
345 .get("input")
346 .and_then(|input| {
347 input
348 .get("description")
349 .or_else(|| input.get("file_path"))
350 .or_else(|| input.get("path"))
351 .or_else(|| input.get("command"))
352 })
353 .and_then(Value::as_str)
354 .unwrap_or("")
355 .trim();
356 return if detail.is_empty() {
357 format!("[Tool: {tool_name}]")
358 } else {
359 format!("[Tool: {tool_name} - {detail}]")
360 };
361 }
362
363 String::new()
364}
365
366fn tool_call_content(tool_name: &str, arguments: Option<&Value>) -> String {
367 let mut content = format!("[Tool: {tool_name}]");
368 if let Some(arguments) = arguments.and_then(argument_text) {
369 content.push('\n');
370 content.push_str(&arguments);
371 }
372 content
373}
374
375fn tool_output_content(call_id: Option<&str>, output: &str) -> String {
376 let label = call_id.map_or_else(
377 || "[Tool output]".to_string(),
378 |id| format!("[Tool output: {id}]"),
379 );
380 let output = truncate_tool_output(output.trim());
381 if output.is_empty() {
382 label
383 } else {
384 format!("{label}\n{output}")
385 }
386}
387
388fn argument_text(arguments: &Value) -> Option<String> {
389 let text = match arguments {
390 Value::String(text) => text.trim().to_string(),
391 other => serde_json::to_string(other).ok()?,
392 };
393 (!text.is_empty()).then_some(text)
394}
395
396fn normalize_invocation_arguments(arguments: Value) -> Option<Value> {
397 match arguments {
398 Value::String(text) => serde_json::from_str(&text)
399 .ok()
400 .or_else(|| (!text.trim().is_empty()).then_some(Value::String(text))),
401 Value::Null => None,
402 other => Some(other),
403 }
404}
405
406fn truncate_tool_output(output: &str) -> String {
407 let mut truncated = String::new();
408 let mut chars = output.chars();
409 for _ in 0..MAX_INDEXED_TOOL_OUTPUT_CHARS {
410 let Some(ch) = chars.next() else {
411 return output.to_string();
412 };
413 truncated.push(ch);
414 }
415 let omitted = chars.count();
416 truncated.push_str(&format!(
417 "\n[truncated {omitted} additional chars from tool output]"
418 ));
419 truncated
420}
421
/// Hashable identity of a message, used as a `HashSet` key when checking
/// whether a candidate message is already present.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ModernCodexMessageSignature {
    /// Message role, copied verbatim.
    role: String,
    /// Message author, copied verbatim.
    author: Option<String>,
    /// Message timestamp, copied verbatim.
    created_at: Option<i64>,
    /// 32-byte digest standing in for the message content.
    content_hash: [u8; 32],
}
429
430fn modern_codex_message_signature(message: &NormalizedMessage) -> ModernCodexMessageSignature {
431 ModernCodexMessageSignature {
432 role: message.role.clone(),
433 author: message.author.clone(),
434 created_at: message.created_at,
435 content_hash: *blake3::hash(message.content.as_bytes()).as_bytes(),
436 }
437}
438
439fn modern_codex_raw_signature(raw: &Value) -> [u8; 32] {
440 let mut bytes = Vec::new();
441 if serde_json::to_writer(&mut bytes, raw).is_err() {
442 bytes.extend_from_slice(raw.to_string().as_bytes());
443 }
444 *blake3::hash(&bytes).as_bytes()
445}
446
447fn modern_codex_message_call_ids(message: &NormalizedMessage) -> impl Iterator<Item = String> + '_ {
448 message
449 .invocations
450 .iter()
451 .filter_map(|invocation| invocation.call_id.clone())
452}
453
454fn message_already_indexed(
455 seen_messages: &HashSet<ModernCodexMessageSignature>,
456 seen_call_ids: &HashSet<String>,
457 candidate: &NormalizedMessage,
458) -> bool {
459 seen_messages.contains(&modern_codex_message_signature(candidate))
460 || candidate
461 .invocations
462 .iter()
463 .filter_map(|invocation| invocation.call_id.as_deref())
464 .any(|call_id| seen_call_ids.contains(call_id))
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: an assistant message with a fixed timestamp and, when
    /// `call_id` is given, a single `shell` tool invocation carrying that id.
    fn message(content: &str, call_id: Option<&str>) -> NormalizedMessage {
        let invocations = call_id
            .into_iter()
            .map(|id| franken_agent_detection::NormalizedInvocation {
                kind: "tool".to_string(),
                name: "shell".to_string(),
                raw_name: None,
                call_id: Some(id.to_string()),
                arguments: None,
            })
            .collect();

        NormalizedMessage {
            idx: 0,
            role: "assistant".to_string(),
            author: None,
            created_at: Some(1_700_000_000_000),
            content: content.to_string(),
            extra: Value::Null,
            invocations,
            snippets: Vec::new(),
        }
    }

    #[test]
    fn modern_codex_duplicate_detection_uses_precomputed_sets() {
        let existing = message("canonical response", Some("call-1"));
        let mut seen_messages = HashSet::new();
        seen_messages.insert(modern_codex_message_signature(&existing));
        let mut seen_call_ids: HashSet<String> = HashSet::new();
        seen_call_ids.extend(modern_codex_message_call_ids(&existing));

        // Same role/author/timestamp/content matches on signature alone.
        let same_text = message("canonical response", None);
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &same_text
        ));

        // Different wording but a known call id still counts as a duplicate.
        let same_call = message("same tool call, changed wording", Some("call-1"));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &same_call
        ));

        // A new message is not a duplicate until it has been recorded.
        let fresh = message("fresh response", Some("call-2"));
        assert!(!message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));

        seen_messages.insert(modern_codex_message_signature(&fresh));
        seen_call_ids.extend(modern_codex_message_call_ids(&fresh));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));
    }
}