Skip to main content

nexus_memory_hooks/
transcript.rs

1//! Transcript streamer — reads Claude Code JSONL transcripts and formats
2//! them for Nexus ingest.
3//!
4//! Ported from `letta-ai/claude-subconscious/scripts/transcript_utils.ts`.
5
6use std::fs::File;
7use std::io::{self, BufRead, BufReader};
8use std::path::Path;
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use tracing::debug;
13
/// Maximum length, in bytes (excluding the line terminator), of a transcript
/// line that will be parsed; longer lines are skipped as malformed.
const MAX_LINE_BYTES: usize = 10 * 1024 * 1024; // 10 MiB

/// Maximum bytes of content kept per tool result (an ellipsis is appended when
/// cut). At a typical ~4 bytes per token this is roughly 3K tokens, not 8K.
const MAX_TOOL_RESULT_CONTENT: usize = 12_000;
19
20/// A parsed entry from a Claude Code JSONL transcript.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct TranscriptEntry {
23    /// Role: "user", "assistant", "system"
24    pub role: String,
25    /// Extracted text content
26    pub text: String,
27    /// Thinking content (if any)
28    pub thinking: Option<String>,
29    /// Tool calls made in this entry
30    pub tool_calls: Vec<ToolCall>,
31    /// Tool results received in this entry
32    pub tool_results: Vec<ToolResult>,
33    /// Original message type from JSONL
34    pub message_type: String,
35    /// Timestamp
36    pub timestamp: Option<DateTime<Utc>>,
37    /// Position index in the transcript (for sync state tracking)
38    pub index: usize,
39}
40
/// Shorten `s` to at most `max_len` bytes, marking the cut with a trailing `…`.
///
/// Strings that already fit are returned unchanged. Otherwise the text is cut
/// on a UTF-8 character boundary and `…` (3 bytes) is appended, so the result,
/// ellipsis included, never exceeds `max_len` bytes. If `max_len` leaves no
/// room for any text in front of the ellipsis, the result is empty.
fn truncate_owned(mut s: String, max_len: usize) -> String {
    const ELLIPSIS: &str = "…";

    if s.len() <= max_len {
        return s;
    }

    // Bytes of original text we may keep in front of the ellipsis.
    let budget = match max_len.checked_sub(ELLIPSIS.len()) {
        Some(room) if room > 0 => room,
        _ => return String::new(),
    };

    // Largest char boundary not past the budget (0 is always a boundary).
    let cut = (0..=budget)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);

    // Reuse the existing allocation instead of formatting a new string.
    s.truncate(cut);
    s.push_str(ELLIPSIS);
    s
}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ToolCall {
60    pub id: String,
61    pub name: String,
62    pub input_summary: String,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ToolResult {
67    pub tool_use_id: String,
68    pub content: String,
69    pub is_error: bool,
70}
71
72/// Formatted entry ready for Nexus ingest-hook-event.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct IngestEntry {
75    pub role: String,
76    pub content: String,
77    pub timestamp: Option<DateTime<Utc>>,
78}
79
80/// Read a Claude Code JSONL transcript file and parse entries.
81pub fn read_transcript(path: &Path) -> io::Result<Vec<TranscriptEntry>> {
82    if !path.exists() {
83        return Ok(Vec::new());
84    }
85
86    let file = File::open(path)?;
87    let reader = BufReader::new(file);
88    let mut entries = Vec::new();
89
90    for (index, line) in reader.lines().enumerate() {
91        let line = match line {
92            Ok(l) => l,
93            Err(e) => {
94                debug!("Failed to read line {index}: {e}");
95                continue;
96            }
97        };
98
99        if line.len() > MAX_LINE_BYTES {
100            debug!("Skipping oversized line {index}: {} bytes", line.len());
101            continue;
102        }
103
104        if line.trim().is_empty() {
105            continue;
106        }
107
108        match serde_json::from_str::<serde_json::Value>(&line) {
109            Ok(msg) => {
110                if let Some(entry) = parse_transcript_message(&msg, index) {
111                    entries.push(entry);
112                }
113            }
114            Err(e) => {
115                debug!("Failed to parse line {index}: {e}");
116            }
117        }
118    }
119
120    Ok(entries)
121}
122
123/// Read only entries strictly after `start_index` (exclusive) for incremental sync.
124/// Pass `None` to read all entries (never synced).
125pub fn read_transcript_from(
126    path: &Path,
127    start_index: Option<usize>,
128) -> io::Result<Vec<TranscriptEntry>> {
129    if !path.exists() {
130        return Ok(Vec::new());
131    }
132
133    // Optimization: if never synced, read via fast path
134    let start_index = match start_index {
135        Some(idx) => idx,
136        None => return read_transcript(path),
137    };
138
139    let file = File::open(path)?;
140    let reader = BufReader::new(file);
141    let mut entries = Vec::new();
142
143    for (index, line) in reader.lines().enumerate() {
144        if index <= start_index {
145            continue;
146        }
147        let line = match line {
148            Ok(l) => l,
149            Err(e) => {
150                debug!("Failed to read line {index}: {e}");
151                continue;
152            }
153        };
154
155        if line.len() > MAX_LINE_BYTES {
156            debug!("Skipping oversized line {index}: {} bytes", line.len());
157            continue;
158        }
159
160        if line.trim().is_empty() {
161            continue;
162        }
163
164        match serde_json::from_str::<serde_json::Value>(&line) {
165            Ok(msg) => {
166                if let Some(entry) = parse_transcript_message(&msg, index) {
167                    entries.push(entry);
168                }
169            }
170            Err(e) => {
171                debug!("Failed to parse line {index}: {e}");
172            }
173        }
174    }
175
176    Ok(entries)
177}
178
179/// Format transcript entries for Nexus ingest.
180/// Produces a single concatenated string suitable for `nexus ingest-hook-event`.
181pub fn format_for_ingest(
182    entries: &[TranscriptEntry],
183    max_chars_per_entry: usize,
184) -> Vec<IngestEntry> {
185    let mut formatted = Vec::new();
186    let mut tool_name_map: std::collections::HashMap<String, String> =
187        std::collections::HashMap::new();
188
189    for entry in entries {
190        // Skip internal message types
191        if entry.message_type == "file-history-snapshot" || entry.message_type == "system" {
192            continue;
193        }
194
195        // Handle summary messages
196        if entry.message_type == "summary" && !entry.text.is_empty() {
197            formatted.push(IngestEntry {
198                role: "system".to_string(),
199                content: format!(
200                    "[Session Summary]: {}",
201                    truncate(&entry.text, max_chars_per_entry)
202                ),
203                timestamp: entry.timestamp,
204            });
205            continue;
206        }
207
208        let mut parts = Vec::new();
209
210        // Add thinking (summarized)
211        if let Some(ref thinking) = entry.thinking {
212            parts.push(format!("[Thinking]: {}", truncate(thinking, 2000)));
213        }
214
215        // Add tool calls
216        for tc in &entry.tool_calls {
217            tool_name_map.insert(tc.id.clone(), tc.name.clone());
218            parts.push(format!(
219                "[Tool: {}] {}",
220                tc.name,
221                truncate(&tc.input_summary, 2000)
222            ));
223        }
224
225        // Add tool results
226        for tr in &entry.tool_results {
227            let tool_name = tool_name_map
228                .get(&tr.tool_use_id)
229                .cloned()
230                .unwrap_or_else(|| tr.tool_use_id.clone());
231            let prefix = if tr.is_error {
232                "[Tool Error"
233            } else {
234                "[Tool Result"
235            };
236            let truncated = truncate(&tr.content, 12000);
237            parts.push(format!("{prefix}: {tool_name}]\n{truncated}"));
238        }
239
240        // Add main text
241        if !entry.text.is_empty() {
242            parts.push(truncate(&entry.text, max_chars_per_entry).to_string());
243        }
244
245        if !parts.is_empty() {
246            formatted.push(IngestEntry {
247                role: entry.role.clone(),
248                content: parts.join("\n"),
249                timestamp: entry.timestamp,
250            });
251        }
252    }
253
254    formatted
255}
256
257/// Build a JSON payload for `nexus ingest-hook-event` from transcript entries.
258pub fn build_ingest_payload(
259    entries: &[IngestEntry],
260    agent: &str,
261    session_id: &str,
262    cwd: &str,
263) -> serde_json::Value {
264    let mut parts = Vec::new();
265    for entry in entries {
266        parts.push(format!("[{}]: {}", entry.role, entry.content));
267    }
268    let joined = parts.join("\n\n");
269
270    serde_json::json!({
271        // Scoreable fields — the normalizer maps these to derive_candidates inputs:
272        //   tool_name + tool_input → 0.3 signal
273        //   assistant_message_text → 0.2 signal (if len > 20)
274        //   Total: 0.5, above the 0.4 threshold
275        "tool_name": "transcript_ingest",
276        "tool_input": {
277            "entry_count": entries.len(),
278            "session_id": session_id,
279            "agent": agent,
280        },
281        "assistant_message_text": &joined,
282        // Structured transcript for downstream consumers
283        "transcript": {
284            "entries": entries.iter().map(|e| serde_json::json!({
285                "role": e.role,
286                "content": e.content,
287                "timestamp": e.timestamp.map(|t| t.to_rfc3339()),
288            })).collect::<Vec<_>>(),
289            "session_id": session_id,
290            "agent": agent,
291            "cwd": cwd,
292        },
293        "content": &joined,
294        "session_id": session_id,
295        "cwd": cwd,
296    })
297}
298
299/// Parse a single JSONL message into a TranscriptEntry.
300fn parse_transcript_message(msg: &serde_json::Value, index: usize) -> Option<TranscriptEntry> {
301    let msg_type = msg.get("type")?.as_str()?;
302
303    match msg_type {
304        "summary" => {
305            let summary = msg
306                .get("summary")
307                .and_then(|s| s.as_str())
308                .unwrap_or("")
309                .to_string();
310            Some(TranscriptEntry {
311                role: "system".to_string(),
312                text: summary,
313                thinking: None,
314                tool_calls: Vec::new(),
315                tool_results: Vec::new(),
316                message_type: "summary".to_string(),
317                timestamp: parse_timestamp(msg.get("timestamp")),
318                index,
319            })
320        }
321        "user" => {
322            let (text, tool_results) = extract_content(msg);
323            Some(TranscriptEntry {
324                role: "user".to_string(),
325                text,
326                thinking: None,
327                tool_calls: Vec::new(),
328                tool_results,
329                message_type: "user".to_string(),
330                timestamp: parse_timestamp(msg.get("timestamp")),
331                index,
332            })
333        }
334        "assistant" => {
335            let (text, thinking, tool_calls) = extract_assistant_content(msg);
336            Some(TranscriptEntry {
337                role: "assistant".to_string(),
338                text,
339                thinking,
340                tool_calls,
341                tool_results: Vec::new(),
342                message_type: "assistant".to_string(),
343                timestamp: parse_timestamp(msg.get("timestamp")),
344                index,
345            })
346        }
347        "system" | "file-history-snapshot" => {
348            // Include but will be filtered during formatting
349            Some(TranscriptEntry {
350                role: "system".to_string(),
351                text: String::new(),
352                thinking: None,
353                tool_calls: Vec::new(),
354                tool_results: Vec::new(),
355                message_type: msg_type.to_string(),
356                timestamp: parse_timestamp(msg.get("timestamp")),
357                index,
358            })
359        }
360        _ => None,
361    }
362}
363
364/// Extract text content and tool results from a user message.
365fn extract_content(msg: &serde_json::Value) -> (String, Vec<ToolResult>) {
366    let content = msg
367        .get("message")
368        .and_then(|m| m.get("content"))
369        .or_else(|| msg.get("content"));
370
371    match content {
372        Some(serde_json::Value::String(s)) => (s.clone(), Vec::new()),
373        Some(serde_json::Value::Array(blocks)) => {
374            let mut text_parts = Vec::new();
375            let mut results = Vec::new();
376
377            for block in blocks {
378                let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
379                match block_type {
380                    "text" => {
381                        if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
382                            text_parts.push(text.to_string());
383                        }
384                    }
385                    "tool_result" => {
386                        let tool_use_id = block
387                            .get("tool_use_id")
388                            .and_then(|v| v.as_str())
389                            .unwrap_or_default()
390                            .to_string();
391                        let is_error = block
392                            .get("is_error")
393                            .and_then(|v| v.as_bool())
394                            .unwrap_or(false);
395                        let content_str = block
396                            .get("content")
397                            .map(|c| {
398                                c.as_str()
399                                    .map(|s| s.to_string())
400                                    .unwrap_or_else(|| serde_json::to_string(c).unwrap_or_default())
401                            })
402                            .unwrap_or_default();
403                        let content_str = truncate_owned(content_str, MAX_TOOL_RESULT_CONTENT);
404                        results.push(ToolResult {
405                            tool_use_id,
406                            content: content_str,
407                            is_error,
408                        });
409                    }
410                    _ => {}
411                }
412            }
413
414            (text_parts.join("\n"), results)
415        }
416        _ => (String::new(), Vec::new()),
417    }
418}
419
420/// Extract text, thinking, and tool calls from an assistant message.
421/// Tool results only appear in user messages.
422fn extract_assistant_content(msg: &serde_json::Value) -> (String, Option<String>, Vec<ToolCall>) {
423    let content = msg
424        .get("message")
425        .and_then(|m| m.get("content"))
426        .or_else(|| msg.get("content"));
427
428    match content {
429        Some(serde_json::Value::String(s)) => (s.clone(), None, Vec::new()),
430        Some(serde_json::Value::Array(blocks)) => {
431            let mut text_parts = Vec::new();
432            let mut thinking_parts = Vec::new();
433            let mut tool_calls = Vec::new();
434
435            for block in blocks {
436                let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
437                match block_type {
438                    "text" => {
439                        if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
440                            text_parts.push(text.to_string());
441                        }
442                    }
443                    "thinking" => {
444                        if let Some(thinking) = block.get("thinking").and_then(|t| t.as_str()) {
445                            thinking_parts.push(thinking.to_string());
446                        }
447                    }
448                    "tool_use" => {
449                        let id = block
450                            .get("id")
451                            .and_then(|i| i.as_str())
452                            .unwrap_or("unknown")
453                            .to_string();
454                        let name = block
455                            .get("name")
456                            .and_then(|n| n.as_str())
457                            .unwrap_or("unknown")
458                            .to_string();
459                        let input_summary = summarize_tool_input(
460                            &name,
461                            block.get("input").unwrap_or(&serde_json::Value::Null),
462                        );
463                        tool_calls.push(ToolCall {
464                            id,
465                            name,
466                            input_summary,
467                        });
468                    }
469                    _ => {}
470                }
471            }
472
473            let thinking = if thinking_parts.is_empty() {
474                None
475            } else {
476                Some(thinking_parts.join("\n"))
477            };
478
479            (text_parts.join("\n"), thinking, tool_calls)
480        }
481        _ => (String::new(), None, Vec::new()),
482    }
483}
484
485/// Produce a concise summary of a tool's input, mirroring Letta's formatToolInput.
486fn summarize_tool_input(tool_name: &str, input: &serde_json::Value) -> String {
487    match tool_name {
488        "Read" | "Edit" | "Write" => input
489            .get("file_path")
490            .and_then(|p| p.as_str())
491            .map(|s| s.to_string())
492            .unwrap_or_default(),
493        "Bash" => input
494            .get("command")
495            .and_then(|c| c.as_str())
496            .map(|s| truncate(s, 100).to_string())
497            .unwrap_or_default(),
498        "Glob" => input
499            .get("pattern")
500            .and_then(|p| p.as_str())
501            .map(|s| s.to_string())
502            .unwrap_or_default(),
503        "Grep" => input
504            .get("pattern")
505            .and_then(|p| p.as_str())
506            .map(|s| s.to_string())
507            .unwrap_or_default(),
508        "WebFetch" => input
509            .get("url")
510            .and_then(|u| u.as_str())
511            .map(|s| s.to_string())
512            .unwrap_or_default(),
513        "WebSearch" => input
514            .get("query")
515            .and_then(|q| q.as_str())
516            .map(|s| s.to_string())
517            .unwrap_or_default(),
518        "Task" => input
519            .get("description")
520            .and_then(|d| d.as_str())
521            .map(|s| s.to_string())
522            .unwrap_or_default(),
523        _ => {
524            let s = serde_json::to_string(input).unwrap_or_default();
525            truncate(&s, 100).to_string()
526        }
527    }
528}
529
530fn parse_timestamp(val: Option<&serde_json::Value>) -> Option<DateTime<Utc>> {
531    val.and_then(|v| v.as_str())
532        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
533        .map(|dt| dt.with_timezone(&Utc))
534}
535
/// Borrow at most the first `max_len` bytes of `text`, cut on a UTF-8
/// character boundary.
///
/// Unlike `truncate_owned`, no ellipsis is added. The result may be shorter
/// than `max_len` when the limit falls inside a multi-byte character.
fn truncate(text: &str, max_len: usize) -> &str {
    if text.len() <= max_len {
        return text;
    }
    // Walk back to the nearest boundary; index 0 always qualifies.
    let cut = (0..=max_len)
        .rev()
        .find(|&i| text.is_char_boundary(i))
        .unwrap_or(0);
    &text[..cut]
}
548
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// A content-free entry with the given role and message type.
    fn bare_entry(role: &str, message_type: &str) -> TranscriptEntry {
        TranscriptEntry {
            role: role.to_string(),
            text: String::new(),
            thinking: None,
            tool_calls: Vec::new(),
            tool_results: Vec::new(),
            message_type: message_type.to_string(),
            timestamp: None,
            index: 0,
        }
    }

    #[test]
    fn read_empty_transcript() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        File::create(&path).unwrap();
        let entries = read_transcript(&path).unwrap();
        assert!(entries.is_empty());
    }

    #[test]
    fn read_nonexistent_transcript() {
        let entries = read_transcript(Path::new("/no/such/file.jsonl")).unwrap();
        assert!(entries.is_empty());
    }

    #[test]
    fn parse_user_message() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"type":"user","message":{{"role":"user","content":"Hello world"}}}}"#
        )
        .unwrap();

        let entries = read_transcript(&path).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].role, "user");
        assert_eq!(entries[0].text, "Hello world");
        assert_eq!(entries[0].index, 0);
    }

    #[test]
    fn parse_user_message_timestamp() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"type":"user","timestamp":"2024-01-02T03:04:05Z","message":{{"content":"hi"}}}}"#
        )
        .unwrap();

        let entries = read_transcript(&path).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(
            entries[0].timestamp.map(|t| t.to_rfc3339()).as_deref(),
            Some("2024-01-02T03:04:05+00:00")
        );
    }

    #[test]
    fn parse_assistant_with_tool_use() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"type":"assistant","message":{{"role":"assistant","content":[{{"type":"text","text":"Let me read that file"}},{{"type":"tool_use","name":"Read","id":"call_123","input":{{"file_path":"/src/main.rs"}}}}]}}}}"#
        )
        .unwrap();

        let entries = read_transcript(&path).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].role, "assistant");
        assert_eq!(entries[0].text, "Let me read that file");
        assert_eq!(entries[0].tool_calls.len(), 1);
        assert_eq!(entries[0].tool_calls[0].name, "Read");
        assert_eq!(entries[0].tool_calls[0].id, "call_123");
        assert_eq!(entries[0].tool_calls[0].input_summary, "/src/main.rs");
    }

    #[test]
    fn skip_system_and_file_history() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        let mut file = File::create(&path).unwrap();
        writeln!(file, r#"{{"type":"system","subtype":"init"}}"#).unwrap();
        writeln!(
            file,
            r#"{{"type":"file-history-snapshot","snapshot":{{}}}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"type":"user","message":{{"role":"user","content":"real content"}}}}"#
        )
        .unwrap();

        let entries = read_transcript(&path).unwrap();
        assert_eq!(entries.len(), 3);

        // Formatting should skip system/file-history
        let ingest = format_for_ingest(&entries, 500);
        assert_eq!(ingest.len(), 1);
        assert_eq!(ingest[0].role, "user");
    }

    #[test]
    fn read_skips_bad_lines_but_keeps_line_indices() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        let mut file = File::create(&path).unwrap();
        writeln!(file, r#"{{"type":"user","message":{{"content":"first"}}}}"#).unwrap(); // 0
        writeln!(file).unwrap(); // 1: blank
        writeln!(file, "not json").unwrap(); // 2: malformed
        file.write_all(b"\xff\xfe\n").unwrap(); // 3: invalid UTF-8
        write!(file, "{}\r\n", r#"{"type":"user","message":{"content":"crlf"}}"#).unwrap(); // 4
        write!(file, r#"{{"type":"user","message":{{"content":"no newline"}}}}"#).unwrap(); // 5
        drop(file);

        let entries = read_transcript(&path).unwrap();
        let got: Vec<(usize, &str)> = entries.iter().map(|e| (e.index, e.text.as_str())).collect();
        assert_eq!(got, vec![(0, "first"), (4, "crlf"), (5, "no newline")]);
    }

    #[test]
    fn read_from_index() {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("transcript.jsonl");
        let mut file = File::create(&path).unwrap();
        for i in 0..5 {
            writeln!(
                file,
                r#"{{"type":"user","message":{{"role":"user","content":"msg {i}"}}}}"#
            )
            .unwrap();
        }

        let entries = read_transcript_from(&path, Some(2)).unwrap();
        let indices: Vec<usize> = entries.iter().map(|e| e.index).collect();
        assert_eq!(indices, vec![3, 4]); // strictly after index 2
        assert_eq!(entries[0].text, "msg 3");

        assert_eq!(read_transcript_from(&path, None).unwrap().len(), 5);
        assert!(read_transcript_from(&path, Some(4)).unwrap().is_empty());
    }

    #[test]
    fn format_for_ingest_truncates() {
        let mut entry = bare_entry("user", "user");
        entry.text = "a".repeat(1000);
        let result = format_for_ingest(&[entry], 50);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].content, "a".repeat(50));
    }

    #[test]
    fn format_for_ingest_labels_tool_results() {
        let mut call = bare_entry("assistant", "assistant");
        call.tool_calls.push(ToolCall {
            id: "call_1".to_string(),
            name: "Read".to_string(),
            input_summary: "/src/main.rs".to_string(),
        });
        let mut result = bare_entry("user", "user");
        result.tool_results.push(ToolResult {
            tool_use_id: "call_1".to_string(),
            content: "fn main() {}".to_string(),
            is_error: false,
        });
        let mut orphan = bare_entry("user", "user");
        orphan.tool_results.push(ToolResult {
            tool_use_id: "call_9".to_string(),
            content: "boom".to_string(),
            is_error: true,
        });

        let ingest = format_for_ingest(&[call, result, orphan], 500);
        let contents: Vec<&str> = ingest.iter().map(|e| e.content.as_str()).collect();
        assert_eq!(
            contents,
            vec![
                "[Tool: Read] /src/main.rs",
                "[Tool Result: Read]\nfn main() {}",
                "[Tool Error: call_9]\nboom",
            ]
        );
    }

    #[test]
    fn format_for_ingest_summary() {
        let mut summary = bare_entry("system", "summary");
        summary.text = "Refactored the parser".to_string();
        let ingest = format_for_ingest(&[summary], 500);
        assert_eq!(ingest.len(), 1);
        assert_eq!(ingest[0].role, "system");
        assert_eq!(ingest[0].content, "[Session Summary]: Refactored the parser");
    }

    #[test]
    fn summarize_tool_input_known_tools() {
        assert_eq!(
            summarize_tool_input("Read", &serde_json::json!({"file_path": "/src/main.rs"})),
            "/src/main.rs"
        );
        assert_eq!(
            summarize_tool_input("Bash", &serde_json::json!({"command": "cargo build"})),
            "cargo build"
        );
        assert_eq!(
            summarize_tool_input("Grep", &serde_json::json!({"pattern": "TODO"})),
            "TODO"
        );
        let long_cmd = "x".repeat(150);
        assert_eq!(
            summarize_tool_input("Bash", &serde_json::json!({ "command": long_cmd })).len(),
            100
        );
        assert_eq!(
            summarize_tool_input("Mystery", &serde_json::json!({"a": 1})),
            r#"{"a":1}"#
        );
    }

    #[test]
    fn build_ingest_payload_structure() {
        let entries = vec![IngestEntry {
            role: "user".to_string(),
            content: "hello".to_string(),
            timestamp: None,
        }];
        let payload = build_ingest_payload(&entries, "claude-code", "sess-1", "/home/user");
        assert_eq!(payload["session_id"], "sess-1");
        assert_eq!(payload["cwd"], "/home/user");
        assert_eq!(payload["assistant_message_text"], "[user]: hello");
        assert_eq!(payload["content"], "[user]: hello");
        assert_eq!(payload["tool_input"]["entry_count"], 1);
        assert!(payload["transcript"]["entries"][0]["timestamp"].is_null());
    }

    #[test]
    fn truncate_handles_multibyte() {
        let text = "héllo wörld";
        assert_eq!(truncate(text, 5), "héll");
        assert_eq!(truncate(text, 2), "h"); // limit falls inside 'é'
        assert_eq!(truncate(text, 100), text);
    }

    #[test]
    fn truncate_owned_appends_ellipsis_within_limit() {
        assert_eq!(truncate_owned("hello world".to_string(), 8), "hello…");
        assert_eq!(truncate_owned("héllo wörld".to_string(), 5), "h…");
        assert_eq!(truncate_owned("short".to_string(), 100), "short");
        assert_eq!(truncate_owned("abcdef".to_string(), 3), "");
    }
}