Skip to main content

ralph_workflow/json_parser/opencode/
parser.rs

// OpenCode parser implementation.
//
// Contains the `OpenCodeParser` struct and its core methods.

/// `OpenCode` event parser.
///
/// Renders `OpenCode` NDJSON events to a shared printer. Mutable state lives behind
/// `RefCell`/`Cell` so that parsing methods, which take `&self`, can update it.
///
/// Note: fields are dropped in declaration order.
pub struct OpenCodeParser {
    /// Colors used when formatting terminal output.
    colors: Colors,
    /// Output verbosity. `Verbosity::Debug` also enables verbose warnings in the
    /// streaming session and echoes each raw event line.
    verbosity: Verbosity,
    /// Relative path to log file (if logging enabled). Raw event lines are appended
    /// to it once the whole stream has been parsed.
    log_path: Option<std::path::PathBuf>,
    /// Display name used as the output prefix (e.g. for unrecognized events);
    /// defaults to `OpenCode`.
    display_name: String,
    /// Unified streaming session for state tracking (shared via `Rc`).
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting; detected at construction time.
    terminal_mode: RefCell<TerminalMode>,
    /// Track last rendered content for append-only streaming.
    /// NOTE(review): key semantics are defined by the formatters outside this file.
    last_rendered_content: RefCell<std::collections::HashMap<String, String>>,
    /// Whether to show streaming quality metrics (off by default).
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output.
    printer: SharedPrinter,
    /// Saturating counter used to build step IDs when events lack stable identifiers.
    fallback_step_counter: Cell<u64>,
}
25
26impl OpenCodeParser {
27    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
28        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
29    }
30
31    /// Create a new `OpenCodeParser` with a custom printer.
32    ///
33    /// # Arguments
34    ///
35    /// * `colors` - Colors for terminal output
36    /// * `verbosity` - Verbosity level for output
37    /// * `printer` - Shared printer for output
38    ///
39    /// # Returns
40    ///
41    /// A new `OpenCodeParser` instance
42    pub(crate) fn with_printer(
43        colors: Colors,
44        verbosity: Verbosity,
45        printer: SharedPrinter,
46    ) -> Self {
47        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
48        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
49
50        // Use the printer's is_terminal method to validate it's connected correctly
51        let _printer_is_terminal = printer.borrow().is_terminal();
52
53        Self {
54            colors,
55            verbosity,
56            log_path: None,
57            display_name: "OpenCode".to_string(),
58            streaming_session: Rc::new(RefCell::new(streaming_session)),
59            terminal_mode: RefCell::new(TerminalMode::detect()),
60            last_rendered_content: RefCell::new(std::collections::HashMap::new()),
61            show_streaming_metrics: false,
62            printer,
63            fallback_step_counter: Cell::new(0),
64        }
65    }
66
67    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
68        self.show_streaming_metrics = show;
69        self
70    }
71
72    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
73        self.display_name = display_name.to_string();
74        self
75    }
76
77    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
78        self.log_path = Some(std::path::PathBuf::from(path));
79        self
80    }
81
82    #[cfg(any(test, feature = "test-utils"))]
83    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
84        *self.terminal_mode.borrow_mut() = mode;
85        self
86    }
87
88    /// Create a new parser with a test printer.
89    ///
90    /// This is the primary entry point for integration tests that need
91    /// to capture parser output for verification.
92    ///
93    /// Defaults to `TerminalMode::Full` for testing streaming behavior.
94    /// Integration tests that verify streaming output need Full mode to
95    /// see per-delta rendering (non-TTY modes suppress deltas and flush at completion).
96    #[cfg(feature = "test-utils")]
97    pub fn with_printer_for_test(
98        colors: Colors,
99        verbosity: Verbosity,
100        printer: SharedPrinter,
101    ) -> Self {
102        Self::with_printer(colors, verbosity, printer).with_terminal_mode(TerminalMode::Full)
103    }
104
105    /// Set the log file path for testing.
106    ///
107    /// This allows tests to verify log file content after parsing.
108    #[cfg(feature = "test-utils")]
109    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
110        self.log_path = Some(std::path::PathBuf::from(path));
111        self
112    }
113
114    /// Parse a stream for testing purposes.
115    ///
116    /// This exposes the internal `parse_stream` method for integration tests.
117    #[cfg(feature = "test-utils")]
118    pub fn parse_stream_for_test<R: std::io::BufRead>(
119        &self,
120        reader: R,
121        workspace: &dyn crate::workspace::Workspace,
122    ) -> std::io::Result<()> {
123        self.parse_stream(reader, workspace)
124    }
125
126    /// Get a shared reference to the printer.
127    ///
128    /// This allows tests, monitoring, and other code to access the printer after parsing
129    /// to verify output content, check for duplicates, or capture output for analysis.
130    /// Only available with the `test-utils` feature.
131    ///
132    /// # Returns
133    ///
134    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
135    #[cfg(feature = "test-utils")]
136    pub fn printer(&self) -> SharedPrinter {
137        Rc::clone(&self.printer)
138    }
139
140    /// Get streaming quality metrics from the current session.
141    ///
142    /// This provides insight into the deduplication and streaming quality of the
143    /// parsing session. Only available with the `test-utils` feature.
144    ///
145    /// # Returns
146    ///
147    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
148    #[cfg(feature = "test-utils")]
149    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
150        self.streaming_session
151            .borrow()
152            .get_streaming_quality_metrics()
153    }
154
155    /// Parse and display a single `OpenCode` JSON event
156    ///
157    /// From OpenCode source (`run.ts` lines 146-201), the NDJSON format uses events with:
158    /// - `step_start`: Step initialization with snapshot info
159    /// - `step_finish`: Step completion with reason, cost, tokens
160    /// - `tool_use`: Tool invocation with tool name, callID, and state (status, input, output)
161    /// - `text`: Streaming text content
162    /// - `error`: Session/API error events
163    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
164        let event: OpenCodeEvent = if let Ok(e) = serde_json::from_str(line) {
165            e
166        } else {
167            let trimmed = line.trim();
168            if !trimmed.is_empty() && !trimmed.starts_with('{') {
169                return Some(format!("{trimmed}\n"));
170            }
171            return None;
172        };
173        let c = &self.colors;
174        let prefix = &self.display_name;
175
176        let output = match event.event_type.as_str() {
177            "step_start" => self.format_step_start_event(&event),
178            "step_finish" => self.format_step_finish_event(&event),
179            "tool_use" => self.format_tool_use_event(&event),
180            "text" => self.format_text_event(&event),
181            "error" => self.format_error_event(&event, line),
182            _ => {
183                // Unknown event type - use the generic formatter in verbose mode
184                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
185            }
186        };
187
188        if output.is_empty() {
189            None
190        } else {
191            Some(output)
192        }
193    }
194
195    fn next_fallback_step_id(&self, session: &str, timestamp: Option<u64>) -> String {
196        let counter = self.fallback_step_counter.get().saturating_add(1);
197        self.fallback_step_counter.set(counter);
198        match timestamp {
199            Some(ts) => format!("{session}:{ts}:{counter}"),
200            None => format!("{session}:fallback:{counter}"),
201        }
202    }
203
204    /// Check if an `OpenCode` event is a control event (state management with no user output)
205    ///
206    /// Control events are valid JSON that represent state transitions rather than
207    /// user-facing content. They should be tracked separately from "ignored" events
208    /// to avoid false health warnings.
209    fn is_control_event(event: &OpenCodeEvent) -> bool {
210        match event.event_type.as_str() {
211            // Step lifecycle events are control events
212            "step_start" | "step_finish" => true,
213            _ => false,
214        }
215    }
216
217    /// Check if an `OpenCode` event is a partial/delta event (streaming content displayed incrementally)
218    ///
219    /// Partial events represent streaming text deltas that are shown to the user
220    /// in real-time. These should be tracked separately to avoid inflating "ignored" percentages.
221    fn is_partial_event(event: &OpenCodeEvent) -> bool {
222        match event.event_type.as_str() {
223            // Text events produce streaming content
224            "text" => true,
225            _ => false,
226        }
227    }
228
229    /// Parse a stream of `OpenCode` NDJSON events
230    pub(crate) fn parse_stream<R: BufRead>(
231        &self,
232        mut reader: R,
233        workspace: &dyn crate::workspace::Workspace,
234    ) -> io::Result<()> {
235        use super::incremental_parser::IncrementalNdjsonParser;
236
237        let c = &self.colors;
238        let monitor = HealthMonitor::new("OpenCode");
239        // Accumulate log content in memory, write to workspace at the end
240        let logging_enabled = self.log_path.is_some();
241        let mut log_buffer: Vec<u8> = Vec::new();
242
243        // Use incremental parser for true real-time streaming
244        // This processes JSON as soon as it's complete, not waiting for newlines
245        let mut incremental_parser = IncrementalNdjsonParser::new();
246        let mut byte_buffer = Vec::new();
247
248        loop {
249            // Read available bytes
250            byte_buffer.clear();
251            let chunk = reader.fill_buf()?;
252            if chunk.is_empty() {
253                break;
254            }
255
256            // Process all bytes immediately
257            byte_buffer.extend_from_slice(chunk);
258            let consumed = chunk.len();
259            reader.consume(consumed);
260
261            // Feed bytes to incremental parser
262            let json_events = incremental_parser.feed(&byte_buffer);
263
264            // Process each complete JSON event immediately
265            for line in json_events {
266                let trimmed = line.trim();
267                if trimmed.is_empty() {
268                    continue;
269                }
270
271                if self.verbosity.is_debug() {
272                    let mut printer = self.printer.borrow_mut();
273                    writeln!(
274                        printer,
275                        "{}[DEBUG]{} {}{}{}",
276                        c.dim(),
277                        c.reset(),
278                        c.dim(),
279                        &line,
280                        c.reset()
281                    )?;
282                    printer.flush()?;
283                }
284
285                // Parse the event once - parse_event handles malformed JSON by returning None
286                match self.parse_event(&line) {
287                    Some(output) => {
288                        // Check if this is a partial/delta event (streaming content)
289                        if trimmed.starts_with('{') {
290                            if let Ok(event) = serde_json::from_str::<OpenCodeEvent>(&line) {
291                                if Self::is_partial_event(&event) {
292                                    monitor.record_partial_event();
293                                } else {
294                                    monitor.record_parsed();
295                                }
296                            } else {
297                                monitor.record_parsed();
298                            }
299                        } else {
300                            monitor.record_parsed();
301                        }
302                        // Write output to printer
303                        let mut printer = self.printer.borrow_mut();
304                        write!(printer, "{output}")?;
305                        printer.flush()?;
306                    }
307                    None => {
308                        // Check if this was a control event (state management with no user output)
309                        if trimmed.starts_with('{') {
310                            if let Ok(event) = serde_json::from_str::<OpenCodeEvent>(&line) {
311                                if Self::is_control_event(&event) {
312                                    monitor.record_control_event();
313                                } else {
314                                    // Valid JSON but not a control event - track as unknown
315                                    monitor.record_unknown_event();
316                                }
317                            } else {
318                                // Failed to deserialize - track as parse error
319                                monitor.record_parse_error();
320                            }
321                        } else {
322                            monitor.record_ignored();
323                        }
324                    }
325                }
326
327                if logging_enabled {
328                    writeln!(log_buffer, "{line}")?;
329                }
330            }
331        }
332
333        // Handle any remaining buffered data when the stream ends.
334        // Only process if it's valid JSON - incomplete buffered data should be skipped.
335        if let Some(remaining) = incremental_parser.finish() {
336            let trimmed = remaining.trim();
337            if !trimmed.is_empty()
338                && trimmed.starts_with('{')
339                && serde_json::from_str::<OpenCodeEvent>(&remaining).is_ok()
340            {
341                // Process the remaining event
342                if let Some(output) = self.parse_event(&remaining) {
343                    monitor.record_parsed();
344                    let mut printer = self.printer.borrow_mut();
345                    write!(printer, "{output}")?;
346                    printer.flush()?;
347                }
348                // Write to log buffer
349                if logging_enabled {
350                    writeln!(log_buffer, "{remaining}")?;
351                }
352            }
353        }
354
355        // Write accumulated log content to workspace
356        if let Some(log_path) = &self.log_path {
357            workspace.append_bytes(log_path, &log_buffer)?;
358        }
359
360        // OpenCode models may emit XML directly in text output (without using tools to write
361        // `.agent/tmp/*.xml`). Capture known XML artifacts from the accumulated text stream and
362        // write them to standard artifact paths so phase extractors can validate them via
363        // file-based extraction.
364        //
365        // SECURITY: Bound the amount of accumulated text we scan and the size of the extracted
366        // XML we write. This prevents pathological model output from causing unbounded memory/IO.
367        const MAX_XML_SEARCH_BYTES: usize = 512 * 1024;
368        const MAX_XML_BYTES: usize = 128 * 1024;
369        if let Some(accumulated) = self
370            .streaming_session
371            .borrow()
372            .get_accumulated(ContentType::Text, "main")
373        {
374            let accumulated_tail = if accumulated.len() > MAX_XML_SEARCH_BYTES {
375                let mut start = accumulated.len() - MAX_XML_SEARCH_BYTES;
376                while start < accumulated.len() && !accumulated.is_char_boundary(start) {
377                    start += 1;
378                }
379                &accumulated[start..]
380            } else {
381                accumulated
382            };
383
384            if let Some(xml) = crate::files::llm_output_extraction::xml_extraction::extract_xml_commit(
385                accumulated_tail,
386            ) {
387                if xml.len() <= MAX_XML_BYTES {
388                    workspace.create_dir_all(Path::new(".agent/tmp"))?;
389                    workspace.write(
390                        Path::new(crate::files::llm_output_extraction::file_based_extraction::paths::COMMIT_MESSAGE_XML),
391                        &xml,
392                    )?;
393                }
394            }
395
396            if let Some(xml) = crate::files::llm_output_extraction::extract_issues_xml(
397                accumulated_tail,
398            ) {
399                if xml.len() <= MAX_XML_BYTES {
400                    workspace.create_dir_all(Path::new(".agent/tmp"))?;
401                    workspace.write(
402                        Path::new(crate::files::llm_output_extraction::file_based_extraction::paths::ISSUES_XML),
403                        &xml,
404                    )?;
405                }
406            }
407        }
408        if let Some(warning) = monitor.check_and_warn(*c) {
409            let mut printer = self.printer.borrow_mut();
410            writeln!(printer, "{warning}")?;
411        }
412        Ok(())
413    }
414}