use async_trait::async_trait;
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
use uuid::Uuid;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::{Worker, MessageEvent};
use crate::command::{
CommandContext, CommandEvent, CommandExecutor, CommandHandle, CommandRequest, CommandResult,
};
use crate::error::UbiquityError;
/// Command executor for WASM builds that answers requests with simulated
/// commands looked up in an in-memory registry.
pub struct WasmCommandExecutor {
// Shared context used to register, unregister, look up and cancel command handles.
context: Arc<CommandContext>,
// Capacity of the per-command event channel created in `execute`.
event_buffer_size: usize,
// Pool of web workers; one is reserved for the lifetime of each running command.
worker_pool: Arc<RwLock<WorkerPool>>,
// Name -> simulated command table consulted when a request is executed.
command_registry: Arc<RwLock<CommandRegistry>>,
}
/// Fixed set of web workers that commands check out and return.
struct WorkerPool {
// Every worker created for this pool, busy or idle.
workers: Vec<WorkerHandle>,
// Worker count requested at construction; stored but not read in the visible code.
max_workers: usize,
}
/// Bookkeeping record for one web worker in the pool.
struct WorkerHandle {
// Identifier used to find this worker again when it is released.
id: Uuid,
// The underlying browser worker object.
worker: Worker,
// Set to true when the worker is checked out and back to false on release.
busy: bool,
// Starts as `None` and is reset to `None` on release; nothing in the visible
// code ever stores `Some` here — presumably reserved for tracking the running
// command (TODO confirm).
command_id: Option<Uuid>,
}
/// Lookup table of simulated commands, keyed by command name.
struct CommandRegistry {
// Command name -> boxed implementation; inserting an existing name replaces the entry.
commands: HashMap<String, Box<dyn SimulatedCommand>>,
}
/// A command whose whole outcome is computed up front instead of being run
/// as a real process.
///
/// NOTE(review): this trait is private but appears as a bound on the public
/// `WasmCommandExecutor::register_command` — confirm the intended visibility.
trait SimulatedCommand: Send + Sync {
/// Produces the scripted outcome for one invocation.
///
/// `args` are the command-line arguments, `env` the environment variables and
/// `stdin` the optional standard input, all taken from the request.
fn execute(
&self,
args: Vec<String>,
env: HashMap<String, String>,
stdin: Option<String>,
) -> CommandSimulation;
}
/// Scripted outcome of a simulated command, replayed by the executor as events.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandSimulation {
// Lines emitted as stdout events, after all progress updates.
stdout: Vec<String>,
// Lines emitted as stderr events, after all stdout lines.
stderr: Vec<String>,
// 0 makes the executor emit a completed event; any other value a failed event.
exit_code: i32,
// Minimum total run time in milliseconds; the executor waits out any remainder.
duration_ms: u64,
// (percentage, message) pairs emitted first, with a short pause after each.
progress_updates: Vec<(f32, String)>,
}
/// Messages serialised with an internal `"type"` tag naming the variant.
///
/// NOTE(review): nothing in the visible code constructs or parses these;
/// presumably they mirror the protocol spoken by the worker script — confirm
/// against `command_worker.js`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum WorkerMessage {
// Request to run a command.
Execute {
request: CommandRequest,
},
// Request to stop the command with the given id.
Cancel {
command_id: Uuid,
},
// Progress report for a running command.
Progress {
command_id: Uuid,
percentage: f32,
message: String,
},
// One line of standard output.
Stdout {
command_id: Uuid,
line: String,
},
// One line of standard error.
Stderr {
command_id: Uuid,
line: String,
},
// Command finished; carries its exit code and run time in milliseconds.
Completed {
command_id: Uuid,
exit_code: i32,
duration_ms: u64,
},
// Command failed; carries an error description and run time in milliseconds.
Failed {
command_id: Uuid,
error: String,
duration_ms: u64,
},
}
impl WasmCommandExecutor {
pub fn new() -> Result<Self, UbiquityError> {
Ok(Self {
context: Arc::new(CommandContext::new()),
event_buffer_size: 1024,
worker_pool: Arc::new(RwLock::new(WorkerPool::new(4)?)),
command_registry: Arc::new(RwLock::new(CommandRegistry::new())),
})
}
pub fn with_max_workers(mut self, max: usize) -> Result<Self, UbiquityError> {
self.worker_pool = Arc::new(RwLock::new(WorkerPool::new(max)?));
Ok(self)
}
pub async fn register_command<C: SimulatedCommand + 'static>(
&self,
name: impl Into<String>,
command: C,
) {
let mut registry = self.command_registry.write().await;
registry.commands.insert(name.into(), Box::new(command));
}
async fn execute_in_worker(
request: CommandRequest,
event_tx: mpsc::Sender<CommandEvent>,
worker_pool: Arc<RwLock<WorkerPool>>,
command_registry: Arc<RwLock<CommandRegistry>>,
) -> Result<(), UbiquityError> {
let start_time = Instant::now();
let command_id = request.id;
event_tx
.send(CommandEvent::Started {
command_id,
command: request.command.clone(),
args: request.args.clone(),
timestamp: chrono::Utc::now(),
})
.await
.map_err(|_| UbiquityError::Internal("Failed to send start event".to_string()))?;
let worker_handle = {
let mut pool = worker_pool.write().await;
pool.get_available_worker().await?
};
let simulation = {
let registry = command_registry.read().await;
if let Some(cmd) = registry.commands.get(&request.command) {
cmd.execute(
request.args.clone(),
request.env.clone(),
request.stdin.clone(),
)
} else {
CommandSimulation {
stdout: vec![format!("Command '{}' not found in WASM environment", request.command)],
stderr: vec![],
exit_code: 127,
duration_ms: 10,
progress_updates: vec![],
}
}
};
for (percentage, message) in &simulation.progress_updates {
event_tx
.send(CommandEvent::Progress {
command_id,
percentage: *percentage,
message: message.clone(),
timestamp: chrono::Utc::now(),
})
.await
.ok();
tokio::time::sleep(Duration::from_millis(50)).await;
}
for line in &simulation.stdout {
event_tx
.send(CommandEvent::Stdout {
command_id,
data: line.clone(),
timestamp: chrono::Utc::now(),
})
.await
.ok();
}
for line in &simulation.stderr {
event_tx
.send(CommandEvent::Stderr {
command_id,
data: line.clone(),
timestamp: chrono::Utc::now(),
})
.await
.ok();
}
let remaining_time = simulation.duration_ms.saturating_sub(start_time.elapsed().as_millis() as u64);
if remaining_time > 0 {
tokio::time::sleep(Duration::from_millis(remaining_time)).await;
}
let duration_ms = start_time.elapsed().as_millis() as u64;
if simulation.exit_code == 0 {
event_tx
.send(CommandEvent::Completed {
command_id,
exit_code: simulation.exit_code,
duration_ms,
timestamp: chrono::Utc::now(),
})
.await
.ok();
} else {
event_tx
.send(CommandEvent::Failed {
command_id,
error: format!("Command exited with code {}", simulation.exit_code),
duration_ms,
timestamp: chrono::Utc::now(),
})
.await
.ok();
}
{
let mut pool = worker_pool.write().await;
pool.release_worker(worker_handle.id).await;
}
Ok(())
}
}
impl WorkerPool {
    /// Builds a pool of `max_workers` web workers, all started from the
    /// embedded `command_worker.js` script served through a temporary blob URL.
    ///
    /// # Errors
    /// Returns `UbiquityError::Internal` if the blob, its URL or any worker
    /// cannot be created, or if the URL cannot be revoked afterwards. On every
    /// failure path the workers created so far are terminated (through `Drop`)
    /// and the blob URL is revoked on a best-effort basis.
    fn new(max_workers: usize) -> Result<Self, UbiquityError> {
        let worker_script = include_str!("../assets/command_worker.js");
        // `Array::of1` passes the script as a single blob part; `Array::from`
        // on a string would split it into one array element per character.
        let script_parts = js_sys::Array::of1(&JsValue::from_str(worker_script));
        let blob = web_sys::Blob::new_with_str_sequence_and_options(
            &script_parts,
            web_sys::BlobPropertyBag::new().type_("application/javascript"),
        )
        .map_err(|_| UbiquityError::Internal("Failed to create worker blob".to_string()))?;
        let blob_url = web_sys::Url::create_object_url_with_blob(&blob)
            .map_err(|_| UbiquityError::Internal("Failed to create blob URL".to_string()))?;

        // Fill the pool value directly so that an early return drops it and
        // thereby terminates whatever workers were already started.
        let mut pool = Self {
            workers: Vec::new(),
            max_workers,
        };
        for _ in 0..max_workers {
            match Worker::new(&blob_url) {
                Ok(worker) => pool.workers.push(WorkerHandle {
                    id: Uuid::new_v4(),
                    worker,
                    busy: false,
                    command_id: None,
                }),
                Err(_) => {
                    // The creation failure is the error worth reporting; a
                    // failed revoke here is deliberately ignored.
                    let _ = web_sys::Url::revoke_object_url(&blob_url);
                    return Err(UbiquityError::Internal(
                        "Failed to create worker".to_string(),
                    ));
                }
            }
        }

        web_sys::Url::revoke_object_url(&blob_url)
            .map_err(|_| UbiquityError::Internal("Failed to revoke blob URL".to_string()))?;
        Ok(pool)
    }

