use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
use tracing::{debug, info};
use uuid::Uuid;
use crate::command::{CommandEvent, CommandExecutor, CommandRequest, CommandResult};
use crate::error::UbiquityError;
#[cfg(not(target_arch = "wasm32"))]
use crate::command_local::LocalCommandExecutor;
#[cfg(target_arch = "wasm32")]
use crate::command_wasm::WasmCommandExecutor;
use crate::command_cloud::CloudCommandExecutor;
/// Selects which command-execution backend `UnifiedCommandExecutor` builds.
///
/// `UnifiedCommandExecutor::create_executor` maps each variant to a concrete
/// executor type; a variant whose backend is not compiled for the current
/// target is rejected there with a configuration error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
/// Backed by `LocalCommandExecutor`; that match arm only exists on
/// non-`wasm32` builds.
Local,
/// Backed by `WasmCommandExecutor`; that match arm only exists on `wasm32`
/// builds.
Wasm,
/// Backed by `CloudCommandExecutor`; needs the worker URL, API token and
/// namespace ID to be present in `UnifiedExecutorConfig`.
Cloud,
/// Chosen from the target architecture: `Wasm` on `wasm32` builds, `Local`
/// on every other target.
Auto,
}
/// Settings consumed when `UnifiedCommandExecutor` constructs its backend.
///
/// Only the fields relevant to the selected mode are read; the others are
/// ignored for that mode.
#[derive(Debug, Clone)]
pub struct UnifiedExecutorConfig {
/// Requested backend; `ExecutionMode::Auto` is resolved per target.
pub mode: ExecutionMode,
/// Worker URL handed to `CloudCommandExecutor::new`. Required for `Cloud` mode.
pub cloud_worker_url: Option<String>,
/// API token handed to `CloudCommandExecutor::new`. Required for `Cloud` mode.
pub cloud_api_token: Option<String>,
/// Namespace ID handed to `CloudCommandExecutor::new`. Required for `Cloud` mode.
pub cloud_namespace_id: Option<String>,
/// Forwarded to `LocalCommandExecutor::with_event_buffer_size` in `Local` mode.
pub event_buffer_size: usize,
/// Forwarded to `WasmCommandExecutor::with_max_workers` in `Wasm` mode.
pub max_wasm_workers: usize,
}
impl Default for UnifiedExecutorConfig {
    /// Returns a configuration that auto-detects the backend, carries no
    /// cloud credentials, and uses the stock buffer and worker sizes.
    fn default() -> Self {
        // Named here so the two tuning defaults are easy to spot and adjust.
        const EVENT_BUFFER_SIZE: usize = 1024;
        const MAX_WASM_WORKERS: usize = 4;

        Self {
            mode: ExecutionMode::Auto,
            cloud_worker_url: None,
            cloud_api_token: None,
            cloud_namespace_id: None,
            event_buffer_size: EVENT_BUFFER_SIZE,
            max_wasm_workers: MAX_WASM_WORKERS,
        }
    }
}
/// Facade that owns one concrete `CommandExecutor` backend and forwards every
/// trait call to it.
pub struct UnifiedCommandExecutor {
// Configuration the current backend was built from; `switch_mode` updates
// its `mode` field.
config: UnifiedExecutorConfig,
// Active backend, stored as a trait object so any executor type fits.
executor: Arc<dyn CommandExecutor>,
}
impl UnifiedCommandExecutor {
    /// Builds the backend selected by `config.mode` and wraps it.
    ///
    /// # Errors
    /// Returns `UbiquityError::Configuration` when the requested mode is not
    /// compiled for this target or when a required Cloud setting is missing.
    /// Errors raised while constructing the wasm backend are propagated as-is.
    pub fn new(config: UnifiedExecutorConfig) -> Result<Self, UbiquityError> {
        let executor = Self::create_executor(&config)?;
        Ok(Self {
            config,
            executor: Arc::from(executor),
        })
    }

    /// Convenience constructor using `UnifiedExecutorConfig::default()`, i.e.
    /// `ExecutionMode::Auto`.
    ///
    /// # Errors
    /// Same as [`UnifiedCommandExecutor::new`].
    pub fn auto() -> Result<Self, UbiquityError> {
        Self::new(UnifiedExecutorConfig::default())
    }

