# Output & Error Stream Storage - Detailed Explanation
This document explains exactly where and how stdout/stderr are stored during command execution in the Execution Engine.
---
## Quick Answer
**Output streams are stored in TWO places simultaneously:**
1. **In-Memory** → `ExecutionState.stdout` and `ExecutionState.stderr` (String fields)
2. **Event Stream** → Emitted line-by-line to `EventHandler` implementations
---
## Detailed Flow
### 1. Data Structure: ExecutionState
**Location**: [src/types.rs:277-287](src/types.rs#L277-L287)
```rust
pub struct ExecutionState {
    pub id: Uuid,
    pub request: ExecutionRequest,
    pub status: ExecutionStatus,
    pub started_at: DateTime<Utc>,
    pub completed_at: Option<DateTime<Utc>>,
    // OUTPUT STORAGE - Two String buffers
    pub stdout: String, // ← Accumulates all stdout lines
    pub stderr: String, // ← Accumulates all stderr lines
    pub exit_code: Option<i32>,
    pub error: Option<String>,
}
```
**Initialization**: [src/types.rs:298-299](src/types.rs#L298-L299)
```rust
stdout: String::new(), // Starts empty
stderr: String::new(), // Starts empty
```
**Storage Location in Engine**: [src/engine.rs:26](src/engine.rs#L26)
```rust
pub struct ExecutionEngine {
    // ...
    // Map from execution ID to that execution's own lock-protected state
    executions: Arc<RwLock<HashMap<Uuid, Arc<RwLock<ExecutionState>>>>>,
}
```
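The nested `Arc<RwLock<...>>` layout means a lookup only needs the map-wide lock long enough to clone one execution's `Arc`; output appends then lock just that execution. A minimal std-only sketch of the pattern, assuming `u128` in place of `Uuid` and `std::sync::RwLock` in place of the async lock the engine uses. `register` and `lookup` are hypothetical helpers, not the engine's API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins: u128 instead of Uuid, a trimmed-down state,
// and std::sync::RwLock instead of the engine's async RwLock.
type ExecId = u128;

#[derive(Default)]
struct ExecutionState {
    stdout: String,
    stderr: String,
}

type Executions = Arc<RwLock<HashMap<ExecId, Arc<RwLock<ExecutionState>>>>>;

/// Register a new execution with empty output buffers.
fn register(executions: &Executions, id: ExecId) -> Arc<RwLock<ExecutionState>> {
    let state = Arc::new(RwLock::new(ExecutionState::default()));
    executions.write().unwrap().insert(id, Arc::clone(&state));
    state
}

/// The outer map lock is held only long enough to clone the inner Arc,
/// so appending output never holds the map-wide lock.
fn lookup(executions: &Executions, id: ExecId) -> Option<Arc<RwLock<ExecutionState>>> {
    executions.read().unwrap().get(&id).cloned()
}

fn main() {
    let executions: Executions = Arc::new(RwLock::new(HashMap::new()));
    let state = register(&executions, 1);
    state.write().unwrap().stdout.push_str("hello");

    let found = lookup(&executions, 1).expect("execution 1 exists");
    println!("{}", found.read().unwrap().stdout); // prints "hello"
}
```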
---
### 2. Capture Process: Spawning the Command
**Location**: [src/executor.rs:66-79](src/executor.rs#L66-L79)
```rust
// Build and spawn command
let mut cmd = build_command(&request)?;
let mut child = cmd.spawn()?;
// Capture stdout and stderr handles from spawned process
let stdout = child.stdout.take() // ← Take ownership of stdout pipe
    .ok_or_else(|| ExecutionError::Internal("Failed to capture stdout"))?;
let stderr = child.stderr.take() // ← Take ownership of stderr pipe
    .ok_or_else(|| ExecutionError::Internal("Failed to capture stderr"))?;
```
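**Key Point**: These are `tokio::process::ChildStdout` and `ChildStderr`, async readers (they implement `AsyncRead`) over the child's pipes. `take()` moves each handle out of `child` and leaves `None` behind; the handles are only `Some` if the command was built with `Stdio::piped()`, which is why the capture is fallible.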
---
### 3. Parallel Streaming: Two Background Tasks
**Location**: [src/executor.rs:81-92](src/executor.rs#L81-L92)
```rust
// Spawn STDOUT streaming task
let state_clone = Arc::clone(&state);
let event_handler = self.event_handler.clone();
let stdout_handle = tokio::spawn(async move {
Self::stream_stdout_static(execution_id, stdout, state_clone, event_handler).await
});
// Spawn STDERR streaming task (in parallel)
let state_clone = Arc::clone(&state);
let event_handler = self.event_handler.clone();
let stderr_handle = tokio::spawn(async move {
Self::stream_stderr_static(execution_id, stderr, state_clone, event_handler).await
});
```
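**Important**: The two streams are drained **concurrently** by separate tasks, not one after the other. Reading them sequentially could deadlock: if the parent waits for EOF on stdout while the child is blocked writing to a full stderr pipe buffer, neither side makes progress.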
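A synchronous sketch of the concurrent draining pattern, using std threads as a stand-in for the two tokio tasks and in-memory `Cursor`s as a stand-in for the child's pipes (so it shows the structure, not the deadlock itself). `Output`, `drain`, and `capture` are illustrative names, not the engine's:

```rust
use std::io::{BufRead, BufReader, Cursor, Read};
use std::sync::{Arc, Mutex};
use std::thread;

// Simplified stand-ins: std threads instead of tokio tasks, a Mutex instead
// of the async RwLock, and in-memory Cursors instead of the child's pipes.
#[derive(Default)]
struct Output {
    stdout: String,
    stderr: String,
}

fn stdout_of(o: &mut Output) -> &mut String {
    &mut o.stdout
}

fn stderr_of(o: &mut Output) -> &mut String {
    &mut o.stderr
}

/// Drain `reader` line by line into the buffer selected by `pick`.
fn drain<R: Read>(
    reader: R,
    state: Arc<Mutex<Output>>,
    pick: fn(&mut Output) -> &mut String,
) -> std::io::Result<()> {
    for line in BufReader::new(reader).lines() {
        let line = line?;
        let mut guard = state.lock().unwrap();
        let buf = pick(&mut *guard);
        if !buf.is_empty() {
            buf.push('\n');
        }
        buf.push_str(&line);
    }
    Ok(())
}

/// Drain both "pipes" concurrently, then read the accumulated output.
fn capture(out: &'static [u8], err: &'static [u8]) -> (String, String) {
    let state = Arc::new(Mutex::new(Output::default()));
    let s1 = Arc::clone(&state);
    let stdout_task = thread::spawn(move || drain(Cursor::new(out), s1, stdout_of));
    let s2 = Arc::clone(&state);
    let stderr_task = thread::spawn(move || drain(Cursor::new(err), s2, stderr_of));
    // Wait for both readers before reading the shared state.
    stdout_task.join().unwrap().unwrap();
    stderr_task.join().unwrap().unwrap();
    let guard = state.lock().unwrap();
    (guard.stdout.clone(), guard.stderr.clone())
}

fn main() {
    let (out, err) = capture(b"line 1\nline 2\n", b"warning\n");
    println!("stdout = {out:?}, stderr = {err:?}");
}
```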
---
### 4. Line-by-Line Accumulation: STDOUT
**Location**: [src/executor.rs:203-232](src/executor.rs#L203-L232)
```rust
async fn stream_stdout_static(
    execution_id: uuid::Uuid,
    stdout: ChildStdout,
    state: Arc<RwLock<ExecutionState>>,
    event_handler: Option<Arc<dyn EventHandler>>,
) -> Result<()> {
    // Create async buffered reader
    let reader = BufReader::new(stdout);
    let mut lines = reader.lines();
    // Read line by line
    while let Some(line) = lines.next_line().await? {
        // 1. EMIT EVENT (if handler exists)
        if let Some(handler) = &event_handler {
            handler.handle_event(ExecutionEvent::Stdout {
                execution_id,
                line: line.clone(),
                timestamp: Utc::now(),
            }).await;
        }
        // 2. APPEND TO IN-MEMORY STATE
        let mut state_lock = state.write().await; // Acquire write lock
        // Add a separator unless the buffer is still empty
        if !state_lock.stdout.is_empty() {
            state_lock.stdout.push('\n');
        }
        // Append the line
        state_lock.stdout.push_str(&line);
        // Lock released automatically when state_lock drops (end of iteration)
    }
    Ok(())
}
```
**Flow**:
1. Read one line from stdout pipe
2. **Emit event** → Sent to EventHandler (e.g., Tauri frontend)
3. **Append to state** → Added to `ExecutionState.stdout` String
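4. Repeat until the pipe reaches EOF (the child exits or closes its stdout). Because of the `?`, a read error ends the task early; tokio's `next_line()` also returns an error for a line that is not valid UTF-8.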
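The accumulation rule can be checked in isolation. This sketch uses std's `BufRead::lines` as a synchronous stand-in for tokio's `Lines::next_line`; both strip a trailing `\n` or `\r\n` and reject invalid UTF-8. `accumulate` is a hypothetical helper that applies the same `is_empty()` separator rule:

```rust
use std::io::{BufRead, Cursor};

/// Mirrors the accumulation loop: strip each line's terminator, then join
/// lines with '\n', skipping the separator while the buffer is empty.
fn accumulate(raw: &[u8]) -> std::io::Result<String> {
    let mut buf = String::new();
    for line in Cursor::new(raw).lines() {
        let line = line?; // invalid UTF-8 ends the loop with an error
        if !buf.is_empty() {
            buf.push('\n');
        }
        buf.push_str(&line);
    }
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    // Trailing newline is not preserved.
    println!("{:?}", accumulate(b"a\nb\n")?);
    // CRLF endings come out as '\n' separators.
    println!("{:?}", accumulate(b"a\r\nb\r\n")?);
    // A leading blank line is lost.
    println!("{:?}", accumulate(b"\nb\n")?);
    Ok(())
}
```

Two consequences follow from the `is_empty()` rule: trailing newlines are not preserved, and a blank first line disappears, because the buffer is still empty when the next line arrives and no separator is pushed.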
---
### 5. Line-by-Line Accumulation: STDERR
**Location**: [src/executor.rs:234-263](src/executor.rs#L234-L263)
```rust
async fn stream_stderr_static(
    execution_id: uuid::Uuid,
    stderr: ChildStderr,
    state: Arc<RwLock<ExecutionState>>,
    event_handler: Option<Arc<dyn EventHandler>>,
) -> Result<()> {
    let reader = BufReader::new(stderr);
    let mut lines = reader.lines();
    while let Some(line) = lines.next_line().await? {
        // 1. EMIT EVENT
        if let Some(handler) = &event_handler {
            handler.handle_event(ExecutionEvent::Stderr {
                execution_id,
                line: line.clone(),
                timestamp: Utc::now(),
            }).await;
        }
        // 2. APPEND TO IN-MEMORY STATE
        let mut state_lock = state.write().await;
        if !state_lock.stderr.is_empty() {
            state_lock.stderr.push('\n');
        }
        state_lock.stderr.push_str(&line);
    }
    Ok(())
}
```
**Identical logic to stdout**: only the event variant (`ExecutionEvent::Stderr`) and the target field (`stderr` instead of `stdout`) differ.
---
### 6. Waiting for Completion
**Location**: [src/executor.rs:94-104](src/executor.rs#L94-L104)
```rust
// Wait for process with timeout
let wait_result = self.wait_with_timeout_and_cancel(
    child,
    timeout_ms,
    cancel_token,
).await;
// Wait for output streaming to complete
let _ = stdout_handle.await; // ← Wait for stdout task to finish
let _ = stderr_handle.await; // ← Wait for stderr task to finish
```
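**Critical**: The process can exit while unread output is still buffered in the pipes, so the executor explicitly awaits both streaming tasks before reading the accumulated output. Note that `let _ =` discards both a `JoinError` (task panic or cancellation) and the task's own `Result`, so a read error mid-stream goes unreported.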
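A sketch of surfacing both failure modes instead of discarding them, using std threads in place of tokio tasks (`JoinHandle::join` plays the role of awaiting a tokio `JoinHandle`). `stream` and `join_stream` are hypothetical helpers; the error strings are illustrative:

```rust
use std::io::{self, BufRead, Cursor};
use std::thread;

/// Stand-in streaming task: joins lines, failing on invalid UTF-8 the way
/// `next_line().await?` would.
fn stream(raw: &'static [u8]) -> io::Result<String> {
    let mut out = String::new();
    for line in Cursor::new(raw).lines() {
        let line = line?;
        if !out.is_empty() {
            out.push('\n');
        }
        out.push_str(&line);
    }
    Ok(out)
}

/// Join a task and report both a panic and the task's own error.
fn join_stream(handle: thread::JoinHandle<io::Result<String>>) -> Result<String, String> {
    match handle.join() {
        Ok(Ok(text)) => Ok(text),
        Ok(Err(e)) => Err(format!("read error: {e}")),
        Err(_) => Err("streaming task panicked".to_string()),
    }
}

fn main() {
    let good = thread::spawn(|| stream(b"done\n"));
    let bad = thread::spawn(|| stream(b"ok\n\xff\n"));
    println!("{:?}", join_stream(good));
    println!("{:?}", join_stream(bad));
}
```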
---
### 7. Final Result Assembly
**Location**: [src/executor.rs:107-132](src/executor.rs#L107-L132)
```rust
let result = match wait_result {
    Ok(exit_status) => {
        let exit_code = exit_status.code().unwrap_or(-1);
        let state_lock = state.read().await; // Read lock
        let final_status = if exit_code == 0 {
            ExecutionStatus::Completed
        } else {
            ExecutionStatus::Failed
        };
        // Create result - COPIES from state
        let result = ExecutionResult {
            id: execution_id,
            status: final_status,
            success: exit_code == 0,
            exit_code,
            stdout: state_lock.stdout.clone(), // ← Copy accumulated stdout
            stderr: state_lock.stderr.clone(), // ← Copy accumulated stderr
            duration: (Utc::now() - state_lock.started_at)
                .to_std()
                .unwrap_or(Duration::from_secs(0)),
            started_at: state_lock.started_at,
            completed_at: Some(Utc::now()),
            error: None,
        };
        Ok(result)
    }
    // ... handle errors
};
```
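**Key**: Output is **copied** (`clone()`) from `ExecutionState` into `ExecutionResult`. While the caller holds the result and the state is still retained, each execution's output exists twice in memory.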
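The exit-code mapping can be pulled out and tested on its own. A sketch taking `Option<i32>`, the value `ExitStatus::code()` returns; `classify` and this two-variant `ExecutionStatus` are simplified stand-ins for the engine's types:

```rust
#[derive(Debug, PartialEq)]
enum ExecutionStatus {
    Completed,
    Failed,
}

/// Mirrors the exit-code handling in the result assembly:
/// returns (status, success, exit_code).
fn classify(code: Option<i32>) -> (ExecutionStatus, bool, i32) {
    // On Unix, ExitStatus::code() is None when a signal terminated the child.
    let exit_code = code.unwrap_or(-1);
    let status = if exit_code == 0 {
        ExecutionStatus::Completed
    } else {
        ExecutionStatus::Failed
    };
    (status, exit_code == 0, exit_code)
}

fn main() {
    for code in [Some(0), Some(2), None] {
        println!("{:?} -> {:?}", code, classify(code));
    }
}
```

On Unix a real exit code is in `0..=255`, so the `-1` sentinel cannot collide with a genuine exit code there; a signal-terminated process is reported as `Failed` with exit code `-1`.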
---
## Storage Lifecycle
### Timeline:
```
1. Command Spawned
└─ ExecutionState created with empty stdout/stderr
2. Process Running
├─ STDOUT Task: Read line → Emit event → Append to state.stdout
└─ STDERR Task: Read line → Emit event → Append to state.stderr
(Both happen in parallel, continuously)
3. Process Completes
├─ Wait for both streaming tasks to finish
└─ Copy stdout/stderr from state into ExecutionResult
4. Result Returned
└─ ExecutionResult contains complete output
5. State Retained in Memory
└─ ExecutionState remains in engine.executions HashMap
(Until cleanup task removes it after retention period)
```
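Step 5 depends on a cleanup task whose retention policy is not shown in this document. The following is only a sketch of one plausible policy: drop entries whose `completed_at` is older than a retention window and keep running ones. `Entry`, `cleanup`, and the use of `Instant` instead of `DateTime<Utc>` are all assumptions:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical: the real state stores chrono timestamps; Instant is used here.
struct Entry {
    completed_at: Option<Instant>,
    stdout: String,
}

/// Remove executions that finished more than `retention` before `now`.
/// Running executions (completed_at == None) are always kept.
/// Returns how many entries were removed (their output buffers are freed).
fn cleanup(executions: &mut HashMap<u32, Entry>, now: Instant, retention: Duration) -> usize {
    let before = executions.len();
    executions.retain(|_, e| match e.completed_at {
        Some(done) => now.duration_since(done) <= retention,
        None => true,
    });
    before - executions.len()
}

fn main() {
    let t0 = Instant::now();
    let mut executions = HashMap::new();
    executions.insert(1, Entry { completed_at: Some(t0), stdout: "old output".into() });
    executions.insert(2, Entry { completed_at: None, stdout: "still running".into() });
    let removed = cleanup(&mut executions, t0 + Duration::from_secs(3600), Duration::from_secs(600));
    let kept: Vec<&String> = executions.values().map(|e| &e.stdout).collect();
    println!("removed {removed}, kept {kept:?}");
}
```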
---
## Memory Implications
### Per Execution Memory Usage:
```
ExecutionState memory footprint (64-bit target, approximate):
- Uuid: 16 bytes
- ExecutionRequest: ~100-500 bytes (depends on command length)
- ExecutionStatus: ~1 byte if it is a field-less enum
- Timestamps: ~24-28 bytes (DateTime<Utc> is 12 bytes; Option<DateTime<Utc>> may add a 4-byte tag depending on the chrono version)
- stdout: 24-byte String header + heap buffer (capacity, which can exceed len)
- stderr: 24-byte String header + heap buffer (capacity, which can exceed len)
- exit_code: 8 bytes (Option<i32>)
- error: 24 bytes (Option<String>) + heap buffer when Some
Total: ~150 bytes + request size + at least stdout.len() + stderr.len()
```
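The inline sizes in the list can be checked with `std::mem::size_of`. This sketch uses `[u8; 16]` as a stand-in for `Uuid` (which wraps 16 bytes) and prints a `String`'s capacity next to its length, since heap usage follows capacity rather than `len()`:

```rust
use std::mem::size_of;

/// Inline sizes of the field types; heap buffers are extra.
fn inline_sizes() -> [(&'static str, usize); 4] {
    [
        ("[u8; 16] (Uuid stand-in)", size_of::<[u8; 16]>()),
        ("String (ptr + cap + len)", size_of::<String>()),
        ("Option<String>", size_of::<Option<String>>()),
        ("Option<i32>", size_of::<Option<i32>>()),
    ]
}

fn main() {
    for (name, bytes) in inline_sizes() {
        println!("{name}: {bytes} bytes");
    }
    // Growing a String one push at a time reserves spare capacity.
    let mut s = String::new();
    for _ in 0..1000 {
        s.push('x');
    }
    println!("len = {}, capacity = {}", s.len(), s.capacity());
}
```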
### Example Calculation:
| Command | stdout | stderr | Approx. total per execution | Note |
|---|---|---|---|---|
| `echo hello` | 6 bytes | 0 bytes | ~156 bytes | Minimal |
| `aws s3 ls` | 10 KB | 0 bytes | ~10.15 KB | Typical |
| `docker build` | 500 KB | 50 KB | ~550 KB | Build logs |
| Verbose script | 5 MB | 100 KB | ~5.1 MB | Large output |
| **Runaway command** | 100 MB | 10 MB | ~110 MB | **Problem!** |
### Memory Risk:
**Current Implementation** (as of this analysis):
- ❌ **No size limit enforced** during streaming
- ❌ `max_output_size_bytes` config exists but not checked
- ⚠️ **Risk**: Commands with unbounded output can cause OOM
**Example OOM Scenario**:
```bash
# This would accumulate 1GB in memory!
```
---
## What About Log Files?
### Log Writing (Separate from Memory Storage)
**Location**: [src/executor.rs:100-103](src/executor.rs#L100-L103)
```rust
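// Write logs whenever `result` is Ok; this includes non-zero exit codes
// (status Failed). Errors from write_logs are ignored.
if let Ok(ref exec_result) = result {
    let _ = executor.write_logs(execution_id, exec_result).await;
}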
```
**Logs are written AFTER execution completes**, not during streaming.
**Log Contents**: The complete stdout+stderr from `ExecutionResult` is written to a file.
**Log Path**: Determined by `config.log_dir` or temp directory.
**Important**: Logs are **separate** from in-memory storage. They're a persistent copy.
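A minimal sketch of a post-completion log writer, assuming one file per execution under `log_dir` with a fallback to `std::env::temp_dir()`. The file name and the `=== STDOUT ===` layout are hypothetical; the engine's actual `write_logs` format is not shown here:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical log writer: one file per execution under `log_dir`,
/// falling back to the OS temp directory when no directory is configured.
fn write_log(log_dir: Option<&Path>, id: &str, stdout: &str, stderr: &str) -> io::Result<PathBuf> {
    let dir = log_dir.map(Path::to_path_buf).unwrap_or_else(std::env::temp_dir);
    fs::create_dir_all(&dir)?;
    let path = dir.join(format!("execution-{id}.log"));
    // Whole output is written at once, after completion, not while streaming.
    let body = format!("=== STDOUT ===\n{stdout}\n=== STDERR ===\n{stderr}\n");
    fs::write(&path, body)?;
    Ok(path)
}

fn main() -> io::Result<()> {
    let path = write_log(None, "demo", "hello", "")?;
    println!("wrote {}", path.display());
    fs::remove_file(path)
}
```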
---
## Retrieval: How Users Get Output
### Method 1: Get Result (After Completion)
```rust
let result = engine.get_result(execution_id).await?;
println!("Stdout: {}", result.stdout); // ← Complete output as String
println!("Stderr: {}", result.stderr); // ← Complete errors as String
```
**Source**: Copied from `ExecutionState` → `ExecutionResult`
### Method 2: Wait for Completion
```rust
let result = engine.wait_for_completion(execution_id).await?;
// Like get_result, but asynchronously waits until the execution has finished
```
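`wait_for_completion` has to suspend until the process and streaming tasks are done. The engine's mechanism is not shown; one common synchronous pattern is a `Condvar` guarding the finished result, sketched here with hypothetical `Completion`, `finish`, and `wait` names (an async engine would use an async notification primitive instead):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical completion signal, not the engine's actual mechanism.
struct Completion {
    done: Mutex<Option<String>>, // Some(stdout) once finished
    cv: Condvar,
}

impl Completion {
    fn new() -> Self {
        Completion { done: Mutex::new(None), cv: Condvar::new() }
    }

    /// Called once by the executor after output streaming has finished.
    fn finish(&self, stdout: String) {
        *self.done.lock().unwrap() = Some(stdout);
        self.cv.notify_all();
    }

    /// Block until `finish` has been called, then return a copy of the output.
    fn wait(&self) -> String {
        let guard = self.cv.wait_while(self.done.lock().unwrap(), |d| d.is_none()).unwrap();
        guard.clone().expect("wait_while guarantees Some")
    }
}

fn main() {
    let c = Arc::new(Completion::new());
    let worker = Arc::clone(&c);
    let h = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        worker.finish("hello".to_string());
    });
    println!("{}", c.wait()); // prints "hello"
    h.join().unwrap();
}
```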
### Method 3: Real-Time Events (During Execution)
```rust
// In a Tauri app or a custom EventHandler.
// Assumes the trait is declared with #[async_trait] (a common way to make an
// async method callable through `Arc<dyn EventHandler>`).
#[async_trait::async_trait]
impl EventHandler for MyHandler {
    async fn handle_event(&self, event: ExecutionEvent) {
        match event {
            ExecutionEvent::Stdout { line, .. } => {
                println!("STDOUT: {}", line); // ← Line-by-line as it happens
            }
            ExecutionEvent::Stderr { line, .. } => {
                eprintln!("STDERR: {}", line); // ← Line-by-line as it happens
            }
            _ => {}
        }
    }
}
```
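**Source**: Emitted during streaming, before the line is appended to the state. Because the streaming task awaits `handle_event` for every line, a slow handler also slows reading from the pipe.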
---
## Comparison with Design Specification
### What Design Expected (architecture.md):
- ✅ Line-by-line streaming via async IO
- ✅ Parallel stdout/stderr processing
- ✅ Event emission per line
- ✅ Accumulated in execution state
- ⚠️ **Output size limiting** (MISSING - not enforced)
### What's Implemented:
- ✅ All streaming mechanisms work correctly
- ✅ State storage pattern matches design
- ✅ Event system fully functional
- ❌ **No size checking during accumulation**
- ❌ **`OversizedOutputStrategy` not applied**
---
## Critical Issue: Unbounded Memory Growth
### Problem:
Current implementation appends **indefinitely** to `state.stdout` and `state.stderr`:
```rust
// No size check!
state_lock.stdout.push_str(&line);
```
### Solution Needed:
Add size tracking and apply `OversizedOutputStrategy`:
```rust
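// Pseudocode fix. Assumes `max_output_size` and `strategy` are passed into
// the streaming task and that `OutputSizeExceeded` is a new ExecutionError variant.
let mut truncated = false;
while let Some(line) = lines.next_line().await? {
    // Keep draining after truncation: if reading stops, the child can block
    // once the OS pipe buffer fills and hang until the timeout.
    if truncated {
        continue;
    }
    let mut state_lock = state.write().await;
    // Check size before appending (+1 for the '\n' separator)
    if state_lock.stdout.len() + line.len() + 1 > max_output_size {
        match strategy {
            OversizedOutputStrategy::TruncateWithWarning => {
                state_lock.stdout.push_str("\n[... output truncated ...]");
                truncated = true;
                continue; // Stop storing, keep reading
            }
            OversizedOutputStrategy::FailExecution => {
                // Caller should also kill the child: nothing drains the pipe now
                return Err(ExecutionError::OutputSizeExceeded);
            }
            OversizedOutputStrategy::StreamToFile => {
                // Switch to file streaming (not sketched here)
            }
        }
    }
    if !state_lock.stdout.is_empty() {
        state_lock.stdout.push('\n');
    }
    state_lock.stdout.push_str(&line);
}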
```
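A runnable, synchronous version of the bounded accumulator, with the separator accounting and drain-after-truncation behavior described above. It assumes the limit applies to one stream's buffer and covers only the two strategies that do not need a file; `accumulate_bounded` and `Strategy` are hypothetical names:

```rust
use std::io::{self, BufRead, Cursor};

/// Subset of OversizedOutputStrategy; StreamToFile is omitted.
#[derive(Clone, Copy)]
enum Strategy {
    TruncateWithWarning,
    FailExecution,
}

const MARKER: &str = "\n[... output truncated ...]";

/// Accumulate lines up to `max_bytes`, then apply `strategy`.
/// After truncation it keeps reading (draining) without storing.
fn accumulate_bounded<R: BufRead>(reader: R, max_bytes: usize, strategy: Strategy) -> io::Result<String> {
    let mut buf = String::new();
    let mut truncated = false;
    for line in reader.lines() {
        let line = line?;
        if truncated {
            continue; // drain without storing
        }
        let sep = usize::from(!buf.is_empty()); // 1 if a '\n' separator is needed
        if buf.len() + sep + line.len() > max_bytes {
            match strategy {
                Strategy::TruncateWithWarning => {
                    buf.push_str(MARKER);
                    truncated = true;
                    continue;
                }
                Strategy::FailExecution => {
                    return Err(io::Error::new(io::ErrorKind::Other, "output size limit exceeded"));
                }
            }
        }
        if sep == 1 {
            buf.push('\n');
        }
        buf.push_str(&line);
    }
    Ok(buf)
}

fn main() -> io::Result<()> {
    let input = Cursor::new("aaaa\nbbbb\ncccc\n");
    let out = accumulate_bounded(input, 9, Strategy::TruncateWithWarning)?;
    println!("{out:?}");
    Ok(())
}
```

The marker itself can push the buffer slightly past `max_bytes`; a stricter version would reserve room for it.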
---
## Summary
### Where Output is Stored:
1. **Primary Storage**: `ExecutionState.stdout` / `ExecutionState.stderr` (in-memory Strings)
2. **Secondary Storage**: Log files (written after completion)
3. **Event Stream**: Emitted line-by-line to EventHandler (not stored, just transmitted)
### How It's Stored:
- **Format**: Plain text Strings (not structured)
- **Accumulation**: Line-by-line with newline separators
- **Concurrency**: Protected by `RwLock<ExecutionState>`
- **Lifetime**: Retained until cleanup task removes execution
### Current Status:
- ✅ **Streaming works correctly**
- ✅ **Storage mechanism is sound**
- ❌ **Size limiting not enforced** (HIGH PRIORITY FIX NEEDED)
### Memory Usage Per Execution:
```
~150 bytes (metadata) + stdout.len() + stderr.len()
```
For 1000 executions with average 100 KB output each:
```
1000 * (150 bytes + 100 KB) ≈ 100 MB
```
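The same estimate worked out with binary kilobytes (1 KB = 1024 bytes) and the ~150-byte metadata figure:

```math
1000 \times (150\,\text{B} + 100 \times 1024\,\text{B}) = 1000 \times 102{,}550\,\text{B} = 102{,}550{,}000\,\text{B} \approx 97.8\,\text{MiB}
```

If callers also keep the cloned `ExecutionResult`s, the output portion is held twice.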
**Without cleanup or size limiting, memory grows unbounded!**