    /// Marks the first idle worker as busy and returns a copy of its handle.
    ///
    /// # Errors
    /// Returns `UbiquityError::ResourceExhausted` when every worker is busy,
    /// which is always the case for a pool created with zero workers.
    async fn get_available_worker(&mut self) -> Result<WorkerHandle, UbiquityError> {
        let slot = self
            .workers
            .iter_mut()
            .find(|slot| !slot.busy)
            .ok_or_else(|| {
                UbiquityError::ResourceExhausted("No available workers in pool".to_string())
            })?;
        slot.busy = true;
        Ok(WorkerHandle {
            id: slot.id,
            worker: slot.worker.clone(),
            busy: true,
            command_id: slot.command_id,
        })
    }

    /// Marks the worker with `worker_id` as idle and clears its command id.
    /// Unknown ids are ignored.
    async fn release_worker(&mut self, worker_id: Uuid) {
        if let Some(slot) = self.workers.iter_mut().find(|slot| slot.id == worker_id) {
            slot.busy = false;
            slot.command_id = None;
        }
    }
}

/// Terminates every worker when the pool goes away. Dropping the Rust handle
/// alone does not stop the browser-side worker, so without this a replaced or
/// partially built pool would leave its workers running.
impl Drop for WorkerPool {
    fn drop(&mut self) {
        for handle in &self.workers {
            handle.worker.terminate();
        }
    }
}
impl CommandRegistry {
fn new() -> Self {
let mut registry = Self {
commands: HashMap::new(),
};
registry.register_builtin_commands();
registry
}
fn register_builtin_commands(&mut self) {
self.commands.insert(
"echo".to_string(),
Box::new(EchoCommand),
);
self.commands.insert(
"ls".to_string(),
Box::new(LsCommand),
);
self.commands.insert(
"cat".to_string(),
Box::new(CatCommand),
);
self.commands.insert(
"sleep".to_string(),
Box::new(SleepCommand),
);
}
}
/// Built-in `echo`: prints its arguments joined by single spaces.
struct EchoCommand;

