escher-execution-engine 0.1.2

Production-ready async execution engine for system commands
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//! Integration tests for ExecutionEngine
//!
//! End-to-end tests that verify the complete execution pipeline.

use execution_engine::*;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

/// Smoke test: runs a single `echo` through `bash` and verifies that the engine
/// reports a completed, successful execution with the echoed text on stdout.
#[tokio::test]
async fn test_simple_shell_command() {
    // Engine built from the default configuration.
    let engine = ExecutionEngine::new(ExecutionConfig::default()).unwrap();

    // The command under test.
    let echo = Command::Shell {
        command: String::from("echo 'Hello from execution engine!'"),
        shell: String::from("bash"),
    };
    let request = ExecutionRequest {
        id: Uuid::new_v4(),
        command: echo,
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(5000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    // Submit the request, then block until the engine reports a final result.
    let execution_id = engine.execute(request).await.unwrap();
    println!("Execution started with ID: {}", execution_id);

    let outcome = engine.wait_for_completion(execution_id).await.unwrap();

    // A successful echo: completed status, zero exit code, greeting on stdout.
    assert_eq!(outcome.status, ExecutionStatus::Completed);
    assert_eq!(outcome.exit_code, 0);
    assert!(outcome.success);
    assert!(outcome.stdout.contains("Hello from execution engine!"));

    println!("✅ Simple shell command test passed!");
    println!("   Status: {:?}", outcome.status);
    println!("   Exit code: {}", outcome.exit_code);
    println!("   Duration: {:?}", outcome.duration);
    println!("   Stdout: {}", outcome.stdout);
}

/// Executes `test-scripts/simple-test.sh` (resolved against `CARGO_MANIFEST_DIR`)
/// with `bash` as interpreter and `TEST_VAR` set in the environment, then checks
/// the final status, the exit code and the expected stdout/stderr fragments.
#[tokio::test]
async fn test_script_execution() {
    // Locate the fixture script relative to the crate root.
    let mut script_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    script_path.push("test-scripts");
    script_path.push("simple-test.sh");

    // Fail early with a readable message when the fixture is missing.
    assert!(
        script_path.exists(),
        "Test script not found at: {:?}",
        script_path
    );

    let engine = ExecutionEngine::new(ExecutionConfig::default()).unwrap();

    // The script is expected to echo this variable back on stdout.
    let env = HashMap::from([("TEST_VAR".to_string(), "test_value_123".to_string())]);

    let request = ExecutionRequest {
        id: Uuid::new_v4(),
        command: Command::Script {
            path: script_path,
            interpreter: Some(String::from("bash")),
        },
        env,
        working_dir: None,
        timeout_ms: Some(5000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let execution_id = engine.execute(request).await.unwrap();
    println!("Script execution started with ID: {}", execution_id);

    let outcome = engine.wait_for_completion(execution_id).await.unwrap();

    // The script must finish cleanly.
    assert_eq!(outcome.status, ExecutionStatus::Completed);
    assert_eq!(outcome.exit_code, 0);
    assert!(outcome.success);

    // stdout carries the progress messages and the echoed variable.
    assert!(outcome.stdout.contains("Starting execution..."));
    assert!(outcome.stdout.contains("TEST_VAR is set to: test_value_123"));
    assert!(outcome.stdout.contains("Execution completed successfully!"));

    // stderr is captured separately from stdout.
    assert!(outcome.stderr.contains("This is a stderr message"));

    println!("✅ Script execution test passed!");
    println!("   Status: {:?}", outcome.status);
    println!("   Exit code: {}", outcome.exit_code);
    println!("   Duration: {:?}", outcome.duration);
    println!("   Stdout lines: {}", outcome.stdout.lines().count());
    println!("   Stderr lines: {}", outcome.stderr.lines().count());
    println!("\nStdout:\n{}", outcome.stdout);
    println!("\nStderr:\n{}", outcome.stderr);
}

/// Spawns five tokio tasks that each run `echo 'Task <i>' && sleep 0.1` through one
/// shared engine (configured with `max_concurrent_executions: 10`) and checks that
/// every execution completes with exit code 0 and its own task marker on stdout.
#[tokio::test]
async fn test_concurrent_executions() {
    // One engine shared by all tasks.
    let engine = Arc::new(
        ExecutionEngine::new(ExecutionConfig {
            max_concurrent_executions: 10,
            ..Default::default()
        })
        .unwrap(),
    );

    // `collect` drives the iterator, so all five tasks are spawned before any is awaited.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let engine = Arc::clone(&engine);
            tokio::spawn(async move {
                let request = ExecutionRequest {
                    id: Uuid::new_v4(),
                    command: Command::Shell {
                        command: format!("echo 'Task {}' && sleep 0.1", i),
                        shell: String::from("bash"),
                    },
                    env: HashMap::new(),
                    working_dir: None,
                    timeout_ms: Some(5000),
                    output_log_path: None,
                    metadata: ExecutionMetadata::default(),
                };

                let execution_id = engine.execute(request).await.unwrap();
                let outcome = engine.wait_for_completion(execution_id).await.unwrap();
                (i, outcome)
            })
        })
        .collect();

    // Join the tasks in spawn order; a panicked task fails the test here.
    let mut results = Vec::with_capacity(handles.len());
    for handle in handles {
        results.push(handle.await.unwrap());
    }

    // Every task must have produced a successful result tagged with its own index.
    assert_eq!(results.len(), 5);
    for (task_id, outcome) in results {
        assert_eq!(outcome.status, ExecutionStatus::Completed);
        assert_eq!(outcome.exit_code, 0);
        assert!(outcome.stdout.contains(&format!("Task {}", task_id)));
    }

    println!("✅ Concurrent execution test passed!");
    println!("   Executed 5 tasks concurrently");
    println!("   All tasks completed successfully");
}

#[tokio::test]
async fn test_list_and_count_operations() {
    let config = ExecutionConfig::default();
    let engine = ExecutionEngine::new(config).unwrap();

    // Initially should be empty
    assert_eq!(engine.total_count().await, 0);
    assert_eq!(engine.running_count().await, 0);

    // Execute some commands
    let request1 = ExecutionRequest {
        id: Uuid::new_v4(),
        command: Command::Shell {
            command: "echo 'test1'".to_string(),
            shell: "bash".to_string(),
        },
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(5000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let request2 = ExecutionRequest {
        id: Uuid::new_v4(),
        command: Command::Shell {
            command: "echo 'test2'".to_string(),
            shell: "bash".to_string(),
        },
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(5000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let id1 = engine.execute(request1).await.unwrap();
    let id2 = engine.execute(request2).await.unwrap();

    // Wait for completion
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

    // Check counts
    let total = engine.total_count().await;
    assert_eq!(total, 2);

    // List executions
    let list = engine.list_executions().await;
    assert_eq!(list.len(), 2);

    // Verify both IDs are in the list
    let ids: Vec<Uuid> = list.iter().map(|s| s.id).collect();
    assert!(ids.contains(&id1));
    assert!(ids.contains(&id2));

    println!("✅ List and count operations test passed!");
    println!("   Total executions: {}", total);
    println!("   Listed executions: {}", list.len());
}

/// Runs `sleep 10` with a 1000 ms timeout and verifies that the engine reports
/// `ExecutionStatus::Timeout` both in the returned result and via `get_status`.
#[tokio::test]
async fn test_timeout_handling() {
    // Engine-wide default timeout of one second.
    let engine = ExecutionEngine::new(ExecutionConfig {
        default_timeout_ms: 1000,
        ..Default::default()
    })
    .unwrap();

    // The command sleeps far longer than the per-request timeout allows.
    let slow_command = Command::Shell {
        command: String::from("sleep 10"),
        shell: String::from("bash"),
    };
    let request = ExecutionRequest {
        id: Uuid::new_v4(),
        command: slow_command,
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(1000), // per-request timeout: one second
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let execution_id = engine.execute(request).await.unwrap();

    // Returns once the engine has given up on the command.
    let outcome = engine.wait_for_completion(execution_id).await.unwrap();

    // The result itself must record the timeout.
    assert_eq!(outcome.status, ExecutionStatus::Timeout);
    assert!(!outcome.success);

    // The status lookup must agree with the result.
    let reported_status = engine.get_status(execution_id).await.unwrap();
    assert_eq!(reported_status, ExecutionStatus::Timeout);

    println!("✅ Timeout handling test passed!");
    println!("   Execution timed out as expected");
    println!("   Status: {:?}", outcome.status);
}

/// Runs `exit 42` and verifies that a non-zero exit is surfaced as
/// `ExecutionStatus::Failed` with the exit code preserved.
#[tokio::test]
async fn test_failed_command() {
    let engine = ExecutionEngine::new(ExecutionConfig::default()).unwrap();

    // The shell exits immediately with a distinctive non-zero code.
    let failing_command = Command::Shell {
        command: String::from("exit 42"),
        shell: String::from("bash"),
    };
    let request = ExecutionRequest {
        id: Uuid::new_v4(),
        command: failing_command,
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(5000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let execution_id = engine.execute(request).await.unwrap();
    let outcome = engine.wait_for_completion(execution_id).await.unwrap();

    // The execution ran to the end, but must be reported as a failure.
    assert_eq!(outcome.status, ExecutionStatus::Failed);
    assert_eq!(outcome.exit_code, 42);
    assert!(!outcome.success);

    println!("✅ Failed command test passed!");
    println!("   Exit code: {}", outcome.exit_code);
    println!("   Status: {:?}", outcome.status);
}

/// Starts a `sleep 30`, cancels it after half a second, and verifies that the
/// stored result ends up as `ExecutionStatus::Cancelled` and not successful.
#[tokio::test]
async fn test_cancellation() {
    use std::time::Duration;

    let engine = ExecutionEngine::new(ExecutionConfig::default()).unwrap();

    // Long-running command; the 60 s timeout is far beyond the test's lifetime,
    // so only the explicit cancel can end it.
    let long_sleep = Command::Shell {
        command: String::from("sleep 30"),
        shell: String::from("bash"),
    };
    let request = ExecutionRequest {
        id: Uuid::new_v4(),
        command: long_sleep,
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(60000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let execution_id = engine.execute(request).await.unwrap();
    println!("Started execution: {}", execution_id);

    // Give the process a moment to actually start before cancelling.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let cancel_result = engine.cancel(execution_id).await;
    println!("Cancellation result: {:?}", cancel_result);
    assert!(cancel_result.is_ok(), "Cancellation should succeed");

    // Allow time for the process to terminate and the result to be recorded.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let outcome = engine.get_result(execution_id).await.unwrap();
    println!("Result status: {:?}", outcome.status);
    println!("Exit code: {}", outcome.exit_code);

    // The recorded result must reflect the cancellation.
    assert_eq!(outcome.status, ExecutionStatus::Cancelled);
    assert!(!outcome.success);

    println!("✅ Cancellation test passed!");
    println!("   Execution was successfully cancelled");
}

/// Exercises `OversizedOutputStrategy::StreamToFile`: with a 1024-byte output
/// limit, a command printing 100 long lines must still complete successfully,
/// keep only a bounded amount of stdout in memory (with an overflow notice), and
/// spill the rest into the file named by `stdout_overflow_file`.
///
/// The overflow file is read and deleted *before* any assertion runs, so a
/// failing assertion cannot leave the file behind on disk.
#[tokio::test]
async fn test_stream_to_file_strategy() {
    // Small in-memory limit so the command below is guaranteed to overflow it.
    let config = ExecutionConfig {
        max_output_size_bytes: 1024, // 1KB limit
        oversized_output_strategy: execution_engine::OversizedOutputStrategy::StreamToFile,
        ..Default::default()
    };
    let engine = ExecutionEngine::new(config).unwrap();

    let request = ExecutionRequest {
        id: Uuid::new_v4(),
        command: Command::Shell {
            // 100 lines of several dozen bytes each — well over the 1KB limit.
            command: "for i in {1..100}; do echo \"Line $i: This is a test line with some content to make it longer\"; done".to_string(),
            shell: "bash".to_string(),
        },
        env: HashMap::new(),
        working_dir: None,
        timeout_ms: Some(10000),
        output_log_path: None,
        metadata: ExecutionMetadata::default(),
    };

    let execution_id = engine.execute(request).await.unwrap();
    let result = engine.wait_for_completion(execution_id).await.unwrap();

    // Capture everything needed from the overflow file, then remove it right away.
    // The read result is kept as a `Result` and only unwrapped after cleanup.
    let overflow = match &result.stdout_overflow_file {
        Some(path) => {
            let existed = path.exists();
            let content = tokio::fs::read_to_string(path).await;
            // Best-effort cleanup; a failure to delete must not fail the test.
            tokio::fs::remove_file(path).await.ok();
            Some((path, existed, content))
        }
        None => None,
    };

    // Should complete successfully
    assert_eq!(result.status, ExecutionStatus::Completed);
    assert!(result.success);

    // In-memory stdout should be limited
    println!("Stdout size: {} bytes", result.stdout.len());
    assert!(
        result.stdout.len() <= 2000,
        "Stdout should be limited to ~1KB + warning"
    );

    // Should contain warning message
    assert!(
        result
            .stdout
            .contains("[OUTPUT LIMIT REACHED: Remaining output streamed to"),
        "Should contain overflow warning"
    );

    // Should have overflow file path
    let (overflow_path, existed, overflow_content) =
        overflow.expect("Should have stdout overflow file");

    // The file must have been on disk when the execution finished
    assert!(
        existed,
        "Overflow file should exist at {:?}",
        overflow_path
    );

    // The file must hold the output that did not fit in memory
    let overflow_content = overflow_content.unwrap();
    println!("Overflow file size: {} bytes", overflow_content.len());
    assert!(
        overflow_content.len() > 1000,
        "Overflow file should contain substantial content"
    );

    println!("✅ StreamToFile test passed!");
    println!("   In-memory output: {} bytes", result.stdout.len());
    println!("   Overflow file: {:?}", result.stdout_overflow_file);
}