ralph_workflow/json_parser/opencode/parser.rs
// OpenCode parser implementation.
//
// Contains the OpenCodeParser struct and its core methods.

use std::cell::{Cell, RefCell};
use std::io::{self, BufRead, Write};
use std::path::Path;
use std::rc::Rc;

/// `OpenCode` event parser
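///
/// A construction sketch using the builder-style methods below; the argument
/// values and the log path are illustrative placeholders, not defaults:
///
/// ```ignore
/// let parser = OpenCodeParser::new(colors, verbosity)
///     .with_display_name("OpenCode")
///     .with_log_file(".agent/logs/opencode.ndjson")
///     .with_show_streaming_metrics(true);
/// ```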
pub struct OpenCodeParser {
    colors: Colors,
    verbosity: Verbosity,
    /// Relative path to log file (if logging enabled)
    log_path: Option<std::path::PathBuf>,
    display_name: String,
    /// Unified streaming session for state tracking
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting
    terminal_mode: RefCell<TerminalMode>,
    /// Track last rendered content for append-only streaming.
    last_rendered_content: RefCell<std::collections::HashMap<String, String>>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
    /// Counter for step IDs when events lack stable identifiers
    fallback_step_counter: Cell<u64>,
}

impl OpenCodeParser {
    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
    }

    /// Create a new `OpenCodeParser` with a custom printer.
    ///
    /// # Arguments
    ///
    /// * `colors` - Colors for terminal output
    /// * `verbosity` - Verbosity level for output
    /// * `printer` - Shared printer for output
    ///
    /// # Returns
    ///
    /// A new `OpenCodeParser` instance
    pub(crate) fn with_printer(
        colors: Colors,
        verbosity: Verbosity,
        printer: SharedPrinter,
    ) -> Self {
        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);

        // Probe the printer's is_terminal method up front; the result is unused,
        // this only verifies the printer can be borrowed and queried before streaming begins.
        let _printer_is_terminal = printer.borrow().is_terminal();

        Self {
            colors,
            verbosity,
            log_path: None,
            display_name: "OpenCode".to_string(),
            streaming_session: Rc::new(RefCell::new(streaming_session)),
            terminal_mode: RefCell::new(TerminalMode::detect()),
            last_rendered_content: RefCell::new(std::collections::HashMap::new()),
            show_streaming_metrics: false,
            printer,
            fallback_step_counter: Cell::new(0),
        }
    }

    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
        self.show_streaming_metrics = show;
        self
    }

    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
        self.display_name = display_name.to_string();
        self
    }

    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
        self.log_path = Some(std::path::PathBuf::from(path));
        self
    }

    #[cfg(any(test, feature = "test-utils"))]
    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
        *self.terminal_mode.borrow_mut() = mode;
        self
    }

    /// Create a new parser with a test printer.
    ///
    /// This is the primary entry point for integration tests that need
    /// to capture parser output for verification.
    ///
    /// Defaults to `TerminalMode::Full` for testing streaming behavior.
    /// Integration tests that verify streaming output need Full mode to
    /// see per-delta rendering (non-TTY modes suppress deltas and flush at completion).
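    ///
    /// A minimal test-harness sketch; `capture_printer()`, `colors`, `verbosity`,
    /// `workspace`, and `ndjson` are placeholders for test fixtures, not APIs
    /// defined in this module:
    ///
    /// ```ignore
    /// let printer = capture_printer();
    /// let parser =
    ///     OpenCodeParser::with_printer_for_test(colors, verbosity, Rc::clone(&printer));
    /// parser.parse_stream_for_test(std::io::Cursor::new(ndjson), &workspace)?;
    /// let metrics = parser.streaming_metrics();
    /// let output = parser.printer();
    /// ```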
    #[cfg(feature = "test-utils")]
    pub fn with_printer_for_test(
        colors: Colors,
        verbosity: Verbosity,
        printer: SharedPrinter,
    ) -> Self {
        Self::with_printer(colors, verbosity, printer).with_terminal_mode(TerminalMode::Full)
    }

    /// Set the log file path for testing.
    ///
    /// This allows tests to verify log file content after parsing.
    #[cfg(feature = "test-utils")]
    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
        self.log_path = Some(std::path::PathBuf::from(path));
        self
    }

    /// Parse a stream for testing purposes.
    ///
    /// This exposes the internal `parse_stream` method for integration tests.
    #[cfg(feature = "test-utils")]
    pub fn parse_stream_for_test<R: std::io::BufRead>(
        &self,
        reader: R,
        workspace: &dyn crate::workspace::Workspace,
    ) -> std::io::Result<()> {
        self.parse_stream(reader, workspace)
    }

    /// Get a shared reference to the printer.
    ///
    /// This allows tests, monitoring, and other code to access the printer after parsing
    /// to verify output content, check for duplicates, or capture output for analysis.
    /// Only available with the `test-utils` feature.
    ///
    /// # Returns
    ///
    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
    #[cfg(feature = "test-utils")]
    pub fn printer(&self) -> SharedPrinter {
        Rc::clone(&self.printer)
    }

    /// Get streaming quality metrics from the current session.
    ///
    /// This provides insight into the deduplication and streaming quality of the
    /// parsing session. Only available with the `test-utils` feature.
    ///
    /// # Returns
    ///
    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
    #[cfg(feature = "test-utils")]
    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
        self.streaming_session
            .borrow()
            .get_streaming_quality_metrics()
    }

    /// Parse and display a single `OpenCode` JSON event
    ///
    /// From OpenCode source (`run.ts` lines 146-201), the NDJSON format uses events with:
    /// - `step_start`: Step initialization with snapshot info
    /// - `step_finish`: Step completion with reason, cost, tokens
    /// - `tool_use`: Tool invocation with tool name, callID, and state (status, input, output)
    /// - `text`: Streaming text content
    /// - `error`: Session/API error events
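    ///
    /// Returns the formatted output, or `None` when the event produces no
    /// user-facing text. A behavior sketch (constructor arguments are placeholders):
    ///
    /// ```ignore
    /// let parser = OpenCodeParser::new(colors, verbosity);
    /// // Plain non-JSON lines pass through with a trailing newline.
    /// assert_eq!(parser.parse_event("plain text"), Some("plain text\n".to_string()));
    /// // Malformed JSON is dropped rather than echoed.
    /// assert_eq!(parser.parse_event("{not json"), None);
    /// ```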
    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
        let event: OpenCodeEvent = if let Ok(e) = serde_json::from_str(line) {
            e
        } else {
            let trimmed = line.trim();
            if !trimmed.is_empty() && !trimmed.starts_with('{') {
                return Some(format!("{trimmed}\n"));
            }
            return None;
        };
        let c = &self.colors;
        let prefix = &self.display_name;

        let output = match event.event_type.as_str() {
            "step_start" => self.format_step_start_event(&event),
            "step_finish" => self.format_step_finish_event(&event),
            "tool_use" => self.format_tool_use_event(&event),
            "text" => self.format_text_event(&event),
            "error" => self.format_error_event(&event, line),
            _ => {
                // Unknown event type - use the generic formatter in verbose mode
                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
            }
        };

        if output.is_empty() {
            None
        } else {
            Some(output)
        }
    }

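    /// Build a synthetic step ID for events that lack a stable identifier.
    ///
    /// Combines the session ID, the event timestamp when present, and a
    /// monotonically increasing per-parser counter.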
    fn next_fallback_step_id(&self, session: &str, timestamp: Option<u64>) -> String {
        let counter = self.fallback_step_counter.get().saturating_add(1);
        self.fallback_step_counter.set(counter);
        match timestamp {
            Some(ts) => format!("{session}:{ts}:{counter}"),
            None => format!("{session}:fallback:{counter}"),
        }
    }

    /// Check if an `OpenCode` event is a control event (state management with no user output)
    ///
    /// Control events are valid JSON that represent state transitions rather than
    /// user-facing content. They should be tracked separately from "ignored" events
    /// to avoid false health warnings.
    fn is_control_event(event: &OpenCodeEvent) -> bool {
        // Step lifecycle events are control events
        matches!(event.event_type.as_str(), "step_start" | "step_finish")
    }

    /// Check if an `OpenCode` event is a partial/delta event (streaming content displayed incrementally)
    ///
    /// Partial events represent streaming text deltas that are shown to the user
    /// in real-time. These should be tracked separately to avoid inflating "ignored" percentages.
    fn is_partial_event(event: &OpenCodeEvent) -> bool {
        // Text events produce streaming content
        matches!(event.event_type.as_str(), "text")
    }

    /// Parse a stream of `OpenCode` NDJSON events
    pub(crate) fn parse_stream<R: BufRead>(
        &self,
        mut reader: R,
        workspace: &dyn crate::workspace::Workspace,
    ) -> io::Result<()> {
        use super::incremental_parser::IncrementalNdjsonParser;

        let c = &self.colors;
        let monitor = HealthMonitor::new("OpenCode");
        // Accumulate log content in memory, write to workspace at the end
        let logging_enabled = self.log_path.is_some();
        let mut log_buffer: Vec<u8> = Vec::new();

        // Use incremental parser for true real-time streaming
        // This processes JSON as soon as it's complete, not waiting for newlines
        let mut incremental_parser = IncrementalNdjsonParser::new();
        let mut byte_buffer = Vec::new();

        loop {
            // Read available bytes
            byte_buffer.clear();
            let chunk = reader.fill_buf()?;
            if chunk.is_empty() {
                break;
            }

            // Process all bytes immediately
            byte_buffer.extend_from_slice(chunk);
            let consumed = chunk.len();
            reader.consume(consumed);

            // Feed bytes to incremental parser
            let json_events = incremental_parser.feed(&byte_buffer);

            // Process each complete JSON event immediately
            for line in json_events {
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }

                if self.verbosity.is_debug() {
                    let mut printer = self.printer.borrow_mut();
                    writeln!(
                        printer,
                        "{}[DEBUG]{} {}{}{}",
                        c.dim(),
                        c.reset(),
                        c.dim(),
                        &line,
                        c.reset()
                    )?;
                    printer.flush()?;
                }

                // Format the event once; parse_event returns None for malformed JSON
                // and for events with no user-facing output.
                match self.parse_event(&line) {
                    Some(output) => {
                        // Check if this is a partial/delta event (streaming content)
                        if trimmed.starts_with('{') {
                            if let Ok(event) = serde_json::from_str::<OpenCodeEvent>(&line) {
                                if Self::is_partial_event(&event) {
                                    monitor.record_partial_event();
                                } else {
                                    monitor.record_parsed();
                                }
                            } else {
                                monitor.record_parsed();
                            }
                        } else {
                            monitor.record_parsed();
                        }
                        // Write output to printer
                        let mut printer = self.printer.borrow_mut();
                        write!(printer, "{output}")?;
                        printer.flush()?;
                    }
                    None => {
                        // Check if this was a control event (state management with no user output)
                        if trimmed.starts_with('{') {
                            if let Ok(event) = serde_json::from_str::<OpenCodeEvent>(&line) {
                                if Self::is_control_event(&event) {
                                    monitor.record_control_event();
                                } else {
                                    // Valid JSON but not a control event - track as unknown
                                    monitor.record_unknown_event();
                                }
                            } else {
                                // Failed to deserialize - track as parse error
                                monitor.record_parse_error();
                            }
                        } else {
                            monitor.record_ignored();
                        }
                    }
                }

                if logging_enabled {
                    writeln!(log_buffer, "{line}")?;
                }
            }
        }

        // Handle any remaining buffered data when the stream ends.
        // Only process if it's valid JSON - incomplete buffered data should be skipped.
        if let Some(remaining) = incremental_parser.finish() {
            let trimmed = remaining.trim();
            if !trimmed.is_empty()
                && trimmed.starts_with('{')
                && serde_json::from_str::<OpenCodeEvent>(&remaining).is_ok()
            {
                // Process the remaining event
                if let Some(output) = self.parse_event(&remaining) {
                    monitor.record_parsed();
                    let mut printer = self.printer.borrow_mut();
                    write!(printer, "{output}")?;
                    printer.flush()?;
                }
                // Write to log buffer
                if logging_enabled {
                    writeln!(log_buffer, "{remaining}")?;
                }
            }
        }

        // Write accumulated log content to workspace
        if let Some(log_path) = &self.log_path {
            workspace.append_bytes(log_path, &log_buffer)?;
        }

        // OpenCode models may emit XML directly in text output (without using tools to write
        // `.agent/tmp/*.xml`). Capture known XML artifacts from the accumulated text stream and
        // write them to standard artifact paths so phase extractors can validate them via
        // file-based extraction.
        //
        // SECURITY: Bound the amount of accumulated text we scan and the size of the extracted
        // XML we write. This prevents pathological model output from causing unbounded memory/IO.
        const MAX_XML_SEARCH_BYTES: usize = 512 * 1024;
        const MAX_XML_BYTES: usize = 128 * 1024;
        if let Some(accumulated) = self
            .streaming_session
            .borrow()
            .get_accumulated(ContentType::Text, "main")
        {
            let accumulated_tail = if accumulated.len() > MAX_XML_SEARCH_BYTES {
                let mut start = accumulated.len() - MAX_XML_SEARCH_BYTES;
                // Advance to the next char boundary so the slice below does not
                // split a multi-byte UTF-8 sequence (which would panic).
                while start < accumulated.len() && !accumulated.is_char_boundary(start) {
                    start += 1;
                }
                &accumulated[start..]
            } else {
                accumulated
            };

            if let Some(xml) = crate::files::llm_output_extraction::xml_extraction::extract_xml_commit(
                accumulated_tail,
            ) {
                if xml.len() <= MAX_XML_BYTES {
                    workspace.create_dir_all(Path::new(".agent/tmp"))?;
                    workspace.write(
                        Path::new(crate::files::llm_output_extraction::file_based_extraction::paths::COMMIT_MESSAGE_XML),
                        &xml,
                    )?;
                }
            }

            if let Some(xml) = crate::files::llm_output_extraction::extract_issues_xml(
                accumulated_tail,
            ) {
                if xml.len() <= MAX_XML_BYTES {
                    workspace.create_dir_all(Path::new(".agent/tmp"))?;
                    workspace.write(
                        Path::new(crate::files::llm_output_extraction::file_based_extraction::paths::ISSUES_XML),
                        &xml,
                    )?;
                }
            }
        }
        if let Some(warning) = monitor.check_and_warn(*c) {
            let mut printer = self.printer.borrow_mut();
            writeln!(printer, "{warning}")?;
        }
        Ok(())
    }
}