impl SimulatedCommand for EchoCommand {
    /// Returns one stdout line containing all of `args` separated by spaces,
    /// with exit code 0 and a simulated duration of 10 ms. The environment
    /// and stdin are ignored.
    fn execute(
        &self,
        args: Vec<String>,
        _env: HashMap<String, String>,
        _stdin: Option<String>,
    ) -> CommandSimulation {
        let joined = args.join(" ");
        CommandSimulation {
            exit_code: 0,
            duration_ms: 10,
            stdout: vec![joined],
            stderr: Vec::new(),
            progress_updates: Vec::new(),
        }
    }
}
/// Built-in `ls`: always reports the same fixed directory listing.
struct LsCommand;

impl SimulatedCommand for LsCommand {
    /// Ignores every input and returns four fixed entries on stdout, a single
    /// 50% progress update, exit code 0 and a simulated duration of 50 ms.
    fn execute(
        &self,
        _args: Vec<String>,
        _env: HashMap<String, String>,
        _stdin: Option<String>,
    ) -> CommandSimulation {
        let entries = ["file1.txt", "file2.js", "directory/", "README.md"];
        let listing: Vec<String> = entries.iter().map(|entry| entry.to_string()).collect();
        let progress = vec![(50.0, "Listing files...".to_string())];
        CommandSimulation {
            stdout: listing,
            stderr: Vec::new(),
            exit_code: 0,
            duration_ms: 50,
            progress_updates: progress,
        }
    }
}
/// Built-in `cat`: echoes stdin, or prints a placeholder for a named file.
struct CatCommand;

