use crate::commands::{build_command, command_to_string};
use crate::config::ExecutionConfig;
use crate::errors::{ExecutionError, Result};
use crate::events::{EventHandler, ExecutionEvent};
use crate::types::{ExecutionRequest, ExecutionResult, ExecutionState, ExecutionStatus};
use chrono::Utc;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStderr, ChildStdout};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "server-feedback")]
use cloudops_network::{ExecutionFeedback, ExecutionStatus as NetworkExecutionStatus};
/// Identifies which of the child process's output pipes a line was read from.
#[derive(Clone, Copy, Debug)]
enum StreamType {
    /// Lines read from the child's standard output.
    Stdout,
    /// Lines read from the child's standard error.
    Stderr,
}
impl StreamType {
    /// Returns the accumulated output buffer in `state` that this stream feeds
    /// (`state.stdout` for [`StreamType::Stdout`], `state.stderr` otherwise).
    fn get_output_mut<'a>(&self, state: &'a mut ExecutionState) -> &'a mut String {
        match *self {
            Self::Stdout => &mut state.stdout,
            Self::Stderr => &mut state.stderr,
        }
    }

    /// Wraps a single output `line` in the event variant matching this stream,
    /// stamped with the current UTC time.
    fn create_event(&self, execution_id: uuid::Uuid, line: String) -> ExecutionEvent {
        let timestamp = Utc::now();
        match *self {
            Self::Stdout => ExecutionEvent::Stdout { execution_id, line, timestamp },
            Self::Stderr => ExecutionEvent::Stderr { execution_id, line, timestamp },
        }
    }
}
/// Runs [`ExecutionRequest`]s as child processes, streaming their output into a
/// shared [`ExecutionState`] and optionally reporting progress to an
/// [`EventHandler`].
pub struct Executor {
    /// Timeouts, output limits and logging settings applied to every execution.
    config: ExecutionConfig,
    /// Optional sink for lifecycle and output events; `None` disables events.
    event_handler: Option<Arc<dyn EventHandler>>,
}
impl Executor {
pub fn new(config: ExecutionConfig) -> Self {
Self {
config,
event_handler: None,
}
}
pub fn with_event_handler(mut self, handler: Arc<dyn EventHandler>) -> Self {
self.event_handler = Some(handler);
self
}
pub async fn execute(
&self,
request: ExecutionRequest,
state: Arc<RwLock<ExecutionState>>,
cancel_token: CancellationToken,
) -> Result<ExecutionResult> {
let execution_id = request.id;
let command_str = command_to_string(&request.command);
let output_log_path = request.output_log_path.clone();
self.emit_start_event(execution_id, &command_str, &state)
.await;
let mut child = self.spawn_command(&request)?;
let (stdout, stderr) = self.capture_streams(&mut child)?;
let (stdout_handle, stderr_handle) =
self.start_streaming_tasks(execution_id, stdout, stderr, &state, output_log_path);
let timeout_ms = request.timeout_ms.unwrap_or(self.config.default_timeout_ms);
let wait_result = self
.wait_with_timeout_and_cancel(child, timeout_ms, cancel_token)
.await;
self.handle_streaming_errors(stdout_handle, stderr_handle, &state)
.await?;
self.process_result(wait_result, state, execution_id, command_str)
.await
}
async fn emit_start_event(
&self,
execution_id: uuid::Uuid,
command_str: &str,
state: &Arc<RwLock<ExecutionState>>,
) {
self.emit_event(ExecutionEvent::Started {
execution_id,
command: command_str.to_string(),
timestamp: Utc::now(),
})
.await;
let mut state_lock = state.write().await;
state_lock.status = ExecutionStatus::Running;
}
fn spawn_command(&self, request: &ExecutionRequest) -> Result<Child> {
let mut cmd = build_command(request)?;
cmd.spawn()
.map_err(|e| ExecutionError::SpawnFailed(format!("Failed to spawn command: {e}")))
}
fn capture_streams(&self, child: &mut Child) -> Result<(ChildStdout, ChildStderr)> {
let stdout = child
.stdout
.take()
.ok_or_else(|| ExecutionError::Internal("Failed to capture stdout".to_string()))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| ExecutionError::Internal("Failed to capture stderr".to_string()))?;
Ok((stdout, stderr))
}
fn start_streaming_tasks(
&self,
execution_id: uuid::Uuid,
stdout: ChildStdout,
stderr: ChildStderr,
state: &Arc<RwLock<ExecutionState>>,
output_log_path: Option<PathBuf>,
) -> (
tokio::task::JoinHandle<Result<()>>,
tokio::task::JoinHandle<Result<()>>,
) {
let state_clone = Arc::clone(state);
let event_handler = self.event_handler.clone();
let max_output_size = self.config.max_output_size_bytes;
let oversized_strategy = self.config.oversized_output_strategy;
let log_path_clone = output_log_path.clone();
let stdout_handle = tokio::spawn(async move {
Self::stream_stdout_static(
execution_id,
stdout,
state_clone,
event_handler,
max_output_size,
oversized_strategy,
log_path_clone,
)
.await
});
let state_clone = Arc::clone(state);
let event_handler = self.event_handler.clone();
let max_output_size = self.config.max_output_size_bytes;
let oversized_strategy = self.config.oversized_output_strategy;
let stderr_handle = tokio::spawn(async move {
Self::stream_stderr_static(
execution_id,
stderr,
state_clone,
event_handler,
max_output_size,
oversized_strategy,
output_log_path,
)
.await
});
(stdout_handle, stderr_handle)
}
async fn handle_streaming_errors(
&self,
stdout_handle: tokio::task::JoinHandle<Result<()>>,
stderr_handle: tokio::task::JoinHandle<Result<()>>,
state: &Arc<RwLock<ExecutionState>>,
) -> Result<()> {
let stdout_result = match stdout_handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => {
eprintln!("stdout streaming failed: {e}");
Err(e)
}
Err(e) => {
eprintln!("stdout task panicked: {e}");
Err(ExecutionError::Internal(format!(
"stdout task panicked: {e}"
)))
}
};
let stderr_result = match stderr_handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => {
eprintln!("stderr streaming failed: {e}");
Err(e)
}
Err(e) => {
eprintln!("stderr task panicked: {e}");
Err(ExecutionError::Internal(format!(
"stderr task panicked: {e}"
)))
}
};
if let Err(e) = stdout_result {
let mut state_lock = state.write().await;
if !state_lock.status.is_terminal() {
state_lock.status = ExecutionStatus::Failed;
state_lock.error = Some(e.to_string());
state_lock.completed_at = Some(Utc::now());
}
return Err(e);
}
if let Err(e) = stderr_result {
let mut state_lock = state.write().await;
if !state_lock.status.is_terminal() {
state_lock.status = ExecutionStatus::Failed;
state_lock.error = Some(e.to_string());
state_lock.completed_at = Some(Utc::now());
}
return Err(e);
}
Ok(())
}
async fn process_result(
&self,
wait_result: Result<ExitStatus>,
state: Arc<RwLock<ExecutionState>>,
execution_id: uuid::Uuid,
command_str: String,
) -> Result<ExecutionResult> {
match wait_result {
Ok(exit_status) => {
let exit_code = exit_status.code().unwrap_or(-1);
let state_lock = state.read().await;
let final_status = if exit_code == 0 {
ExecutionStatus::Completed
} else {
ExecutionStatus::Failed
};
let result = ExecutionResult {
id: execution_id,
status: final_status,
success: exit_code == 0,
exit_code,
stdout: state_lock.stdout.clone(),
stderr: state_lock.stderr.clone(),
duration: (Utc::now() - state_lock.started_at)
.to_std()
.unwrap_or(Duration::from_secs(0)),
started_at: state_lock.started_at,
completed_at: Some(Utc::now()),
error: None,
stdout_overflow_file: state_lock.stdout_overflow_file.clone(),
stderr_overflow_file: state_lock.stderr_overflow_file.clone(),
};
drop(state_lock);
let mut state_lock = state.write().await;
state_lock.status = final_status;
state_lock.exit_code = Some(exit_code);
state_lock.completed_at = Some(Utc::now());
self.emit_event(ExecutionEvent::Completed {
execution_id,
result: result.clone(),
timestamp: Utc::now(),
})
.await;
#[cfg(feature = "server-feedback")]
self.send_feedback(&result, &command_str).await;
#[cfg(not(feature = "server-feedback"))]
let _ = command_str;
Ok(result)
}
Err(e) => self.handle_execution_error(e, state, execution_id).await,
}
}
async fn handle_execution_error(
&self,
error: ExecutionError,
state: Arc<RwLock<ExecutionState>>,
execution_id: uuid::Uuid,
) -> Result<ExecutionResult> {
match error {
ExecutionError::Timeout(ms) => {
let mut state_lock = state.write().await;
state_lock.status = ExecutionStatus::Timeout;
state_lock.completed_at = Some(Utc::now());
state_lock.error = Some(format!("Execution timed out after {ms}ms"));
self.emit_event(ExecutionEvent::Timeout {
execution_id,
timeout_ms: ms,
timestamp: Utc::now(),
})
.await;
Err(ExecutionError::Timeout(ms))
}
ExecutionError::Cancelled => {
let mut state_lock = state.write().await;
state_lock.status = ExecutionStatus::Cancelled;
state_lock.completed_at = Some(Utc::now());
state_lock.error = Some("Execution cancelled by user".to_string());
self.emit_event(ExecutionEvent::Cancelled {
execution_id,
timestamp: Utc::now(),
})
.await;
Err(ExecutionError::Cancelled)
}
e => {
let mut state_lock = state.write().await;
state_lock.status = ExecutionStatus::Failed;
state_lock.completed_at = Some(Utc::now());
state_lock.error = Some(e.to_string());
self.emit_event(ExecutionEvent::Failed {
execution_id,
error: e.to_string(),
timestamp: Utc::now(),
})
.await;
Err(e)
}
}
}
#[allow(clippy::too_many_arguments)]
async fn stream_output<R: AsyncRead + Unpin>(
execution_id: uuid::Uuid,
output: R,
state: Arc<RwLock<ExecutionState>>,
event_handler: Option<Arc<dyn EventHandler>>,
max_output_size: usize,
oversized_strategy: crate::config::OversizedOutputStrategy,
stream_type: StreamType,
output_log_path: Option<PathBuf>,
) -> Result<()> {
let reader = BufReader::new(output);
let mut lines = reader.lines();
let mut total_size: usize = 0;
let mut buffer = Vec::new();
let mut file_output = if let Some(path) = &output_log_path {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
Some(
tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?,
)
} else {
None
};
const BATCH_SIZE: usize = 100; const BATCH_BYTES: usize = 64 * 1024;
while let Some(line) = lines.next_line().await? {
if let Some(file) = &mut file_output {
file.write_all(format!("{}\n", line).as_bytes()).await?;
}
let line_size = line.len() + 1;
if total_size + line_size > max_output_size {
if !buffer.is_empty() {
let batch = buffer.join("\n");
let mut state_lock = state.write().await;
let output_field = stream_type.get_output_mut(&mut state_lock);
if !output_field.is_empty() {
output_field.push('\n');
}
output_field.push_str(&batch);
}
match oversized_strategy {
crate::config::OversizedOutputStrategy::TruncateWithWarning => {
let warning =
format!("\n[OUTPUT TRUNCATED: Exceeded {max_output_size} bytes limit]");
let mut state_lock = state.write().await;
let output_field = stream_type.get_output_mut(&mut state_lock);
output_field.push_str(&warning);
break;
}
crate::config::OversizedOutputStrategy::FailExecution => {
let mut state_lock = state.write().await;
state_lock.status = ExecutionStatus::Failed;
state_lock.error = Some(format!(
"Output size exceeded limit of {max_output_size} bytes"
));
return Err(ExecutionError::OutputSizeExceeded(max_output_size));
}
crate::config::OversizedOutputStrategy::StreamToFile => {
let temp_dir = std::env::temp_dir();
let overflow_filename = format!(
"execution_{}_{}_overflow.log",
execution_id,
match stream_type {
StreamType::Stdout => "stdout",
StreamType::Stderr => "stderr",
}
);
let overflow_path = temp_dir.join(overflow_filename);
let mut overflow_file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&overflow_path)
.await?;
let warning = format!(
"\n[OUTPUT LIMIT REACHED: Remaining output streamed to {}]",
overflow_path.display()
);
{
let mut state_lock = state.write().await;
let output_field = stream_type.get_output_mut(&mut state_lock);
output_field.push_str(&warning);
match stream_type {
StreamType::Stdout => {
state_lock.stdout_overflow_file = Some(overflow_path.clone())
}
StreamType::Stderr => {
state_lock.stderr_overflow_file = Some(overflow_path.clone())
}
}
}
overflow_file
.write_all(format!("{}\n", line).as_bytes())
.await?;
while let Some(line) = lines.next_line().await? {
overflow_file
.write_all(format!("{}\n", line).as_bytes())
.await?;
if let Some(handler) = &event_handler {
handler
.handle_event(stream_type.create_event(execution_id, line))
.await;
}
}
overflow_file.flush().await?;
return Ok(());
}
}
}
if let Some(handler) = &event_handler {
handler
.handle_event(stream_type.create_event(execution_id, line.clone()))
.await;
}
buffer.push(line);
total_size += line_size;
if buffer.len() >= BATCH_SIZE || total_size % BATCH_BYTES < line_size {
let batch = buffer.join("\n");
let mut state_lock = state.write().await;
let output_field = stream_type.get_output_mut(&mut state_lock);
if !output_field.is_empty() {
output_field.push('\n');
}
output_field.push_str(&batch);
drop(state_lock);
buffer.clear();
}
}
if !buffer.is_empty() {
let batch = buffer.join("\n");
let mut state_lock = state.write().await;
let output_field = stream_type.get_output_mut(&mut state_lock);
if !output_field.is_empty() {
output_field.push('\n');
}
output_field.push_str(&batch);
}
Ok(())
}
async fn stream_stdout_static(
execution_id: uuid::Uuid,
stdout: ChildStdout,
state: Arc<RwLock<ExecutionState>>,
event_handler: Option<Arc<dyn EventHandler>>,
max_output_size: usize,
oversized_strategy: crate::config::OversizedOutputStrategy,
output_log_path: Option<PathBuf>,
) -> Result<()> {
Self::stream_output(
execution_id,
stdout,
state,
event_handler,
max_output_size,
oversized_strategy,
StreamType::Stdout,
output_log_path,
)
.await
}
async fn stream_stderr_static(
execution_id: uuid::Uuid,
stderr: ChildStderr,
state: Arc<RwLock<ExecutionState>>,
event_handler: Option<Arc<dyn EventHandler>>,
max_output_size: usize,
oversized_strategy: crate::config::OversizedOutputStrategy,
output_log_path: Option<PathBuf>,
) -> Result<()> {
Self::stream_output(
execution_id,
stderr,
state,
event_handler,
max_output_size,
oversized_strategy,
StreamType::Stderr,
output_log_path,
)
.await
}
async fn wait_with_timeout_and_cancel(
&self,
mut child: Child,
timeout_ms: u64,
cancel_token: CancellationToken,
) -> Result<ExitStatus> {
let timeout_duration = Duration::from_millis(timeout_ms);
tokio::select! {
result = child.wait() => {
result.map_err(ExecutionError::Io)
}
_ = tokio::time::sleep(timeout_duration) => {
let _ = child.kill().await;
Err(ExecutionError::Timeout(timeout_ms))
}
_ = cancel_token.cancelled() => {
let _ = child.kill().await;
Err(ExecutionError::Cancelled)
}
}
}
async fn emit_event(&self, event: ExecutionEvent) {
if let Some(handler) = &self.event_handler {
handler.handle_event(event).await;
}
}
#[cfg(feature = "server-feedback")]
async fn send_feedback(&self, result: &ExecutionResult, command: &str) {
if !self.config.enable_server_feedback {
return;
}
let network_client = match &self.config.network_client {
Some(client) => client,
None => {
tracing::warn!("Server feedback enabled but no network client configured");
return;
}
};
let feedback = ExecutionFeedback {
execution_id: result.id.to_string(),
context_id: result.id.to_string(), status: if result.success {
NetworkExecutionStatus::Success
} else {
NetworkExecutionStatus::Error
},
command: command.to_string(),
exit_code: result.exit_code,
stdout: result.stdout.clone(),
stderr: result.stderr.clone(),
duration_ms: result.duration.as_millis() as i64,
error_details: result.error.clone(),
};
match network_client
.post::<_, serde_json::Value>("/feedback", &feedback)
.await
{
Ok(_) => {
tracing::debug!("Successfully sent execution feedback for {}", result.id);
}
Err(e) => {
tracing::warn!("Failed to send execution feedback for {}: {}", result.id, e);
if let Err(queue_err) = network_client.queue_request("/feedback", &feedback).await {
tracing::error!("Failed to queue feedback for {}: {}", result.id, queue_err);
}
}
}
}
pub fn get_log_path(&self, execution_id: uuid::Uuid) -> PathBuf {
if let Some(log_dir) = &self.config.log_dir {
log_dir.join(format!("{execution_id}.log"))
} else {
PathBuf::from(format!("/tmp/execution-{execution_id}.log"))
}
}
pub async fn write_logs(
&self,
execution_id: uuid::Uuid,
result: &ExecutionResult,
) -> Result<()> {
if self.config.log_dir.is_none() {
return Ok(()); }
let log_path = self.get_log_path(execution_id);
if let Some(parent) = log_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut log_content = String::new();
log_content.push_str(&format!("Execution ID: {execution_id}\n"));
log_content.push_str(&format!("Status: {:?}\n", result.status));
log_content.push_str(&format!("Exit Code: {}\n", result.exit_code));
log_content.push_str(&format!("Duration: {:?}\n", result.duration));
log_content.push_str(&format!("Started At: {}\n", result.started_at));
if let Some(completed) = result.completed_at {
log_content.push_str(&format!("Completed At: {completed}\n"));
}
log_content.push_str("\n=== STDOUT ===\n");
log_content.push_str(&result.stdout);
log_content.push_str("\n\n=== STDERR ===\n");
log_content.push_str(&result.stderr);
if let Some(error) = &result.error {
log_content.push_str(&format!("\n\n=== ERROR ===\n{error}\n"));
}
let mut file = tokio::fs::File::create(&log_path).await?;
file.write_all(log_content.as_bytes()).await?;
Ok(())
}
pub async fn read_logs(&self, execution_id: uuid::Uuid) -> Result<String> {
let log_path = self.get_log_path(execution_id);
if !log_path.exists() {
return Err(ExecutionError::NotFound(execution_id));
}
let content = tokio::fs::read_to_string(&log_path).await?;
Ok(content)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Command;
    use std::collections::HashMap;

    /// Builds a request with a fresh id, empty env/metadata and a 5 s timeout.
    fn create_test_request(command: Command) -> ExecutionRequest {
        ExecutionRequest {
            id: uuid::Uuid::new_v4(),
            command,
            env: HashMap::new(),
            working_dir: None,
            timeout_ms: Some(5000),
            output_log_path: None,
            metadata: Default::default(),
        }
    }

    /// Shorthand for a command run through `bash`.
    fn bash(command: &str) -> Command {
        Command::Shell {
            command: command.to_string(),
            shell: "bash".to_string(),
        }
    }

    /// Fresh shared state for `request`.
    fn new_state(request: &ExecutionRequest) -> Arc<RwLock<ExecutionState>> {
        Arc::new(RwLock::new(ExecutionState::new(request.clone())))
    }

    #[tokio::test]
    async fn test_executor_simple_command() {
        let executor = Executor::new(ExecutionConfig::default());
        let request = create_test_request(bash("echo 'hello world'"));
        let state = new_state(&request);
        let result = executor
            .execute(request, state, CancellationToken::new())
            .await
            .expect("command should run");
        assert_eq!(result.status, ExecutionStatus::Completed);
        assert_eq!(result.exit_code, 0);
        assert!(result.stdout.contains("hello world"));
    }

    #[tokio::test]
    async fn test_executor_failed_command() {
        let executor = Executor::new(ExecutionConfig::default());
        let request = create_test_request(bash("exit 1"));
        let state = new_state(&request);
        // A non-zero exit is reported through the result, not as an error.
        let result = executor
            .execute(request, state, CancellationToken::new())
            .await
            .expect("non-zero exit is not an execution error");
        assert_eq!(result.status, ExecutionStatus::Failed);
        assert_eq!(result.exit_code, 1);
    }

    #[tokio::test]
    async fn test_executor_timeout() {
        let config = ExecutionConfig {
            default_timeout_ms: 1000,
            ..Default::default()
        };
        let executor = Executor::new(config);
        let mut request = create_test_request(bash("sleep 10"));
        // The helper sets a 5 s per-request timeout, which takes precedence over
        // `default_timeout_ms`; clear it so the configured default is exercised.
        request.timeout_ms = None;
        let state = new_state(&request);
        let result = executor
            .execute(request, state.clone(), CancellationToken::new())
            .await;
        assert!(matches!(result, Err(ExecutionError::Timeout(1000))));
        let state_lock = state.read().await;
        assert_eq!(state_lock.status, ExecutionStatus::Timeout);
    }

    #[tokio::test]
    async fn test_executor_cancellation() {
        let executor = Executor::new(ExecutionConfig::default());
        let request = create_test_request(bash("sleep 10"));
        let state = new_state(&request);
        let cancel_token = CancellationToken::new();
        let canceller = cancel_token.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            canceller.cancel();
        });
        let result = executor.execute(request, state.clone(), cancel_token).await;
        assert!(matches!(result, Err(ExecutionError::Cancelled)));
        let state_lock = state.read().await;
        assert_eq!(state_lock.status, ExecutionStatus::Cancelled);
    }

    #[tokio::test]
    async fn test_executor_stderr_capture() {
        let executor = Executor::new(ExecutionConfig::default());
        let request = create_test_request(bash("echo 'error message' >&2"));
        let state = new_state(&request);
        let result = executor
            .execute(request, state, CancellationToken::new())
            .await
            .expect("command should run");
        assert!(result.stderr.contains("error message"));
    }

    #[test]
    fn test_get_log_path() {
        let config = ExecutionConfig {
            log_dir: Some(PathBuf::from("/tmp/logs")),
            ..Default::default()
        };
        let executor = Executor::new(config);
        let execution_id = uuid::Uuid::new_v4();
        assert_eq!(
            executor.get_log_path(execution_id),
            PathBuf::from(format!("/tmp/logs/{execution_id}.log"))
        );
    }

    #[tokio::test]
    async fn test_output_size_truncate() {
        let config = ExecutionConfig {
            max_output_size_bytes: 100,
            oversized_output_strategy: crate::config::OversizedOutputStrategy::TruncateWithWarning,
            ..Default::default()
        };
        let executor = Executor::new(config);
        let request = create_test_request(bash(
            "for i in {1..50}; do echo 'This is a long line of output that will exceed the limit'; done",
        ));
        let state = new_state(&request);
        let result = executor
            .execute(request, state, CancellationToken::new())
            .await
            .expect("truncation is not an execution error");
        assert!(result.stdout.contains("[OUTPUT TRUNCATED"));
        // Far below the ~2.9 KB the command would print without truncation.
        assert!(result.stdout.len() < 1000);
    }

    #[tokio::test]
    async fn test_output_size_fail_execution() {
        let config = ExecutionConfig {
            max_output_size_bytes: 100,
            oversized_output_strategy: crate::config::OversizedOutputStrategy::FailExecution,
            ..Default::default()
        };
        let executor = Executor::new(config);
        let request = create_test_request(bash(
            "for i in {1..50}; do echo 'This is a long line of output that will exceed the limit'; done",
        ));
        let state = new_state(&request);
        let result = executor
            .execute(request, state.clone(), CancellationToken::new())
            .await;
        assert!(matches!(result, Err(ExecutionError::OutputSizeExceeded(_))));
        let state_lock = state.read().await;
        assert_eq!(state_lock.status, ExecutionStatus::Failed);
        let error = state_lock.error.as_deref().expect("error should be recorded");
        assert!(error.contains("Output size exceeded"));
    }
}