    /// Returns the effective mode: the configured one, with `Auto` replaced by
    /// the mode detected for the current target.
    pub fn mode(&self) -> ExecutionMode {
        Self::resolve_mode(self.config.mode)
    }

    /// Maps `Auto` to the detected mode and leaves explicit modes untouched.
    ///
    /// Shared by `mode()` and `create_executor` so both always agree.
    fn resolve_mode(mode: ExecutionMode) -> ExecutionMode {
        match mode {
            ExecutionMode::Auto => Self::detect_mode(),
            explicit => explicit,
        }
    }

    /// Picks the default backend for the compilation target: `Wasm` on
    /// `wasm32`, `Local` everywhere else.
    fn detect_mode() -> ExecutionMode {
        if cfg!(target_arch = "wasm32") {
            ExecutionMode::Wasm
        } else {
            ExecutionMode::Local
        }
    }

    /// Clones a required Cloud setting, or reports `missing` as a
    /// configuration error when it is absent.
    fn require_cloud_setting(value: Option<&str>, missing: &str) -> Result<String, UbiquityError> {
        value
            .map(str::to_owned)
            .ok_or_else(|| UbiquityError::Configuration(missing.to_string()))
    }

    /// Instantiates the concrete executor for `config`.
    ///
    /// # Errors
    /// - `UbiquityError::Configuration` if a Cloud setting is missing (checked
    ///   in the order URL, token, namespace) or the mode has no backend on
    ///   this target.
    /// - Any error returned by the wasm executor's constructor or
    ///   `with_max_workers`.
    fn create_executor(
        config: &UnifiedExecutorConfig,
    ) -> Result<Box<dyn CommandExecutor>, UbiquityError> {
        let mode = Self::resolve_mode(config.mode);
        info!("Creating command executor in {:?} mode", mode);

        match mode {
            #[cfg(not(target_arch = "wasm32"))]
            ExecutionMode::Local => {
                let executor =
                    LocalCommandExecutor::new().with_event_buffer_size(config.event_buffer_size);
                Ok(Box::new(executor))
            }
            #[cfg(target_arch = "wasm32")]
            ExecutionMode::Wasm => {
                let executor =
                    WasmCommandExecutor::new()?.with_max_workers(config.max_wasm_workers)?;
                Ok(Box::new(executor))
            }
            ExecutionMode::Cloud => {
                let worker_url = Self::require_cloud_setting(
                    config.cloud_worker_url.as_deref(),
                    "Cloud worker URL not configured",
                )?;
                let api_token = Self::require_cloud_setting(
                    config.cloud_api_token.as_deref(),
                    "Cloud API token not configured",
                )?;
                let namespace_id = Self::require_cloud_setting(
                    config.cloud_namespace_id.as_deref(),
                    "Cloud namespace ID not configured",
                )?;
                Ok(Box::new(CloudCommandExecutor::new(
                    worker_url,
                    api_token,
                    namespace_id,
                )))
            }
            // Reached for whichever of `Local` / `Wasm` was compiled out above.
            // `Auto` never gets here because it was resolved before the match.
            _ => Err(UbiquityError::Configuration(format!(
                "Execution mode {:?} not available on this platform",
                mode
            ))),
        }
    }

    /// Replaces the active backend with one for `mode`.
    ///
    /// The switch is all-or-nothing: if the new backend cannot be created, the
    /// previously configured mode is restored so that `mode()` keeps
    /// describing the executor that is actually installed.
    ///
    /// # Errors
    /// Same as [`UnifiedCommandExecutor::new`]; on error the executor and its
    /// configuration are left unchanged.
    pub fn switch_mode(&mut self, mode: ExecutionMode) -> Result<(), UbiquityError> {
        // Swap the mode in place (avoids cloning the whole config) but keep
        // the old value so a failed build can be rolled back.
        let previous = std::mem::replace(&mut self.config.mode, mode);
        match Self::create_executor(&self.config) {
            Ok(executor) => {
                self.executor = Arc::from(executor);
                Ok(())
            }
            Err(err) => {
                self.config.mode = previous;
                Err(err)
            }
        }
    }

