ralph-workflow 0.3.6

PROMPT-driven multi-agent orchestrator for git repos
# JSON Streaming Support Implementation Plan

## Summary

This plan addresses the goal of improving JSON streaming support in Ralph (wt-streaming) to handle delta/partial content from AI coding agents more effectively. The current implementation ignores many delta events being streamed, resulting in health warnings like "Claude parser ignored 97.5% of events (2049 of 2102)." This plan aims to make the JSON parsers fault-tolerant, properly handle delta streaming for all agent types, and provide user-friendly display of partial vs. complete content.

## Current State Analysis

### Existing Delta Handling

**Claude Parser** (`src/json_parser/claude.rs`):
- ✅ Has delta accumulation via `DeltaAccumulator`
- ✅ Handles `StreamEvent` with nested `StreamInnerEvent`
- ✅ Supports `TextDelta`, `ThinkingDelta`, and `ToolUseDelta`
- ⚠️ Only shows deltas in verbose mode (`is_verbose()`)
- ❌ Health warnings indicate many events are still being ignored

**Codex Parser** (`src/json_parser/codex.rs`):
- ✅ Has delta accumulator but limited usage
- ✅ Handles `agent_message` and `reasoning` item types with deltas
- ❌ Incomplete delta handling for many event types
- ❌ No systematic approach to delta vs. complete content differentiation

**OpenCode Parser** (`src/json_parser/opencode.rs`):
- ✅ Has delta accumulator
- ✅ Handles `text` events with delta accumulation
- ❌ Missing delta handling for tool input streaming and other event types

**Health Monitoring** (`src/json_parser/health.rs`):
- ✅ Good distinction between "unknown events" (valid JSON but unhandled) and "parse errors" (malformed JSON)
- ✅ Only parse errors trigger health warnings (not unknown events)
- ⚠️ The 97.5% ignored events warning suggests many valid events aren't being handled

### Key Issues Identified

1. **Inconsistent Delta Handling**: Each parser handles deltas differently, with some parsers missing key delta event types
2. **Verbosity-Dependent Display**: Deltas are only shown in verbose mode, making partial content invisible to normal users
3. **No Clear Differentiation**: No visual or categorical distinction between partial (delta) and complete content
4. **Missing Delta Types**: Tool input streaming, partial JSON, and other delta types are not consistently handled
5. **Parser Brittleness**: Parsers may fail on unexpected JSON formats instead of gracefully degrading

## Implementation Steps

### Step 1: Enhance Delta Accumulation System

**File**: `src/json_parser/types.rs`

**Changes**:
1. Add `is_delta` flag to `ContentType` to track whether content is partial or complete
2. Add metadata tracking to `DeltaAccumulator` to track:
   - Whether content is being accumulated (partial) or complete
   - Event source (delta event vs. complete event)
   - Timestamp of last update for real-time display
3. Add methods to differentiate partial vs. complete content:
   ```rust
   impl DeltaAccumulator {
       pub fn mark_as_delta(&mut self, content_type: ContentType, key: &str);
       pub fn mark_as_complete(&mut self, content_type: ContentType, key: &str);
       pub fn is_delta(&self, content_type: ContentType, key: &str) -> bool;
       pub fn get_display_content(&self, content_type: ContentType, key: &str) -> Option<&str>;
   }
   ```

### Step 2: Create Unified Delta Display System

**File**: `src/json_parser/delta_display.rs` (new file)

**Purpose**: Centralized logic for displaying partial vs. complete content consistently across all parsers

**Components**:
1. `DeltaDisplayFormatter` - Formats delta content for user display
2. `PartialContentRenderer` - Renders partial content with visual indicators
3. `ContentStateTracker` - Tracks whether to show delta or complete content

**Features**:
- Visual distinction between partial and complete content (e.g., dimmed text, special prefix)
- Real-time streaming display of deltas
- Automatic transition from delta to complete content when available
- Configurable display modes (minimal, normal, verbose)

### Step 3: Enhance Claude Parser Delta Handling

**File**: `src/json_parser/claude.rs`

**Changes**:
1. Process ALL `ContentBlockDelta` types (currently only `TextDelta` and `ThinkingDelta`)
   - Add `ToolUseDelta` handling for partial tool input streaming
   - Add `InputJsonDelta` handling for partial JSON streaming
2. Show deltas in normal mode (not just verbose)
   - Use partial content renderer for deltas
   - Transition to complete content when `content_block_stop` received
