coding_agent_search/connectors/
codex.rs1use std::collections::HashSet;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4
5use anyhow::Result;
6use serde_json::Value;
7
8use super::{
9 Connector, DetectionResult, DiscoveredSourceFile, NormalizedConversation, NormalizedMessage,
10 ScanContext, parse_timestamp, reindex_messages,
11};
12
/// Maximum number of `char`s of a single tool output kept for indexing (128 Ki chars); longer
/// output is cut down by `truncate_tool_output`.
const MAX_INDEXED_TOOL_OUTPUT_CHARS: usize = 128 << 10;
14
15pub struct CodexConnector {
16 inner: franken_agent_detection::CodexConnector,
17}
18
19impl Default for CodexConnector {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl CodexConnector {
26 #[must_use]
27 pub fn new() -> Self {
28 Self {
29 inner: franken_agent_detection::CodexConnector::new(),
30 }
31 }
32}
33
34impl Connector for CodexConnector {
35 fn detect(&self) -> DetectionResult {
36 self.inner.detect()
37 }
38
39 fn scan(&self, ctx: &ScanContext) -> Result<Vec<NormalizedConversation>> {
40 let mut conversations = self.inner.scan(ctx)?;
41 for conversation in &mut conversations {
42 augment_modern_codex_messages(conversation);
43 }
44 Ok(conversations)
45 }
46
47 fn supports_streaming_scan(&self) -> bool {
48 self.inner.supports_streaming_scan()
49 }
50
51 fn discover_source_files(&self, ctx: &ScanContext) -> Result<Vec<DiscoveredSourceFile>> {
52 self.inner.discover_source_files(ctx)
53 }
54
55 fn scan_with_callback(
56 &self,
57 ctx: &ScanContext,
58 on_conversation: &mut dyn FnMut(NormalizedConversation) -> Result<()>,
59 ) -> Result<()> {
60 self.inner.scan_with_callback(ctx, &mut |mut conversation| {
61 augment_modern_codex_messages(&mut conversation);
62 on_conversation(conversation)
63 })
64 }
65}
66
67fn augment_modern_codex_messages(conversation: &mut NormalizedConversation) {
68 if conversation
69 .source_path
70 .extension()
71 .and_then(|ext| ext.to_str())
72 .is_none_or(|ext| !ext.eq_ignore_ascii_case("jsonl"))
73 {
74 return;
75 }
76
77 let Ok(file) = File::open(&conversation.source_path) else {
78 return;
79 };
80
81 let mut seen_messages: HashSet<ModernCodexMessageSignature> = conversation
82 .messages
83 .iter()
84 .map(modern_codex_message_signature)
85 .collect();
86 let mut seen_call_ids: HashSet<String> = conversation
87 .messages
88 .iter()
89 .flat_map(modern_codex_message_call_ids)
90 .collect();
91 let mut seen_raw_entries: HashSet<[u8; 32]> = conversation
92 .messages
93 .iter()
94 .map(|message| modern_codex_raw_signature(&message.extra))
95 .collect();
96 let mut added = false;
97 for line in BufReader::new(file).lines().map_while(Result::ok) {
98 let line = line.trim();
99 if line.is_empty() {
100 continue;
101 }
102 let Ok(raw) = serde_json::from_str::<Value>(line) else {
103 continue;
104 };
105 let raw_signature = modern_codex_raw_signature(&raw);
106 if seen_raw_entries.contains(&raw_signature) {
107 continue;
108 }
109 let Some(message) = modern_codex_message(&raw) else {
110 continue;
111 };
112 if message_already_indexed(&seen_messages, &seen_call_ids, &message) {
113 seen_raw_entries.insert(raw_signature);
114 continue;
115 }
116 seen_messages.insert(modern_codex_message_signature(&message));
117 seen_call_ids.extend(modern_codex_message_call_ids(&message));
118 seen_raw_entries.insert(raw_signature);
119 conversation.messages.push(message);
120 added = true;
121 }
122
123 if added {
124 conversation.messages.sort_by(|left, right| {
125 left.created_at
126 .cmp(&right.created_at)
127 .then_with(|| left.idx.cmp(&right.idx))
128 });
129 reindex_messages(&mut conversation.messages);
130 }
131}
132
133fn modern_codex_message(raw: &Value) -> Option<NormalizedMessage> {
134 let entry_type = raw.get("type").and_then(Value::as_str)?;
135 let payload = raw.get("payload")?;
136 let created_at = raw.get("timestamp").and_then(parse_timestamp);
137
138 match entry_type {
139 "response_item" => response_item_message(payload, created_at, raw),
140 "event_msg" => event_message(payload, created_at, raw),
141 _ => None,
142 }
143}
144
145fn response_item_message(
146 payload: &Value,
147 created_at: Option<i64>,
148 raw: &Value,
149) -> Option<NormalizedMessage> {
150 match payload.get("type").and_then(Value::as_str) {
151 Some("message") | None => {
152 let content = payload.get("content").and_then(flatten_modern_content)?;
153 let role = payload
154 .get("role")
155 .and_then(Value::as_str)
156 .unwrap_or("agent")
157 .to_string();
158 Some(normalized_message(
159 role,
160 None,
161 created_at,
162 content,
163 raw.clone(),
164 payload.get("content").map_or_else(
165 Vec::new,
166 franken_agent_detection::extract_invocations_from_content_blocks,
167 ),
168 ))
169 }
170 Some("function_call") => {
171 let tool_name = payload
172 .get("name")
173 .and_then(Value::as_str)
174 .unwrap_or("unknown");
175 let arguments = payload.get("arguments").cloned();
176 let content = tool_call_content(tool_name, arguments.as_ref());
177 let call_id = payload
178 .get("call_id")
179 .or_else(|| payload.get("id"))
180 .and_then(Value::as_str)
181 .map(str::to_string);
182 Some(normalized_message(
183 "assistant".to_string(),
184 None,
185 created_at,
186 content,
187 raw.clone(),
188 vec![franken_agent_detection::NormalizedInvocation {
189 kind: "tool".to_string(),
190 name: tool_name.to_string(),
191 raw_name: None,
192 call_id,
193 arguments: arguments.and_then(normalize_invocation_arguments),
194 }],
195 ))
196 }
197 Some("function_call_output") => {
198 let output = payload.get("output").and_then(Value::as_str)?;
199 let call_id = payload.get("call_id").and_then(Value::as_str);
200 Some(normalized_message(
201 "tool".to_string(),
202 None,
203 created_at,
204 tool_output_content(call_id, output),
205 raw.clone(),
206 Vec::new(),
207 ))
208 }
209 _ => None,
210 }
211}
212
213fn event_message(
214 payload: &Value,
215 created_at: Option<i64>,
216 raw: &Value,
217) -> Option<NormalizedMessage> {
218 match payload.get("type").and_then(Value::as_str) {
219 Some("agent_message") => {
220 let content = payload
221 .get("message")
222 .or_else(|| payload.get("text"))
223 .and_then(Value::as_str)?
224 .trim()
225 .to_string();
226 non_empty_message("assistant".to_string(), None, created_at, content, raw)
227 }
228 Some("tool_result") => {
229 let output = payload
230 .get("output")
231 .or_else(|| payload.get("result"))
232 .and_then(Value::as_str)?;
233 let call_id = payload
234 .get("call_id")
235 .or_else(|| payload.get("id"))
236 .and_then(Value::as_str);
237 Some(normalized_message(
238 "tool".to_string(),
239 None,
240 created_at,
241 tool_output_content(call_id, output),
242 raw.clone(),
243 Vec::new(),
244 ))
245 }
246 _ => None,
247 }
248}
249
250fn normalized_message(
251 role: String,
252 author: Option<String>,
253 created_at: Option<i64>,
254 content: String,
255 extra: Value,
256 invocations: Vec<franken_agent_detection::NormalizedInvocation>,
257) -> NormalizedMessage {
258 NormalizedMessage {
259 idx: 0,
260 role,
261 author,
262 created_at,
263 content,
264 extra,
265 invocations,
266 snippets: Vec::new(),
267 }
268}
269
270fn non_empty_message(
271 role: String,
272 author: Option<String>,
273 created_at: Option<i64>,
274 content: String,
275 raw: &Value,
276) -> Option<NormalizedMessage> {
277 (!content.trim().is_empty())
278 .then(|| normalized_message(role, author, created_at, content, raw.clone(), Vec::new()))
279}
280
281fn flatten_modern_content(content: &Value) -> Option<String> {
282 if let Some(text) = content
283 .as_str()
284 .map(str::trim)
285 .filter(|text| !text.is_empty())
286 {
287 return Some(text.to_string());
288 }
289
290 let mut parts = Vec::new();
291 for item in content.as_array()? {
292 let text = modern_content_part_text(item);
293
294 let text = text.trim();
295 if !text.is_empty() {
296 parts.push(text.to_string());
297 }
298 }
299
300 (!parts.is_empty()).then(|| parts.join("\n"))
301}
302
303fn modern_content_part_text(item: &Value) -> String {
304 if let Some(text) = item.as_str() {
305 return text.to_string();
306 }
307
308 let item_type = item.get("type").and_then(Value::as_str);
309 if matches!(
310 item_type,
311 None | Some("text") | Some("input_text") | Some("output_text")
312 ) {
313 return item
314 .get("text")
315 .and_then(Value::as_str)
316 .unwrap_or("")
317 .to_string();
318 }
319
320 if item_type == Some("tool_use") {
321 let tool_name = item
322 .get("name")
323 .and_then(Value::as_str)
324 .unwrap_or("unknown");
325 let detail = item
326 .get("input")
327 .and_then(|input| {
328 input
329 .get("description")
330 .or_else(|| input.get("file_path"))
331 .or_else(|| input.get("path"))
332 .or_else(|| input.get("command"))
333 })
334 .and_then(Value::as_str)
335 .unwrap_or("")
336 .trim();
337 return if detail.is_empty() {
338 format!("[Tool: {tool_name}]")
339 } else {
340 format!("[Tool: {tool_name} - {detail}]")
341 };
342 }
343
344 String::new()
345}
346
347fn tool_call_content(tool_name: &str, arguments: Option<&Value>) -> String {
348 let mut content = format!("[Tool: {tool_name}]");
349 if let Some(arguments) = arguments.and_then(argument_text) {
350 content.push('\n');
351 content.push_str(&arguments);
352 }
353 content
354}
355
356fn tool_output_content(call_id: Option<&str>, output: &str) -> String {
357 let label = call_id.map_or_else(
358 || "[Tool output]".to_string(),
359 |id| format!("[Tool output: {id}]"),
360 );
361 let output = truncate_tool_output(output.trim());
362 if output.is_empty() {
363 label
364 } else {
365 format!("{label}\n{output}")
366 }
367}
368
369fn argument_text(arguments: &Value) -> Option<String> {
370 let text = match arguments {
371 Value::String(text) => text.trim().to_string(),
372 other => serde_json::to_string(other).ok()?,
373 };
374 (!text.is_empty()).then_some(text)
375}
376
377fn normalize_invocation_arguments(arguments: Value) -> Option<Value> {
378 match arguments {
379 Value::String(text) => serde_json::from_str(&text)
380 .ok()
381 .or_else(|| (!text.trim().is_empty()).then_some(Value::String(text))),
382 Value::Null => None,
383 other => Some(other),
384 }
385}
386
387fn truncate_tool_output(output: &str) -> String {
388 let mut truncated = String::new();
389 let mut chars = output.chars();
390 for _ in 0..MAX_INDEXED_TOOL_OUTPUT_CHARS {
391 let Some(ch) = chars.next() else {
392 return output.to_string();
393 };
394 truncated.push(ch);
395 }
396 let omitted = chars.count();
397 truncated.push_str(&format!(
398 "\n[truncated {omitted} additional chars from tool output]"
399 ));
400 truncated
401}
402
/// Content-based identity of a message. It is used to recognise a candidate message that
/// duplicates one already in the conversation, even when the two come from different raw
/// entries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ModernCodexMessageSignature {
    role: String,
    author: Option<String>,
    created_at: Option<i64>,
    /// Digest of the message's `content` bytes, stored instead of the full text.
    content_hash: [u8; 32],
}
410
411fn modern_codex_message_signature(message: &NormalizedMessage) -> ModernCodexMessageSignature {
412 ModernCodexMessageSignature {
413 role: message.role.clone(),
414 author: message.author.clone(),
415 created_at: message.created_at,
416 content_hash: *blake3::hash(message.content.as_bytes()).as_bytes(),
417 }
418}
419
420fn modern_codex_raw_signature(raw: &Value) -> [u8; 32] {
421 let mut bytes = Vec::new();
422 if serde_json::to_writer(&mut bytes, raw).is_err() {
423 bytes.extend_from_slice(raw.to_string().as_bytes());
424 }
425 *blake3::hash(&bytes).as_bytes()
426}
427
428fn modern_codex_message_call_ids(message: &NormalizedMessage) -> impl Iterator<Item = String> + '_ {
429 message
430 .invocations
431 .iter()
432 .filter_map(|invocation| invocation.call_id.clone())
433}
434
435fn message_already_indexed(
436 seen_messages: &HashSet<ModernCodexMessageSignature>,
437 seen_call_ids: &HashSet<String>,
438 candidate: &NormalizedMessage,
439) -> bool {
440 seen_messages.contains(&modern_codex_message_signature(candidate))
441 || candidate
442 .invocations
443 .iter()
444 .filter_map(|invocation| invocation.call_id.as_deref())
445 .any(|call_id| seen_call_ids.contains(call_id))
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an assistant message with a fixed timestamp. When `call_id` is given, the message
    /// also carries a single `shell` tool invocation with that call id.
    fn message(content: &str, call_id: Option<&str>) -> NormalizedMessage {
        NormalizedMessage {
            idx: 0,
            role: "assistant".to_string(),
            author: None,
            created_at: Some(1_700_000_000_000),
            content: content.to_string(),
            extra: Value::Null,
            invocations: call_id
                .map(|call_id| {
                    vec![franken_agent_detection::NormalizedInvocation {
                        kind: "tool".to_string(),
                        name: "shell".to_string(),
                        raw_name: None,
                        call_id: Some(call_id.to_string()),
                        arguments: None,
                    }]
                })
                .unwrap_or_default(),
            snippets: Vec::new(),
        }
    }

    /// `message_already_indexed` matches on either the content signature or a shared call id,
    /// and it sees entries added to the sets after they were first built.
    #[test]
    fn modern_codex_duplicate_detection_uses_precomputed_sets() {
        let existing = message("canonical response", Some("call-1"));
        let mut seen_messages = HashSet::from([modern_codex_message_signature(&existing)]);
        let mut seen_call_ids: HashSet<String> = modern_codex_message_call_ids(&existing).collect();

        // Same role/author/timestamp/content and no call id: duplicate by signature.
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &message("canonical response", None)
        ));
        // Different content but an already-seen call id: duplicate by call id.
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &message("same tool call, changed wording", Some("call-1"))
        ));

        // Unseen content and call id: not a duplicate until recorded in the sets.
        let fresh = message("fresh response", Some("call-2"));
        assert!(!message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));
        seen_messages.insert(modern_codex_message_signature(&fresh));
        seen_call_ids.extend(modern_codex_message_call_ids(&fresh));
        assert!(message_already_indexed(
            &seen_messages,
            &seen_call_ids,
            &fresh
        ));
    }
}