    /// Starts a fluent [`CommandBuilder`] for `command`, bound to this executor.
    pub fn command(&self, command: impl Into<String>) -> CommandBuilder {
        CommandBuilder::new(self, command)
    }
}
#[async_trait]
impl CommandExecutor for UnifiedCommandExecutor {
    /// Logs the command and effective mode at debug level, then hands the
    /// request to the active backend and returns its event stream.
    async fn execute(
        &self,
        request: CommandRequest,
    ) -> Result<Pin<Box<dyn Stream<Item = CommandEvent> + Send>>, UbiquityError> {
        let active_mode = self.mode();
        debug!("Executing command {} in {:?} mode", request.command, active_mode);

        let backend = &self.executor;
        backend.execute(request).await
    }

    /// Logs the cancellation at debug level and forwards it to the active
    /// backend.
    async fn cancel(&self, command_id: Uuid) -> Result<(), UbiquityError> {
        let active_mode = self.mode();
        debug!("Cancelling command {} in {:?} mode", command_id, active_mode);

        let backend = &self.executor;
        backend.cancel(command_id).await
    }

    /// Forwards the status lookup to the active backend without logging.
    async fn status(&self, command_id: Uuid) -> Result<Option<CommandResult>, UbiquityError> {
        let backend = &self.executor;
        backend.status(command_id).await
    }
}
/// Fluent builder that accumulates a `CommandRequest` and runs it on the
/// `UnifiedCommandExecutor` it borrows.
///
/// Created through `UnifiedCommandExecutor::command`; every setter consumes
/// and returns the builder so calls can be chained.
pub struct CommandBuilder<'a> {
// Executor the finished request is submitted to.
executor: &'a UnifiedCommandExecutor,
// Request being assembled by the setter methods.
request: CommandRequest,
}
impl<'a> CommandBuilder<'a> {
    /// Creates a builder whose request starts out as `CommandRequest::new(command)`.
    fn new(executor: &'a UnifiedCommandExecutor, command: impl Into<String>) -> Self {
        let request = CommandRequest::new(command);
        Self { executor, request }
    }

    /// Appends a single argument.
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        let value: String = arg.into();
        self.request.args.push(value);
        self
    }

    /// Appends every item of `args`, in iteration order.
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        for item in args {
            self.request.args.push(item.into());
        }
        self
    }

    /// Sets one environment variable on the request.
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, val): (String, String) = (key.into(), value.into());
        self.request.env.insert(name, val);
        self
    }

    /// Sets several environment variables, inserting them in iteration order.
    pub fn envs<I, K, V>(mut self, vars: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        vars.into_iter().for_each(|(name, val)| {
            self.request.env.insert(name.into(), val.into());
        });
        self
    }

    /// Sets the working directory recorded on the request.
    pub fn current_dir(mut self, dir: impl Into<String>) -> Self {
        let path: String = dir.into();
        self.request.working_dir = Some(path);
        self
    }

    /// Sets the timeout recorded on the request.
    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
        self.request.timeout = Some(timeout);
        self
    }

    /// Sets the text recorded as the request's standard input.
    pub fn stdin(mut self, stdin: impl Into<String>) -> Self {
        let input: String = stdin.into();
        self.request.stdin = Some(input);
        self
    }

    /// Submits the request and returns the executor's event stream.
    ///
    /// # Errors
    /// Propagates whatever the executor's `execute` returns.
    pub async fn execute(self) -> Result<Pin<Box<dyn Stream<Item = CommandEvent> + Send>>, UbiquityError> {
        let Self { executor, request } = self;
        executor.execute(request).await
    }

    /// Submits the request, drains the event stream to its end, and returns
    /// the folded [`CommandOutput`].
    ///
    /// # Errors
    /// Fails only if the request cannot be started; events observed afterwards
    /// are recorded in the returned value rather than surfaced as errors.
    pub async fn output(self) -> Result<CommandOutput, UbiquityError> {
        use futures::StreamExt;

        // Read the id first: `execute` consumes the builder and its request.
        let id = self.request.id;
        let mut events = self.execute().await?;

        let mut collected = CommandOutput::new(id);
        while let Some(next_event) = events.next().await {
            collected.process_event(next_event);
        }
        Ok(collected)
    }
}
/// Summary of a finished command, folded from its `CommandEvent` stream by
/// `CommandBuilder::output`.
#[derive(Debug, Clone)]
pub struct CommandOutput {
/// Identifier copied from the originating request.
pub id: Uuid,
/// Payloads of all `Stdout` events, joined with `'\n'`.
pub stdout: String,
/// Payloads of all `Stderr` events, joined with `'\n'`.
pub stderr: String,
/// Exit code from a `Completed` event; stays `None` if none was seen.
pub exit_code: Option<i32>,
/// `true` only after a `Completed` event with exit code `0`.
pub success: bool,
/// Duration reported by the last `Completed`, `Failed` or `Cancelled` event.
pub duration_ms: Option<u64>,
/// `true` once a `Cancelled` event has been seen.
pub cancelled: bool,
}
impl CommandOutput {
    /// Creates an empty, not-yet-successful output record for request `id`.
    fn new(id: Uuid) -> Self {
        Self {
            id,
            stdout: String::new(),
            stderr: String::new(),
            exit_code: None,
            success: false,
            duration_ms: None,
            cancelled: false,
        }
    }