3. Track delta state per content block index
4. Handle unknown stream events gracefully (log in debug, don't fail)

### Step 4: Enhance Codex Parser Delta Handling

**File**: `src/json_parser/codex.rs`

**Changes**:
1. Enable delta handling for ALL item types that support it:
   - `agent_message` - text deltas
   - `reasoning` - thinking deltas
   - `command_execution` - partial command display
   - `mcp_tool_call` - partial tool input streaming
   - `plan_update` - incremental plan updates
2. Add completion tracking for delta-based content
3. Differentiate between `item.started` (delta phase) and `item.completed` (complete phase)
4. Use unified delta display system

### Step 5: Enhance OpenCode Parser Delta Handling

**File**: `src/json_parser/opencode.rs`

**Changes**:
1. Process delta events for all streaming content types:
   - `text` events - already handled, enhance with unified display
   - `tool_use` events - add partial input streaming
   - Add handling for partial JSON in tool input
2. Track step lifecycle (`step_start` → deltas → `step_finish`)
3. Show partial tool input in real-time during execution

### Step 6: Implement Algorithmic Delta Detection

**File**: `src/json_parser/stream_classifier.rs` (enhance existing file)

**Purpose**: Automatically detect delta vs. complete content without prior format knowledge

**Algorithm**:
1. **Event Pattern Analysis**:
   - Detect event naming patterns: `delta`, `partial`, `progress`, `chunk`
   - Detect event lifecycle patterns: `start``update`/`delta``stop`/`done`/`complete`
2. **Content Analysis**:
   - Check for incremental content (small chunks being appended)
   - Detect partial JSON (incomplete structures, missing closing braces)
   - Track content accumulation over time
3. **Statistical Detection**:
   - Monitor event frequency (deltas come in rapid succession)
   - Track content growth patterns (deltas gradually build up content)

**Implementation**:
```rust
pub enum StreamEventType {
    Complete,
    PartialDelta,
    Unknown,
}

pub struct StreamEventClassifier {
    event_history: VecDeque<(String, StreamEventType)>,
    content_buffers: HashMap<String, String>,
}

impl StreamEventClassifier {
    pub fn classify_event(&mut self, event_type: &str, content: Option<&str>) -> StreamEventType;
    pub fn is_accumulating_content(&self, key: &str) -> bool;
}
```

### Step 7: Add Fault-Tolerant Fallback Parsing

**File**: `src/json_parser/fallback.rs` (new file)

**Purpose**: Gracefully handle unexpected JSON formats without losing information

**Strategy**:
1. **Generic JSON Parser**:
   - Parse any valid JSON as a generic `serde_json::Value`
   - Extract common fields (`type`, `event`, `delta`, `content`, `text`)
   - Display in structured format in verbose/debug modes
2. **Unknown Event Handler**:
   - Never ignore unknown events in verbose mode
   - Log event structure for debugging
   - Track unknown event patterns for future parser improvements
3. **Error Recovery**:
   - Continue parsing after malformed JSON (log error, skip line)
   - Use accumulated state to recover partial content
   - Provide fallback display for corrupted streams

### Step 8: Update Health Monitoring

**File**: `src/json_parser/health.rs`

**Changes**:
1. Separate delta events from ignored events in statistics
   - `delta_events` counter
   - `complete_events` counter
   - `unknown_events` counter (already exists)
2. Update warning logic to only warn on parse errors, not unknown/delta events
3. Add delta-specific statistics to warnings:
   ```
   "Parser processed 1500 delta events (streaming content) and 53 complete events.
    Encountered 50 unknown event types (valid JSON but unhandled format)."
   ```

### Step 9: Add Configuration Options

**File**: `src/config/types.rs` and `src/agents/config.rs`

**New Configuration**:
```toml
[json_streaming]
# How to display partial/delta content
delta_display = "minimal"  # options: "minimal", "normal", "verbose"

# Show partial content in real-time (vs. waiting for complete content)
show_delta_in_realtime = true

# Distinguish partial content visually (e.g., dimmed text, special prefix)
mark_partial_content = true

# In verbose mode, show all unknown events
show_unknown_events = true
```

### Step 10: Comprehensive Testing

**Files**: `src/json_parser/tests.rs` (update existing)

**Test Coverage**:
1. **Delta Accumulation Tests**:
   - Text delta accumulation and display
   - Thinking delta accumulation
   - Tool input delta streaming
   - Multiple simultaneous content blocks
2. **Fault Tolerance Tests**:
   - Malformed JSON handling
   - Unknown event type handling
   - Mixed delta and complete content
3. **Algorithmic Detection Tests**:
   - Pattern-based delta detection
   - Lifecycle-based detection
   - Statistical pattern detection
4. **Integration Tests**:
   - Full Claude streaming scenarios
   - Full Codex streaming scenarios
   - Full OpenCode streaming scenarios
   - Mixed agent scenarios
5. **Health Monitoring Tests**:
   - Verify no false warnings on high delta counts
   - Verify parse errors still trigger warnings
   - Verify unknown events don't trigger warnings

## Critical Files

### Files to Modify:
1. **`src/json_parser/types.rs`** - Enhance `DeltaAccumulator` and `ContentType`
2. **`src/json_parser/claude.rs`** - Improve delta handling for all event types
3. **`src/json_parser/codex.rs`** - Add comprehensive delta support
4. **`src/json_parser/opencode.rs`** - Enhance delta handling
5. **`src/json_parser/health.rs`** - Update health monitoring logic
6. **`src/json_parser/stream_classifier.rs`** - Implement algorithmic detection
7. **`src/config/types.rs`** - Add streaming configuration options
8. **`src/agents/config.rs`** - Add streaming config to agent config

### Files to Create:
1. **`src/json_parser/delta_display.rs`** - Unified delta display system
2. **`src/json_parser/fallback.rs`** - Fault-tolerant fallback parser

### Files to Update Tests:
1. **`src/json_parser/tests.rs`** - Add comprehensive test coverage

## Risks & Mitigations

### Risk 1: Performance Impact from Enhanced Delta Handling
**Mitigation**:
- Use efficient string accumulation (avoid excessive cloning)
- Implement display caching for repeated content
- Profile before/after to measure impact

### Risk 2: Breaking Changes to Existing Agent Behavior
**Mitigation**:
- Make new behavior opt-in via configuration
- Maintain backward compatibility with existing display modes
- Add feature flags for gradual rollout

### Risk 3: Increased Complexity Leading to Bugs
**Mitigation**:
- Comprehensive test coverage (unit + integration)
- Incremental implementation with testing at each step
- Code reviews focusing on error handling paths

### Risk 4: Algorithmic Detection False Positives
**Mitigation**:
- Use multiple detection methods in combination
- Allow manual override via configuration
- Log detection decisions for debugging
- Start conservative, expand based on real-world data

### Risk 5: Memory Usage from Content Accumulation
**Mitigation**:
- Implement buffer size limits
- Clear accumulators when content is complete
- Use bounded queues for event history
- Monitor memory usage in testing

## Verification Strategy

### Acceptance Criteria Validation:

1. **"All expected and unexpected json should be handled"**:
   - ✅ Test with valid but unknown event types - should not fail
   - ✅ Test with malformed JSON - should log and continue
   - ✅ Test with mixed delta and complete events - should handle both
   - ✅ Verify no "Parser Health Warning" for high delta percentages

2. **"Differentiate between normal streaming data and delta partial data"**:
   - ✅ Visual distinction in output (different formatting/prefixes)
   - ✅ Separate tracking in statistics
   - ✅ Algorithmic detection works without prior format knowledge
   - ✅ Real-time streaming display of partial content

3. **"No parser health warnings for high delta percentages"**:
   - ✅ Run with Claude agent - verify no warnings about 97% ignored events
   - ✅ Run with Codex agent - verify delta events are tracked separately
   - ✅ Run with OpenCode agent - verify all delta types handled

### Testing Checklist:

- [ ] Unit tests for `DeltaAccumulator` enhancements
- [ ] Unit tests for `DeltaDisplayFormatter`
- [ ] Unit tests for `StreamEventClassifier` detection
- [ ] Integration tests for Claude parser with all delta types
- [ ] Integration tests for Codex parser with all delta types
- [ ] Integration tests for OpenCode parser with all delta types
- [ ] Integration tests for fault tolerance (malformed JSON, unknown events)
- [ ] Performance benchmarks (memory, speed)
- [ ] Real-world testing with actual agent outputs
- [ ] Regression tests to ensure existing functionality works

### Manual Testing Scenarios:

1. **Claude Agent**:
   - Run with text generation only
   - Run with tool use (verify partial tool input displayed)
   - Run with extended thinking (verify thinking deltas displayed)
   - Run with web search tool (verify search streaming displayed)

2. **Codex Agent**:
   - Run with agent message streaming
   - Run with reasoning/thinking content
   - Run with tool execution (command, MCP tools)
   - Run with file operations

3. **OpenCode Agent**:
   - Run with text streaming
   - Run with tool use streaming
   - Run with step-by-step execution

4. **Edge Cases**:
   - Mixed delta and complete events
   - Malformed JSON in middle of stream
   - Unknown event types in verbose mode
   - Very long delta sequences (1000+ events)
   - Simultaneous multiple content blocks

## Sources

- [Claude Streaming Messages Documentation]https://platform.claude.com/docs/en/build-with-claude/streaming
- [OpenAI Responses API Streaming Events Guide]https://community.openai.com/t/responses-api-streaming-the-simple-guide-to-events/1363122
- [Claude Code Best Practices]https://www.anthropic.com/engineering/claude-code-best-practices
- [OpenAI Codex CLI Issues]https://github.com/openai/codex/issues/2556
- [Response Streaming Architecture]https://zread.ai/openai/codex/25-response-streaming-architecture

## Notes

- Implementation should follow the code quality specifications from PROMPT.md
- Use type system to make invalid states unrepresentable
- Early returns, minimal nesting, explicit error handling
- Single responsibility per function/class
- Document non-obvious design decisions
- Consider API ergonomics for future extensibility