impl SimulatedCommand for CatCommand {
    /// Chooses the outcome by input, in this order of precedence:
    ///
    /// 1. stdin present: each input line becomes a stdout line (exit 0, 20 ms);
    /// 2. at least one argument: a single `Contents of <first arg>` line
    ///    (exit 0, 30 ms);
    /// 3. neither: a "missing file operand" message on stderr (exit 1, 10 ms).
    fn execute(
        &self,
        args: Vec<String>,
        _env: HashMap<String, String>,
        stdin: Option<String>,
    ) -> CommandSimulation {
        let (stdout, stderr, exit_code, duration_ms): (Vec<String>, Vec<String>, i32, u64) =
            match (stdin, args.first()) {
                (Some(input), _) => (
                    input.lines().map(str::to_owned).collect(),
                    Vec::new(),
                    0,
                    20,
                ),
                (None, Some(path)) => (vec![format!("Contents of {}", path)], Vec::new(), 0, 30),
                (None, None) => (
                    Vec::new(),
                    vec!["cat: missing file operand".to_string()],
                    1,
                    10,
                ),
            };
        CommandSimulation {
            stdout,
            stderr,
            exit_code,
            duration_ms,
            progress_updates: Vec::new(),
        }
    }
}
/// Built-in `sleep`: produces no output and lasts for the requested time.
struct SleepCommand;

impl SimulatedCommand for SleepCommand {
    /// Interprets the first argument as a whole number of seconds, falling
    /// back to 1 when it is missing or not a valid unsigned integer.
    ///
    /// Returns ten progress updates (10%, 20%, ... 100%), exit code 0 and a
    /// duration of that many seconds in milliseconds. The conversion
    /// saturates at `u64::MAX` rather than overflowing for very large inputs.
    fn execute(
        &self,
        args: Vec<String>,
        _env: HashMap<String, String>,
        _stdin: Option<String>,
    ) -> CommandSimulation {
        let seconds = args
            .first()
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(1);
        let progress_updates: Vec<(f32, String)> = (0..10)
            .map(|i| {
                let percentage = (i + 1) as f32 * 10.0;
                (percentage, format!("Sleeping... {}%", percentage))
            })
            .collect();
        CommandSimulation {
            stdout: vec![],
            stderr: vec![],
            exit_code: 0,
            // A plain multiplication would overflow for arguments above
            // u64::MAX / 1000.
            duration_ms: seconds.saturating_mul(1000),
            progress_updates,
        }
    }
}
#[async_trait]
impl CommandExecutor for WasmCommandExecutor {
async fn execute(
&self,
request: CommandRequest,
) -> Result<Pin<Box<dyn Stream<Item = CommandEvent> + Send>>, UbiquityError> {
let (event_tx, event_rx) = mpsc::channel(self.event_buffer_size);
let (cancel_tx, cancel_rx) = mpsc::channel(1);
let (status_tx, status_rx) = mpsc::channel(1);
let command_id = request.id;
let handle = CommandHandle::new(command_id, cancel_tx, status_tx);
self.context.register(command_id, handle).await;
let worker_pool = self.worker_pool.clone();
let command_registry = self.command_registry.clone();
let context = self.context.clone();
spawn_local(async move {
let result = Self::execute_in_worker(
request,
event_tx,
worker_pool,
command_registry,
)
.await;
context.unregister(&command_id).await;
if let Err(e) = result {
warn!("WASM command execution error: {}", e);
}
});
Ok(Box::pin(event_rx))
}
async fn cancel(&self, command_id: Uuid) -> Result<(), UbiquityError> {
self.context.cancel(&command_id).await
}
async fn status(&self, command_id: Uuid) -> Result<Option<CommandResult>, UbiquityError> {
if let Some(handle) = self.context.get(&command_id).await {
Ok(Some(handle.status().await?))
} else {
Ok(None)
}
}
}
#[cfg(target_arch = "wasm32")]
#[cfg(test)]
mod tests {
    use super::*;
    use wasm_bindgen_test::*;

    wasm_bindgen_test_configure!(run_in_browser);

    /// Runs the built-in `echo` through the executor and checks that the
    /// argument comes back as a stdout event before the stream ends.
    #[wasm_bindgen_test]
    async fn test_wasm_echo_command() {
        let executor = WasmCommandExecutor::new().unwrap();
        let request = CommandRequest::new("echo").with_args(vec!["hello wasm".to_string()]);
        let mut events = executor.execute(request).await.unwrap();

        let mut saw_stdout = false;
        loop {
            match events.next().await {
                Some(CommandEvent::Stdout { data, .. }) => {
                    assert_eq!(data, "hello wasm");
                    saw_stdout = true;
                }
                Some(_) => {}
                None => break,
            }
        }
        assert!(saw_stdout);
    }
}