escher-execution-engine 0.1.2

Production-ready async execution engine for system commands
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Repository Overview

**Execution Engine** - Production-ready async execution engine for system commands, built in Rust.

This is a standalone Rust library (crate) designed to be consumed by the Escher Tauri desktop application. It provides async/non-blocking command execution with real-time streaming, concurrency control, and event-driven updates.

**Part of**: Escher V2 monorepo (`/v2/v2-execution-engine-rust`)
**Status**: Implementation complete, production-ready
**Test Coverage**: 69 tests (62 unit + 6 integration + 1 doc-test)
**Code Quality**: 0 clippy warnings, 0 duplication in critical paths

> **📦 PLAYBOOK INTEGRATION (v2.0.0)**: This execution engine executes **individual commands only** and does NOT parse or handle playbook files. Playbook orchestration (multi-step execution, parameter injection, step dependencies) is handled by the **Execution Manager** (TypeScript npm package in the client layer). As of v2.0.0, playbooks use a single-file structure (playbook.json). See [../v2-architecture-docs/working-docs/PLAYBOOK-STRUCTURE-MIGRATION-IMPACT.md](../v2-architecture-docs/working-docs/PLAYBOOK-STRUCTURE-MIGRATION-IMPACT.md) for playbook structure details.

## Common Commands

### Building and Testing

```bash
# Run all tests (unit + integration + doc-tests)
cargo test

# Run only unit tests (in src/ modules)
cargo test --lib

# Run only integration tests
cargo test --test integration_test

# Run specific test by name
cargo test test_executor_timeout

# Run tests with output visible
cargo test -- --nocapture

# Run tests with specific filter
cargo test executor --lib
```

### Code Quality

```bash
# Format code (always run before committing)
cargo fmt

# Check if code needs formatting
cargo fmt --check

# Run linter (clippy with strict warnings)
cargo clippy -- -D warnings

# Build in release mode
cargo build --release

# Check compilation without building
cargo check
```

### Development Workflow

```bash
# Watch mode - rebuild on file changes (requires cargo-watch)
cargo watch -x test

# Run with environment variables
RUST_LOG=debug cargo test -- --nocapture

# Clean build artifacts
cargo clean
```

## Architecture Overview

### Module Structure

The codebase follows a layered architecture:

```
ExecutionEngine (engine.rs)
    ↓ orchestrates
Executor (executor.rs)
    ↓ uses
Command Builder (commands.rs)
    ↓ executes via
tokio::process::Command

Supporting modules:
- types.rs:   Core data structures (ExecutionRequest, ExecutionResult, ExecutionState)
- config.rs:  Configuration (ExecutionConfig, OversizedOutputStrategy)
- events.rs:  Event system (EventHandler trait, ExecutionEvent enum)
- errors.rs:  Error types (ExecutionError, Result alias)
- cleanup.rs: Memory cleanup background task
```

### Key Design Patterns

**1. Async Background Execution**
- `ExecutionEngine::execute()` returns immediately with `execution_id`
- Command execution happens in background tokio task
- Caller uses `get_status()`, `get_result()`, or `wait_for_completion()` to check progress
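Here is a minimal sketch of this fire-and-forget flow. It uses `std::thread` and a `Condvar` instead of tokio tasks, so it runs without dependencies. `MiniEngine`, `Status`, and the closure-based `execute` are hypothetical illustrations, not the crate's API. Real executions use `Uuid` IDs and `ExecutionState`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Simplified status for this sketch; the crate's `ExecutionStatus` has more variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Running,
    Completed,
}

type StateMap = HashMap<u64, (Status, Option<String>)>;

/// Hypothetical miniature engine: ids are counters, not `Uuid`s.
#[derive(Default)]
pub struct MiniEngine {
    next_id: Mutex<u64>,
    states: Arc<(Mutex<StateMap>, Condvar)>,
}

impl MiniEngine {
    /// Registers the job, runs it on a background thread, and returns its id immediately.
    pub fn execute<F>(&self, job: F) -> u64
    where
        F: FnOnce() -> String + Send + 'static,
    {
        let id = {
            let mut next = self.next_id.lock().unwrap();
            *next += 1;
            *next
        };
        self.states.0.lock().unwrap().insert(id, (Status::Running, None));

        let states = Arc::clone(&self.states);
        thread::spawn(move || {
            let output = job(); // the "command" runs off the caller's thread
            let (map, cvar) = &*states;
            map.lock().unwrap().insert(id, (Status::Completed, Some(output)));
            cvar.notify_all(); // wake any caller blocked in wait_for_completion
        });
        id
    }

    /// Non-blocking status check, analogous to `get_status()`.
    pub fn get_status(&self, id: u64) -> Option<Status> {
        self.states.0.lock().unwrap().get(&id).map(|(s, _)| *s)
    }

    /// Blocks until the job leaves `Running`, then returns its output (None for unknown ids).
    pub fn wait_for_completion(&self, id: u64) -> Option<String> {
        let (map, cvar) = &*self.states;
        let guard = cvar
            .wait_while(map.lock().unwrap(), |m| {
                matches!(m.get(&id), Some((Status::Running, _)))
            })
            .unwrap();
        guard.get(&id).and_then(|(_, out)| out.clone())
    }
}
```

The caller gets an ID back before the job finishes. After that, it can poll with `get_status` or block with `wait_for_completion`, which mirrors the split described above.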

**2. Semaphore-Based Concurrency**
- `ExecutionConfig::max_concurrent_executions` limits parallel executions
- Tokio semaphore enforces limit (default: 100 concurrent)
- `execute()` returns `ConcurrencyLimitReached` error when at limit
- Permits automatically released when execution completes
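The reject-at-limit behavior can be sketched with an atomic counter and an RAII permit. This is a std-only stand-in that assumes the crate uses a non-blocking acquire, such as tokio's `Semaphore::try_acquire_owned()`. `Limiter`, `Permit`, and this `ConcurrencyLimitReached` struct are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical error mirroring `ExecutionError::ConcurrencyLimitReached(limit)`.
#[derive(Debug, PartialEq, Eq)]
pub struct ConcurrencyLimitReached(pub usize);

/// Counting limiter with a non-blocking acquire (std stand-in for a tokio `Semaphore`).
pub struct Limiter {
    in_use: AtomicUsize,
    max: usize,
}

/// RAII permit: dropping it frees the slot, like an owned semaphore permit.
pub struct Permit {
    limiter: Arc<Limiter>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.limiter.in_use.fetch_sub(1, Ordering::AcqRel);
    }
}

impl Limiter {
    pub fn new(max: usize) -> Arc<Self> {
        Arc::new(Self { in_use: AtomicUsize::new(0), max })
    }

    /// Returns an error immediately instead of waiting when every slot is taken.
    pub fn try_acquire(self: &Arc<Self>) -> Result<Permit, ConcurrencyLimitReached> {
        self.in_use
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                (n < self.max).then(|| n + 1)
            })
            .map(|_| Permit { limiter: Arc::clone(self) })
            .map_err(|_| ConcurrencyLimitReached(self.max))
    }
}
```

The permit is released in `Drop`. This means a slot is freed even if the execution task returns early with an error.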

**3. State Management**
- Type alias: `ExecutionMap = Arc<RwLock<HashMap<Uuid, Arc<RwLock<ExecutionState>>>>>`
- Type alias: `ExecutionHandle = Arc<RwLock<ExecutionState>>`
- Each execution has isolated state protected by RwLock
- State transitions: Pending → Running → Completed/Failed/Timeout/Cancelled
- Cleanup task periodically removes old executions based on retention policy
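The transition rules above can be encoded as a small guard on the status enum. The variant names come from this section. `can_transition_to` is a hypothetical helper, and the real `ExecutionStatus` may enforce transitions differently or not at all.

```rust
/// Statuses named above; the crate's real enum may carry more detail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Timeout,
    Cancelled,
}

impl ExecutionStatus {
    /// Terminal statuses are final.
    pub fn is_terminal(self) -> bool {
        matches!(self, Self::Completed | Self::Failed | Self::Timeout | Self::Cancelled)
    }

    /// Hypothetical guard allowing only Pending -> Running -> terminal.
    pub fn can_transition_to(self, next: Self) -> bool {
        match (self, next) {
            (Self::Pending, Self::Running) => true,
            (Self::Running, next) => next.is_terminal(),
            // Anything else, including leaving a terminal state, is rejected
            _ => false,
        }
    }
}
```

The terminal check matches the `!state_lock.status.is_terminal()` test in the streaming error handler. That test keeps an execution that already ended as `Timeout` or `Cancelled` from being overwritten with `Failed`.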

**4. Real-Time Streaming with Batched Writes**
- Stdout/stderr captured line-by-line via tokio `BufReader::lines()`
- **Batched writes**: 100 lines OR 64KB per batch (reduces lock contention by 50-100x)
- Each line triggers `ExecutionEvent::Stdout` or `ExecutionEvent::Stderr`
- Events sent to pluggable `EventHandler` trait implementations
- Lines accumulated in batches, then written to `ExecutionState` in single lock
- **Performance**: Handles 50,000+ lines of output efficiently
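Here is a std-only sketch of the flush rule. It assumes the limits above (100 lines or 64KB, whichever comes first), with a `Mutex<String>` standing in for the `RwLock`-protected `ExecutionState`. `write_batched` and `flush` are hypothetical names, and the return value counts lock acquisitions.

```rust
use std::sync::Mutex;

const BATCH_SIZE: usize = 100; // max lines per batch
const BATCH_BYTES: usize = 64 * 1024; // max bytes per batch (64KB)

/// Appends the buffered lines under a single lock acquisition.
fn flush(buffer: &mut Vec<String>, sink: &Mutex<String>) {
    let mut out = sink.lock().unwrap();
    for line in buffer.drain(..) {
        out.push_str(&line);
        out.push('\n');
    }
}

/// Writes `lines` to `sink` in batches; returns how many times the lock was taken.
pub fn write_batched<I>(lines: I, sink: &Mutex<String>) -> usize
where
    I: IntoIterator<Item = String>,
{
    let mut buffer = Vec::with_capacity(BATCH_SIZE);
    let mut buffered_bytes = 0;
    let mut lock_acquisitions = 0;

    for line in lines {
        buffered_bytes += line.len() + 1; // +1 for the newline added on flush
        buffer.push(line);
        // Flush on whichever limit is reached first
        if buffer.len() >= BATCH_SIZE || buffered_bytes >= BATCH_BYTES {
            flush(&mut buffer, sink);
            lock_acquisitions += 1;
            buffered_bytes = 0;
        }
    }
    // Flush the final partial batch so trailing lines are not lost
    if !buffer.is_empty() {
        flush(&mut buffer, sink);
        lock_acquisitions += 1;
    }
    lock_acquisitions
}
```

For 50,000 short lines, this takes the lock 500 times instead of 50,000, a 100x reduction. Long lines hit the 64KB limit first, so the saving shrinks as line length grows.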

**5. Generic Streaming (Zero Duplication)**
- Single `stream_output<R: AsyncRead + Unpin>()` function for both stdout/stderr
- `StreamType` enum differentiates behavior (Stdout vs Stderr)
- Thin wrapper functions delegate to generic implementation
- **Code quality**: Eliminated 126 lines of duplication (15% reduction)

**6. Cancellation Support**
- Each execution gets a `CancellationToken` (tokio-util)
- `cancel()` triggers token, executor checks between operations
- Child process terminated via `tokio::process::Child::kill()`
- State transitions to `Cancelled` status
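Here is a std-only sketch of cooperative cancellation. An `Arc<AtomicBool>` plays the role of `tokio_util::sync::CancellationToken`, whose `cancel()` and `is_cancelled()` have the same meaning, and the worker checks it between steps. `CancelToken`, `Outcome`, and `run_steps` are hypothetical. The real executor would also kill the child process at the marked point.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Minimal std-only stand-in for a cancellation token (hypothetical).
#[derive(Clone, Default)]
pub struct CancelToken(Arc<AtomicBool>);

impl CancelToken {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Completed,
    Cancelled,
}

/// Runs `steps` units of work, checking the token between each one.
pub fn run_steps(token: &CancelToken, steps: u32, step_time: Duration) -> Outcome {
    for _ in 0..steps {
        if token.is_cancelled() {
            // In the engine, this is where the child process would be killed
            return Outcome::Cancelled;
        }
        thread::sleep(step_time); // one "operation"
    }
    Outcome::Completed
}
```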

### Critical Implementation Details

**Streaming Error Handling (Critical Fixes Applied)**

Streaming task errors are properly handled to prevent silent failures:

```rust
// Wait for streaming tasks and check for errors
let stdout_result = match stdout_handle.await {
    Ok(Ok(())) => Ok(()),
    Ok(Err(e)) => {
        eprintln!("stdout streaming failed: {e}");
        Err(e)
    }
    Err(e) => {
        eprintln!("stdout task panicked: {e}");
        Err(ExecutionError::Internal(format!("stdout task panicked: {e}")))
    }
};

// Streaming errors take precedence over exit status (fixes race condition)
if let Err(e) = stdout_result {
    let mut state_lock = state.write().await;
    if !state_lock.status.is_terminal() {
        state_lock.status = ExecutionStatus::Failed;
        state_lock.error = Some(e.to_string());
        state_lock.completed_at = Some(Utc::now());
    }
    return Err(e);  // Short-circuit before checking exit code
}
```

**Lifetime Constraints in Async Spawning**

The executor uses `tokio::spawn()`, which requires the spawned future to be `'static`. The future therefore cannot borrow `self`:

```rust
// WRONG - won't compile:
tokio::spawn(self.stream_stdout(execution_id, stdout, state));

// CORRECT - clone what the task needs and call an associated function (no `self` borrow):
let event_handler = self.event_handler.clone();
tokio::spawn(async move {
    Self::stream_stdout_static(execution_id, stdout, state, event_handler).await
});
```

**Arc/RwLock Cloning Pattern**

For sharing state across async tasks:

```rust
let state = Arc::new(RwLock::new(ExecutionState::new(request)));
let state_clone = state.clone();  // Clone Arc, not inner data

tokio::spawn(async move {
    let mut state_lock = state_clone.write().await;  // Acquire write lock
    state_lock.status = ExecutionStatus::Running;     // Modify
    // Lock automatically released when state_lock goes out of scope
});
```

**Batched Write Pattern**

Reduces lock contention for high-volume output:

```rust
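const BATCH_SIZE: usize = 100;        // Max lines per batch
const BATCH_BYTES: usize = 64 * 1024; // Max bytes per batch (64KB)

let mut buffer: Vec<String> = Vec::with_capacity(BATCH_SIZE);
let mut buffered_bytes = 0;

while let Some(line) = lines.next_line().await? {
    buffered_bytes += line.len() + 1; // +1 for the newline re-added on flush
    buffer.push(line);

    // Flush when either limit is reached: one write lock per batch, not per line
    if buffer.len() >= BATCH_SIZE || buffered_bytes >= BATCH_BYTES {
        let mut state_lock = state.write().await;
        let output_field = stream_type.get_output_mut(&mut state_lock);
        for l in buffer.drain(..) {
            output_field.push_str(&l);
            output_field.push('\n');
        }
        drop(state_lock); // Explicit lock release before reading more lines
        buffered_bytes = 0;
    }
}

// Flush the final partial batch at EOF so trailing lines are not lost
if !buffer.is_empty() {
    let mut state_lock = state.write().await;
    let output_field = stream_type.get_output_mut(&mut state_lock);
    for l in buffer.drain(..) {
        output_field.push_str(&l);
        output_field.push('\n');
    }
}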
```

**Generic Streaming with StreamType**

```rust
enum StreamType {
    Stdout,
    Stderr,
}

impl StreamType {
    fn get_output_mut<'a>(&self, state: &'a mut ExecutionState) -> &'a mut String {
        match self {
            StreamType::Stdout => &mut state.stdout,
            StreamType::Stderr => &mut state.stderr,
        }
    }

    fn create_event(&self, execution_id: Uuid, line: String) -> ExecutionEvent {
        // Returns appropriate Stdout or Stderr event
    }
}

// Single implementation for both streams
async fn stream_output<R: AsyncRead + Unpin>(
    output: R,
    stream_type: StreamType,
    // ... other params
) -> Result<()> {
    // Generic streaming logic using stream_type for differentiation
}
```
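Here is a runnable synchronous analogue of the generic function above. It uses `std::io::BufRead::lines()` in place of tokio's `AsyncRead` and `BufReader::lines()`. `Captured` is a hypothetical stand-in for the stdout/stderr fields of `ExecutionState`, and the sketch omits event emission and batching.

```rust
use std::io::{self, BufRead};

/// Which captured field a stream feeds (mirrors `StreamType` above).
#[derive(Debug, Clone, Copy)]
pub enum StreamType {
    Stdout,
    Stderr,
}

/// Hypothetical stand-in for the stdout/stderr fields of `ExecutionState`.
#[derive(Default, Debug)]
pub struct Captured {
    pub stdout: String,
    pub stderr: String,
}

impl StreamType {
    fn get_output_mut<'a>(&self, state: &'a mut Captured) -> &'a mut String {
        match self {
            StreamType::Stdout => &mut state.stdout,
            StreamType::Stderr => &mut state.stderr,
        }
    }
}

/// One implementation for both streams, generic over any buffered reader.
/// Returns the number of lines captured.
pub fn stream_output<R: BufRead>(
    reader: R,
    stream_type: StreamType,
    state: &mut Captured,
) -> io::Result<usize> {
    let mut count = 0;
    for line in reader.lines() {
        let line = line?; // propagate read errors instead of dropping them
        let field = stream_type.get_output_mut(state);
        field.push_str(&line);
        field.push('\n');
        count += 1;
    }
    Ok(count)
}
```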

### Command Types

The engine supports four command types (see `Command` enum in types.rs):

1. **Shell**: Execute string command via shell (bash/sh)
   - Example: `Command::Shell { command: "ls -la", shell: "/bin/bash" }`
   - Use for: Complex commands with pipes, redirects, wildcards

2. **Exec**: Execute program with argument array
   - Example: `Command::Exec { program: "echo", args: vec!["hello".to_string()] }`
   - Use for: Direct program execution without shell interpretation

3. **Script**: Execute script file with interpreter
   - Example: `Command::Script { path: PathBuf::from("script.sh"), interpreter: Some("bash") }`
   - Use for: Running script files (.sh, .py, .js, etc.)

4. **AwsCli**: AWS CLI command builder (convenience wrapper)
   - Example: `Command::AwsCli { service: "s3", command: "ls", args: vec!["s3://bucket"] }`
   - Use for: AWS CLI operations with standardized structure
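One plausible way the four variants could map to a process invocation is shown below as a pure function, so it can be tested without spawning anything. The field types, the `-c` flag for shells, and the `aws` program name are assumptions. The crate's actual translation lives in `commands.rs` and may differ.

```rust
use std::path::PathBuf;

/// Simplified mirror of the four variants above; field types are assumptions.
pub enum Command {
    Shell { command: String, shell: String },
    Exec { program: String, args: Vec<String> },
    Script { path: PathBuf, interpreter: Option<String> },
    AwsCli { service: String, command: String, args: Vec<String> },
}

/// Hypothetical lowering of a `Command` into (program, argv).
pub fn to_argv(cmd: &Command) -> (String, Vec<String>) {
    match cmd {
        // The shell interprets pipes, redirects, and globs inside `command`
        Command::Shell { command, shell } => {
            (shell.clone(), vec!["-c".to_string(), command.clone()])
        }
        // No shell: arguments reach the program verbatim
        Command::Exec { program, args } => (program.clone(), args.clone()),
        // With an interpreter, the script path becomes its first argument
        Command::Script { path, interpreter: Some(interp) } => {
            (interp.clone(), vec![path.display().to_string()])
        }
        // Without one, the script itself is executed (assumes it is executable)
        Command::Script { path, interpreter: None } => (path.display().to_string(), vec![]),
        Command::AwsCli { service, command, args } => {
            let mut argv = vec![service.clone(), command.clone()];
            argv.extend(args.iter().cloned());
            ("aws".to_string(), argv)
        }
    }
}
```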

## Configuration

### ExecutionConfig Fields

Located in [src/config.rs](src/config.rs):

- **default_timeout_ms**: Default timeout when request doesn't specify (5 min)
- **max_timeout_ms**: Maximum allowed timeout to prevent abuse (1 hour)
- **stream_output**: Enable line-by-line streaming vs buffered (true)
- **log_dir**: Optional directory for execution logs (None = no logging)
- **max_concurrent_executions**: Semaphore limit (100)
- **max_in_memory_executions**: Memory cleanup threshold (1000)
- **execution_retention_secs**: How long to keep completed executions (1 hour)
- **enable_auto_cleanup**: Run periodic cleanup task (true)
- **max_output_size_bytes**: Max stdout+stderr size (10 MB)
- **oversized_output_strategy**: How to handle large output (TruncateWithWarning, FailExecution, StreamToFile)
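This sketch shows how the two timeout fields might combine. It assumes an over-limit request is clamped to `max_timeout_ms` rather than rejected. `TimeoutConfig` and `effective_timeout_ms` are hypothetical; the defaults match the values listed above.

```rust
/// Subset of `ExecutionConfig` with the defaults listed above.
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
    pub default_timeout_ms: u64,
    pub max_timeout_ms: u64,
}

impl Default for TimeoutConfig {
    fn default() -> Self {
        Self {
            default_timeout_ms: 5 * 60 * 1000, // 5 minutes
            max_timeout_ms: 60 * 60 * 1000,    // 1 hour
        }
    }
}

impl TimeoutConfig {
    /// Hypothetical: fall back to the default, then clamp to the maximum.
    pub fn effective_timeout_ms(&self, requested: Option<u64>) -> u64 {
        requested
            .unwrap_or(self.default_timeout_ms)
            .min(self.max_timeout_ms)
    }
}
```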

### Strategy Enum: OversizedOutputStrategy

When output exceeds `max_output_size_bytes`:
- **TruncateWithWarning**: Keep first N bytes, append warning message
- **FailExecution**: Fail with `ExecutionError::OutputSizeExceeded` (streaming error takes precedence)
- **StreamToFile**: Streams remaining output to temp file, stores path in `stdout_overflow_file`/`stderr_overflow_file` fields
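This sketch applies the first two strategies to a captured string. The truncation notice text and the `enforce_limit` helper are hypothetical. The key point is that truncation must back up to a UTF-8 character boundary, because slicing a `str` mid-character panics.

```rust
/// Two of the three strategies; `StreamToFile` needs file I/O and is omitted here.
#[derive(Debug, Clone, Copy)]
pub enum Strategy {
    TruncateWithWarning,
    FailExecution,
}

/// Hypothetical error mirroring `ExecutionError::OutputSizeExceeded(limit)`.
#[derive(Debug, PartialEq, Eq)]
pub struct OutputSizeExceeded(pub usize);

/// Hypothetical size check applied to captured output.
pub fn enforce_limit(
    output: &str,
    max_bytes: usize,
    strategy: Strategy,
) -> Result<String, OutputSizeExceeded> {
    if output.len() <= max_bytes {
        return Ok(output.to_string());
    }
    match strategy {
        Strategy::FailExecution => Err(OutputSizeExceeded(max_bytes)),
        Strategy::TruncateWithWarning => {
            // Back up to a char boundary so a multi-byte character is never split
            let mut end = max_bytes;
            while !output.is_char_boundary(end) {
                end -= 1;
            }
            Ok(format!("{}\n[output truncated at {max_bytes} bytes]", &output[..end]))
        }
    }
}
```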

## Testing Philosophy

### Test Organization

- **Unit tests**: Inline `#[cfg(test)] mod tests` in each module file
- **Integration tests**: `tests/integration_test.rs` for end-to-end scenarios
- **Doc-tests**: Examples in `//!` documentation comments in [src/lib.rs](src/lib.rs)

### Running Test Subsets

```bash
# Single module's unit tests
cargo test --lib commands::tests

# Single integration test
cargo test --test integration_test test_script_execution

# All tests matching pattern
cargo test timeout

# Specific test with full output
cargo test test_executor_timeout -- --nocapture --exact
```

### Test Script Pattern

Integration tests use [test-scripts/simple-test.sh](test-scripts/simple-test.sh) to verify:
- Script execution with interpreter detection
- Environment variable passing
- Stdout/stderr capture
- Exit code handling

Create additional test scripts as needed in the `test-scripts/` directory.

## Code Style Standards

### Rust Best Practices

1. **No deprecated code** - Remove deprecated code entirely, never suppress warnings with `#[allow(deprecated)]`
2. **Import scoping** - Keep imports in narrowest scope (prefer test module over file-level)
3. **Error propagation** - Use `?` operator, avoid `.unwrap()` except in tests
4. **Async ownership** - Clone Arc references before moving into spawned tasks
5. **Documentation** - All public items must have `///` doc comments
6. **Type aliases** - Use type aliases for complex types (e.g., `ExecutionMap`, `ExecutionHandle`)
7. **Generic functions** - Prefer generic implementations over duplicated code (e.g., `stream_output<R>`)

### Formatting Rules

- **Always run `cargo fmt` before committing**
- Use `cargo fmt --check` in CI to enforce
- Follow Rust 2021 edition style guide
- 100-character line limit (rustfmt's default `max_width`)

### Clippy Linting

Run with warnings as errors:
```bash
cargo clippy -- -D warnings
```

**Current status**: 0 warnings (maintained through 3 phases of code quality improvements)

Common issues that were fixed:
- ~~Unused imports~~ - Fixed: moved to test modules or removed
- ~~Unnecessary clones~~ - Fixed: proper Arc vs inner data analysis
- ~~Missing `#[must_use]`~~ - Fixed: added to 14 Result-returning functions
- ~~Complex types~~ - Fixed: added type aliases (`ExecutionMap`, `ExecutionHandle`)
- ~~Parameter types~~ - Fixed: `&PathBuf` → `&Path` for idiomatic Rust
- ~~Code duplication~~ - Fixed: generic `stream_output()` eliminates 96% duplication

## Integration with Tauri

This crate is designed to be consumed by the Escher Tauri desktop app located at `/v2/v2-desktop-app-tauri`.

### Typical Integration Pattern

```toml
# In the Tauri app's Cargo.toml:
[dependencies]
execution-engine = { path = "../v2-execution-engine-rust" }
```

```rust
// In src-tauri/src/main.rs:
use execution_engine::{ExecutionEngine, ExecutionConfig, ExecutionRequest};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let config = ExecutionConfig::default();
    let engine = Arc::new(ExecutionEngine::new(config).unwrap());

    tauri::Builder::default()
        .manage(engine)  // Share engine across all commands
        .invoke_handler(tauri::generate_handler![
            execute_command,
            get_execution_status, // defined like execute_command (not shown)
        ])
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}

#[tauri::command]
async fn execute_command(
    request: ExecutionRequest,
    engine: tauri::State<'_, Arc<ExecutionEngine>>,
) -> Result<String, String> {
    engine.execute(request).await
        .map(|id| id.to_string())
        .map_err(|e| e.to_string())
}
```

### Event Handler for Tauri

Implement `EventHandler` trait to emit Tauri events:

```rust
use execution_engine::{EventHandler, ExecutionEvent};
use async_trait::async_trait;
use tauri::{Manager, AppHandle};

struct TauriEventHandler {
    app_handle: AppHandle,
}

#[async_trait]
impl EventHandler for TauriEventHandler {
    async fn handle_event(&self, event: ExecutionEvent) {
        // Emit to Tauri frontend
        self.app_handle.emit_all("execution-event", &event).ok();
    }
}

// Usage:
let handler = Arc::new(TauriEventHandler { app_handle: app.handle() });
let engine = ExecutionEngine::new(config)
    .unwrap()
    .with_event_handler(handler);
```

## Common Pitfalls

### 1. Forgetting to Handle ConcurrencyLimitReached

```rust
// BAD - will panic if at limit:
let id = engine.execute(request).await.unwrap();

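// GOOD - handle error:
match engine.execute(request).await {
    Ok(id) => {
        // Proceed: track `id`, then poll or wait for completion
    }
    Err(ExecutionError::ConcurrencyLimitReached(limit)) => {
        // All `limit` slots are busy: queue for later or return an error to the user
    }
    Err(e) => {
        // Other error: log or propagate `e`
    }
}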
```

### 2. Not Waiting for Completion

```rust
// BAD - may return None if not complete:
let result = engine.get_result(execution_id).await?;

// GOOD - wait for completion:
let result = engine.wait_for_completion(execution_id).await?;
// Returns ExecutionResult with status field
```

### 3. Leaking Executions

The cleanup task handles this automatically if `enable_auto_cleanup: true`, but you can manually clean up:

```rust
// Remove specific execution
engine.remove_execution(execution_id).await?;

// Or trigger cleanup manually
engine.cleanup().await;  // Removes executions older than retention period
```

### 4. Timeout vs ExecutionResult Status

A timeout is NOT an error - it returns `Ok(ExecutionResult)` with `status: Timeout`:

```rust
// BAD - wrong expectation:
let result = engine.wait_for_completion(id).await;
assert!(result.is_err());  // Wrong! Timeout is not Err

// GOOD - check status:
let result = engine.wait_for_completion(id).await?;
assert_eq!(result.status, ExecutionStatus::Timeout);
assert!(!result.success);  // success flag will be false
```

### 5. Output Size Limit Behavior (After Critical Fixes)

When using `OversizedOutputStrategy::FailExecution`, `wait_for_completion()` returns an error once the limit is exceeded. By then, `execute()` itself has already returned the execution ID:

```rust
// Output size exceeded returns Err (after C1/C2 fixes):
match engine.wait_for_completion(id).await {
    Ok(result) => {
        // Process completed within limits
    }
    Err(ExecutionError::OutputSizeExceeded(limit)) => {
        // Streaming failed because output exceeded `limit`;
        // the execution state is already marked as Failed
    }
    Err(e) => {
        // Other errors
    }
}
```

## Related Documentation

### Internal Documentation

Each source file has detailed module-level documentation:
- [src/lib.rs](src/lib.rs) - Public API overview with example
- [src/engine.rs](src/engine.rs) - ExecutionEngine implementation
- [src/executor.rs](src/executor.rs) - Command execution logic (batched writes, generic streaming)
- [src/types.rs](src/types.rs) - Data structure definitions
- [src/config.rs](src/config.rs) - Configuration specification
- [src/events.rs](src/events.rs) - Event system design
- [src/errors.rs](src/errors.rs) - Error types and patterns
- [src/commands.rs](src/commands.rs) - Command builder utilities
- [src/cleanup.rs](src/cleanup.rs) - Background cleanup implementation (uses type aliases)

### Code Review Documentation

Comprehensive code review and quality improvements documented in:
- [docs/CODE-REVIEW-ACTION-PLAN.md](docs/CODE-REVIEW-ACTION-PLAN.md) - Master action plan with 3 phases completed
- [docs/CODE-REVIEW-EXECUTOR.md](docs/CODE-REVIEW-EXECUTOR.md) - Technical deep dive (6,000 lines)
- [docs/CODE-QUALITY-MAINTENANCE.md](docs/CODE-QUALITY-MAINTENANCE.md) - Quality audit

**Phase 1: Quick Wins** ✅
- 98 clippy warnings → 0
- Added type aliases, #[must_use] attributes, comprehensive documentation

**Phase 2: Critical Issues** ✅
- Fixed streaming error handling (C1)
- Fixed race condition in output size limit (C2)
- Implemented batched writes (C3) - 50-100x performance improvement

**Phase 3: Code Quality** ✅
- Eliminated 96% code duplication (I2) - generic streaming implementation
- 126 lines of duplication removed (15% code reduction)

### External Documentation (Architecture Docs Repo)

Full specification with diagrams and design decisions:
- Architecture: `/v2-architecture-docs/docs/04-services/libraries/execution-engine/`
- API Reference: `api.md`
- Types Reference: `types.md`
- Event Handlers: `event-handlers.md`
- Error Handling: `error-handling.md`
- Cargo Integration: `cargo-integration.md`

## Development Workflow

### Making Changes

1. **Create feature branch** from main
2. **Implement changes** with inline unit tests
3. **Add integration test** if adding new feature
4. **Run full test suite**: `cargo test`
5. **Format code**: `cargo fmt`
6. **Run linter**: `cargo clippy -- -D warnings`
7. **Update doc comments** if API changed
8. **Test doc examples**: `cargo test --doc`

### Before Committing

```bash
# Check everything passes
cargo fmt --check && \
cargo clippy -- -D warnings && \
cargo test && \
echo "✅ All checks passed"
```

### Debug Tips

```bash
# Enable debug logging
RUST_LOG=execution_engine=debug cargo test -- --nocapture

# Run with backtrace on panic
RUST_BACKTRACE=1 cargo test

# Run single test in isolation with full output
cargo test test_name -- --exact --nocapture

# Check for unused dependencies (requires cargo-udeps and a nightly toolchain)
cargo +nightly udeps
```

## Performance Considerations

### Benchmarking

Criterion benchmarks are planned but not yet included in the crate. Once added, they can be run with:

```bash
# Run all benchmarks
cargo bench

# Run specific benchmark
cargo bench execution_overhead

# Save results as a named baseline for later comparison
cargo bench -- --save-baseline main
```

### Performance Targets (Achieved)

- **Execution overhead**: <50ms per command ✅
- **Concurrent throughput**: 100+ concurrent executions ✅
- **Memory per execution**: ~10KB baseline + stdout/stderr size ✅
- **Cleanup performance**: <1s for 1000 executions ✅
- **Lock contention**: 50-100x reduction via batched writes ✅
- **High-volume output**: 50,000+ lines handled efficiently ✅

### Memory Management

- Completed executions kept in memory for `execution_retention_secs` (default: 1 hour)
- Cleanup task runs every 60 seconds when `enable_auto_cleanup: true`
- Large output (>10MB) handled by `OversizedOutputStrategy`
- Arc/RwLock pattern ensures no data races in concurrent access
- **Batched writes**: Minimize lock acquisitions (100 lines OR 64KB per batch)
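The retention rule can be sketched as a single `HashMap::retain` pass. Times are plain seconds rather than `chrono` timestamps, and `Entry` and `cleanup` are hypothetical. The crate's cleanup task may also apply the `max_in_memory_executions` threshold, which this sketch ignores.

```rust
use std::collections::HashMap;

/// Hypothetical per-execution record: terminal flag plus completion time in seconds.
#[derive(Debug, Clone, Copy)]
pub struct Entry {
    pub terminal: bool,
    pub completed_at_secs: Option<u64>,
}

/// Drops finished executions older than `retention_secs`; returns how many were removed.
/// Executions that have not finished are always kept.
pub fn cleanup(map: &mut HashMap<u32, Entry>, now_secs: u64, retention_secs: u64) -> usize {
    let before = map.len();
    map.retain(|_, e| match (e.terminal, e.completed_at_secs) {
        (true, Some(done)) => now_secs.saturating_sub(done) < retention_secs,
        _ => true, // still running: keep
    });
    before - map.len()
}
```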

### Optimization Strategies

1. **Reduce retention time** if memory constrained (lower `execution_retention_secs`)
2. **Lower concurrency limit** if CPU/IO bound (lower `max_concurrent_executions`)
3. **Enable StreamToFile** for large output operations (set `oversized_output_strategy`)
4. **Disable streaming** if only final result needed (set `stream_output: false`)
5. **Manual cleanup** after retrieval (call `remove_execution()` immediately)
6. **Batching already optimized** - 100 lines or 64KB batches reduce lock contention by 50-100x