    /// Appends `chunk` to `buffer`, inserting a `'\n'` separator first unless
    /// the buffer is still empty.
    ///
    /// NOTE(review): because the separator is skipped while the buffer is
    /// empty, leading empty chunks leave no trace — confirm that matches how
    /// the executors frame their output.
    fn append_chunk(buffer: &mut String, chunk: &str) {
        if !buffer.is_empty() {
            buffer.push('\n');
        }
        buffer.push_str(chunk);
    }

    /// Folds a single event into this record; event kinds not listed below
    /// are ignored.
    fn process_event(&mut self, event: CommandEvent) {
        match event {
            CommandEvent::Stdout { data, .. } => Self::append_chunk(&mut self.stdout, &data),
            CommandEvent::Stderr { data, .. } => Self::append_chunk(&mut self.stderr, &data),
            CommandEvent::Completed { exit_code, duration_ms, .. } => {
                // Success is defined purely by a zero exit code.
                self.success = exit_code == 0;
                self.exit_code = Some(exit_code);
                self.duration_ms = Some(duration_ms);
            }
            CommandEvent::Failed { duration_ms, .. } => {
                self.duration_ms = Some(duration_ms);
                self.success = false;
            }
            CommandEvent::Cancelled { duration_ms, .. } => {
                self.duration_ms = Some(duration_ms);
                self.cancelled = true;
            }
            _ => {}
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    /// `auto()` must resolve to the backend matching the compilation target.
    #[tokio::test]
    async fn test_unified_executor_auto_mode() {
        let unified = UnifiedCommandExecutor::auto().unwrap();
        let expected = if cfg!(target_arch = "wasm32") {
            ExecutionMode::Wasm
        } else {
            ExecutionMode::Local
        };
        assert_eq!(unified.mode(), expected);
    }

    /// The builder's chained setters produce a runnable request whose stdout
    /// is captured by `output()`.
    #[tokio::test]
    async fn test_command_builder() {
        let unified = UnifiedCommandExecutor::auto().unwrap();
        let five_seconds = std::time::Duration::from_secs(5);

        let builder = unified
            .command("echo")
            .arg("hello")
            .arg("world")
            .env("TEST", "value")
            .timeout(five_seconds);
        let output = builder.output().await.unwrap();

        assert!(output.success);
        assert_eq!(output.stdout.trim(), "hello world");
    }

    /// Streaming execution yields at least one event, including `Started`.
    #[tokio::test]
    async fn test_command_streaming() {
        let unified = UnifiedCommandExecutor::auto().unwrap();
        let stream = unified
            .command("echo")
            .args(["line1", "line2"])
            .execute()
            .await
            .unwrap();

        let events: Vec<CommandEvent> = stream.collect().await;

        assert!(!events.is_empty());
        let saw_started = events
            .iter()
            .any(|event| matches!(event, CommandEvent::Started { .. }));
        assert!(saw_started);
    }
}