mi6_core/input/transcript/scanner.rs
1//! Transcript file scanner with storage integration.
2//!
3//! Provides high-level scanning functionality that orchestrates parsing,
4//! deduplication, and storage operations.
5
6use std::path::Path;
7
8use crate::model::error::ScanError;
9use crate::model::{EventBuilder, EventType, Storage};
10
11use super::{TranscriptParser, extract_first_prompt};
12
/// Outcome of scanning one or more transcript files.
///
/// When several files are scanned (e.g. via `TranscriptScanner::scan_sessions`)
/// the per-file counts are summed into a single `ScanResult`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of events inserted into storage. Events skipped as duplicates
    /// (matched by UUID) are not counted.
    pub inserted: usize,
    /// Number of transcript lines that failed to parse.
    pub parse_errors: u64,
}
21
/// Orchestrates transcript file scanning with storage integration.
///
/// `TranscriptScanner` provides a high-level API for scanning Claude Code
/// transcript files and inserting events into storage. It handles:
/// - Incremental parsing (only reads new content since last scan)
/// - Deduplication by UUID
/// - Position tracking for future scans
///
/// The `Storage` bound lives on the `impl` block rather than on the struct
/// itself, so the type can be named without repeating the bound.
///
/// # Example
///
/// ```ignore
/// use mi6_core::{Storage, TranscriptScanner};
/// use mi6_storage_sqlite::SqliteStorage;
/// use std::path::Path;
///
/// let storage = SqliteStorage::open(Path::new("mi6.db"))?;
/// let scanner = TranscriptScanner::new(&storage, "my-machine-id");
///
/// let result = scanner.scan_file(Path::new("/path/to/transcript.jsonl"))?;
/// println!("Inserted {} events", result.inserted);
/// ```
pub struct TranscriptScanner<'a, S> {
    /// Backend that events and scan positions are read from / written to.
    storage: &'a S,
    /// Identifier attached to every event this scanner creates.
    machine_id: String,
}
47
48impl<'a, S: Storage> TranscriptScanner<'a, S> {
49 /// Create a new scanner with the given storage and machine ID.
50 ///
51 /// The machine ID is used to tag events with their source machine,
52 /// enabling multi-machine aggregation.
53 pub fn new(storage: &'a S, machine_id: impl Into<String>) -> Self {
54 Self {
55 storage,
56 machine_id: machine_id.into(),
57 }
58 }
59
60 /// Scan a transcript file and insert events into storage.
61 ///
62 /// This method:
63 /// 1. Loads the last scanned position for the file
64 /// 2. Parses new entries since that position
65 /// 3. Inserts new events (checking for duplicates by UUID)
66 /// 4. Updates the position for future incremental scans
67 ///
68 /// # Returns
69 ///
70 /// Returns a [`ScanResult`] with the count of inserted events and
71 /// parse errors encountered.
72 ///
73 /// # Errors
74 ///
75 /// Returns a [`ScanError`] if:
76 /// - The file cannot be read
77 /// - Storage operations fail
78 pub fn scan_file(&self, path: &Path) -> Result<ScanResult, ScanError> {
79 // Get last scanned position (or start from beginning)
80 let start_position = self
81 .storage
82 .get_transcript_position(path)?
83 .unwrap_or_default();
84
85 // Parse the transcript file
86 let parser = TranscriptParser::new(&self.machine_id);
87 let result = parser.parse_incremental(path, &start_position)?;
88
89 let parse_errors = result.parse_errors;
90
91 // Insert new events, skipping duplicates
92 let mut inserted = 0;
93 for event in result.events {
94 // Check for duplicate by UUID in metadata
95 if let Some(metadata) = &event.metadata
96 && let Ok(meta_json) = serde_json::from_str::<serde_json::Value>(metadata)
97 && let Some(uuid) = meta_json.get("uuid").and_then(|v| v.as_str())
98 {
99 // Skip if already exists
100 if self.storage.event_exists_by_uuid(&event.session_id, uuid)? {
101 continue;
102 }
103 }
104
105 self.storage.insert(&event)?;
106 inserted += 1;
107 }
108
109 // Update position for next scan
110 self.storage
111 .set_transcript_position(path, &result.position)?;
112
113 Ok(ScanResult {
114 inserted,
115 parse_errors,
116 })
117 }
118
119 /// Scan transcripts for specific sessions by ID.
120 ///
121 /// Looks up each session, checks if it has a `transcript_path` that exists,
122 /// and scans it. Sessions without transcript paths or with missing files
123 /// are silently skipped.
124 ///
125 /// # Arguments
126 ///
127 /// * `session_ids` - Slice of session IDs to scan
128 ///
129 /// # Returns
130 ///
131 /// Returns an aggregated [`ScanResult`] with the total count of inserted
132 /// events and parse errors across all scanned files.
133 ///
134 /// # Errors
135 ///
136 /// Returns a [`ScanError`] if a storage operation fails or a transcript
137 /// file cannot be parsed.
138 pub fn scan_sessions(&self, session_ids: &[&str]) -> Result<ScanResult, ScanError> {
139 let mut total = ScanResult::default();
140 for session_id in session_ids {
141 if let Some(session) = self.storage.get_session(session_id)?
142 && let Some(ref transcript_path) = session.transcript_path
143 {
144 let path = Path::new(transcript_path);
145 if path.exists() {
146 let result = self.scan_file(path)?;
147 total.inserted += result.inserted;
148 total.parse_errors += result.parse_errors;
149 }
150 }
151 }
152 Ok(total)
153 }
154
155 /// Backfill the initial prompt for a session from its transcript file.
156 ///
157 /// This is a fallback for when the UserPromptSubmit hook doesn't fire,
158 /// such as when Claude is started with an inline prompt like
159 /// `claude 'initial prompt'`.
160 ///
161 /// The method:
162 /// 1. Checks if the session exists and has no first_user_message
163 /// 2. Reads the transcript file to find the first user prompt
164 /// 3. Creates a UserPromptSubmit event with the prompt
165 ///
166 /// This is idempotent - if the session already has a first_user_message,
167 /// this method does nothing.
168 ///
169 /// # Arguments
170 ///
171 /// * `session_id` - The session ID to backfill
172 /// * `transcript_path` - Path to the transcript file
173 ///
174 /// # Returns
175 ///
176 /// Returns `true` if a prompt was backfilled, `false` otherwise.
177 pub fn backfill_initial_prompt(
178 &self,
179 session_id: &str,
180 transcript_path: &Path,
181 ) -> Result<bool, ScanError> {
182 // Check if session exists and needs backfill
183 let session = match self.storage.get_session(session_id)? {
184 Some(s) => s,
185 None => return Ok(false),
186 };
187
188 // If session already has a first_user_message, nothing to do
189 if session.first_user_message.is_some() {
190 return Ok(false);
191 }
192
193 // Try to extract the first prompt from the transcript
194 let prompt = match extract_first_prompt(transcript_path) {
195 Some(p) => p,
196 None => return Ok(false),
197 };
198
199 // Create a UserPromptSubmit event with the prompt
200 let event = EventBuilder::new(&self.machine_id, EventType::UserPromptSubmit, session_id)
201 .source("transcript")
202 .framework("claude")
203 .payload(serde_json::json!({"prompt": prompt}).to_string())
204 .build();
205
206 // Insert the event (this will update first_user_message via session update logic)
207 self.storage.insert(&event)?;
208
209 Ok(true)
210 }
211}