// mi6_core/input/transcript/scanner.rs

//! Transcript file scanner with storage integration.
//!
//! Provides high-level scanning functionality that orchestrates parsing,
//! deduplication, and storage operations.

6use std::path::Path;
7
8use crate::model::error::ScanError;
9use crate::model::{EventBuilder, EventType, Storage};
10
11use super::{TranscriptParser, extract_first_prompt};
12
/// Outcome of scanning one or more transcript files.
///
/// The `Default` value has both counters at zero, which makes it a
/// convenient starting point when aggregating results across files.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScanResult {
    /// Number of events inserted into storage.
    pub inserted: usize,
    /// Number of transcript lines that failed to parse.
    pub parse_errors: u64,
}

22/// Orchestrates transcript file scanning with storage integration.
23///
24/// `TranscriptScanner` provides a high-level API for scanning Claude Code
25/// transcript files and inserting events into storage. It handles:
26/// - Incremental parsing (only reads new content since last scan)
27/// - Deduplication by UUID
28/// - Position tracking for future scans
29///
30/// # Example
31///
32/// ```ignore
33/// use mi6_core::{Storage, TranscriptScanner};
34/// use mi6_storage_sqlite::SqliteStorage;
35/// use std::path::Path;
36///
37/// let storage = SqliteStorage::open(Path::new("mi6.db"))?;
38/// let scanner = TranscriptScanner::new(&storage, "my-machine-id");
39///
40/// let result = scanner.scan_file(Path::new("/path/to/transcript.jsonl"))?;
41/// println!("Inserted {} events", result.inserted);
42/// ```
43pub struct TranscriptScanner<'a, S: Storage> {
44    storage: &'a S,
45    machine_id: String,
46}
47
48impl<'a, S: Storage> TranscriptScanner<'a, S> {
49    /// Create a new scanner with the given storage and machine ID.
50    ///
51    /// The machine ID is used to tag events with their source machine,
52    /// enabling multi-machine aggregation.
53    pub fn new(storage: &'a S, machine_id: impl Into<String>) -> Self {
54        Self {
55            storage,
56            machine_id: machine_id.into(),
57        }
58    }
59
60    /// Scan a transcript file and insert events into storage.
61    ///
62    /// This method:
63    /// 1. Loads the last scanned position for the file
64    /// 2. Parses new entries since that position
65    /// 3. Inserts new events (checking for duplicates by UUID)
66    /// 4. Updates the position for future incremental scans
67    ///
68    /// # Returns
69    ///
70    /// Returns a [`ScanResult`] with the count of inserted events and
71    /// parse errors encountered.
72    ///
73    /// # Errors
74    ///
75    /// Returns a [`ScanError`] if:
76    /// - The file cannot be read
77    /// - Storage operations fail
78    pub fn scan_file(&self, path: &Path) -> Result<ScanResult, ScanError> {
79        // Get last scanned position (or start from beginning)
80        let start_position = self
81            .storage
82            .get_transcript_position(path)?
83            .unwrap_or_default();
84
85        // Parse the transcript file
86        let parser = TranscriptParser::new(&self.machine_id);
87        let result = parser.parse_incremental(path, &start_position)?;
88
89        let parse_errors = result.parse_errors;
90
91        // Insert new events, skipping duplicates
92        let mut inserted = 0;
93        for event in result.events {
94            // Check for duplicate by UUID in metadata
95            if let Some(metadata) = &event.metadata
96                && let Ok(meta_json) = serde_json::from_str::<serde_json::Value>(metadata)
97                && let Some(uuid) = meta_json.get("uuid").and_then(|v| v.as_str())
98            {
99                // Skip if already exists
100                if self.storage.event_exists_by_uuid(&event.session_id, uuid)? {
101                    continue;
102                }
103            }
104
105            self.storage.insert(&event)?;
106            inserted += 1;
107        }
108
109        // Update position for next scan
110        self.storage
111            .set_transcript_position(path, &result.position)?;
112
113        Ok(ScanResult {
114            inserted,
115            parse_errors,
116        })
117    }
118
119    /// Scan transcripts for specific sessions by ID.
120    ///
121    /// Looks up each session, checks if it has a `transcript_path` that exists,
122    /// and scans it. Sessions without transcript paths or with missing files
123    /// are silently skipped.
124    ///
125    /// # Arguments
126    ///
127    /// * `session_ids` - Slice of session IDs to scan
128    ///
129    /// # Returns
130    ///
131    /// Returns an aggregated [`ScanResult`] with the total count of inserted
132    /// events and parse errors across all scanned files.
133    ///
134    /// # Errors
135    ///
136    /// Returns a [`ScanError`] if a storage operation fails or a transcript
137    /// file cannot be parsed.
138    pub fn scan_sessions(&self, session_ids: &[&str]) -> Result<ScanResult, ScanError> {
139        let mut total = ScanResult::default();
140        for session_id in session_ids {
141            if let Some(session) = self.storage.get_session(session_id)?
142                && let Some(ref transcript_path) = session.transcript_path
143            {
144                let path = Path::new(transcript_path);
145                if path.exists() {
146                    let result = self.scan_file(path)?;
147                    total.inserted += result.inserted;
148                    total.parse_errors += result.parse_errors;
149                }
150            }
151        }
152        Ok(total)
153    }
154
155    /// Backfill the initial prompt for a session from its transcript file.
156    ///
157    /// This is a fallback for when the UserPromptSubmit hook doesn't fire,
158    /// such as when Claude is started with an inline prompt like
159    /// `claude 'initial prompt'`.
160    ///
161    /// The method:
162    /// 1. Checks if the session exists and has no first_user_message
163    /// 2. Reads the transcript file to find the first user prompt
164    /// 3. Creates a UserPromptSubmit event with the prompt
165    ///
166    /// This is idempotent - if the session already has a first_user_message,
167    /// this method does nothing.
168    ///
169    /// # Arguments
170    ///
171    /// * `session_id` - The session ID to backfill
172    /// * `transcript_path` - Path to the transcript file
173    ///
174    /// # Returns
175    ///
176    /// Returns `true` if a prompt was backfilled, `false` otherwise.
177    pub fn backfill_initial_prompt(
178        &self,
179        session_id: &str,
180        transcript_path: &Path,
181    ) -> Result<bool, ScanError> {
182        // Check if session exists and needs backfill
183        let session = match self.storage.get_session(session_id)? {
184            Some(s) => s,
185            None => return Ok(false),
186        };
187
188        // If session already has a first_user_message, nothing to do
189        if session.first_user_message.is_some() {
190            return Ok(false);
191        }
192
193        // Try to extract the first prompt from the transcript
194        let prompt = match extract_first_prompt(transcript_path) {
195            Some(p) => p,
196            None => return Ok(false),
197        };
198
199        // Create a UserPromptSubmit event with the prompt
200        let event = EventBuilder::new(&self.machine_id, EventType::UserPromptSubmit, session_id)
201            .source("transcript")
202            .framework("claude")
203            .payload(serde_json::json!({"prompt": prompt}).to_string())
204            .build();
205
206        // Insert the event (this will update first_user_message via session update logic)
207        self.storage.insert(&event)?;
208
209        Ok(true)
210    }
211}