ubiquity-core 0.1.1

Core types and traits for Ubiquity consciousness-aware mesh
Documentation
//! Unified command executor that provides parity across all platforms

use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
use tracing::{debug, info};
use uuid::Uuid;

use crate::command::{CommandEvent, CommandExecutor, CommandRequest, CommandResult};
use crate::error::UbiquityError;

#[cfg(not(target_arch = "wasm32"))]
use crate::command_local::LocalCommandExecutor;

#[cfg(target_arch = "wasm32")]
use crate::command_wasm::WasmCommandExecutor;

use crate::command_cloud::CloudCommandExecutor;

/// Execution mode for the unified executor
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Use local process execution (not available in WASM)
    Local,
    /// Use WebAssembly Workers (browser only)
    Wasm,
    /// Use Cloudflare Workers Durable Objects
    Cloud,
    /// Automatically detect the best mode
    Auto,
}

/// Configuration for unified command executor
#[derive(Debug, Clone)]
pub struct UnifiedExecutorConfig {
    pub mode: ExecutionMode,
    pub cloud_worker_url: Option<String>,
    pub cloud_api_token: Option<String>,
    pub cloud_namespace_id: Option<String>,
    pub event_buffer_size: usize,
    pub max_wasm_workers: usize,
}

impl Default for UnifiedExecutorConfig {
    fn default() -> Self {
        Self {
            mode: ExecutionMode::Auto,
            cloud_worker_url: None,
            cloud_api_token: None,
            cloud_namespace_id: None,
            event_buffer_size: 1024,
            max_wasm_workers: 4,
        }
    }
}

/// Unified command executor that provides parity across platforms
pub struct UnifiedCommandExecutor {
    config: UnifiedExecutorConfig,
    executor: Arc<dyn CommandExecutor>,
}

impl UnifiedCommandExecutor {
    /// Create a new unified executor with the given configuration
    pub fn new(config: UnifiedExecutorConfig) -> Result<Self, UbiquityError> {
        let executor = Self::create_executor(&config)?;
        
        Ok(Self {
            config,
            executor: Arc::from(executor),
        })
    }
    
    /// Create a new unified executor with automatic mode detection
    pub fn auto() -> Result<Self, UbiquityError> {
        Self::new(UnifiedExecutorConfig::default())
    }
    
    /// Get the current execution mode
    pub fn mode(&self) -> ExecutionMode {
        if self.config.mode == ExecutionMode::Auto {
            Self::detect_mode()
        } else {
            self.config.mode
        }
    }
    
    /// Detect the best execution mode for the current platform
    fn detect_mode() -> ExecutionMode {
        #[cfg(target_arch = "wasm32")]
        {
            ExecutionMode::Wasm
        }
        
        #[cfg(not(target_arch = "wasm32"))]
        {
            ExecutionMode::Local
        }
    }
    
    /// Create the appropriate executor based on configuration
    fn create_executor(config: &UnifiedExecutorConfig) -> Result<Box<dyn CommandExecutor>, UbiquityError> {
        let mode = if config.mode == ExecutionMode::Auto {
            Self::detect_mode()
        } else {
            config.mode
        };
        
        info!("Creating command executor in {:?} mode", mode);
        
        match mode {
            #[cfg(not(target_arch = "wasm32"))]
            ExecutionMode::Local => {
                let executor = LocalCommandExecutor::new()
                    .with_event_buffer_size(config.event_buffer_size);
                Ok(Box::new(executor))
            }
            
            #[cfg(target_arch = "wasm32")]
            ExecutionMode::Wasm => {
                let executor = WasmCommandExecutor::new()?
                    .with_max_workers(config.max_wasm_workers)?;
                Ok(Box::new(executor))
            }
            
            ExecutionMode::Cloud => {
                let worker_url = config.cloud_worker_url.as_ref()
                    .ok_or_else(|| UbiquityError::Configuration(
                        "Cloud worker URL not configured".to_string()
                    ))?;
                
                let api_token = config.cloud_api_token.as_ref()
                    .ok_or_else(|| UbiquityError::Configuration(
                        "Cloud API token not configured".to_string()
                    ))?;
                
                let namespace_id = config.cloud_namespace_id.as_ref()
                    .ok_or_else(|| UbiquityError::Configuration(
                        "Cloud namespace ID not configured".to_string()
                    ))?;
                
                let executor = CloudCommandExecutor::new(
                    worker_url.clone(),
                    api_token.clone(),
                    namespace_id.clone(),
                );
                
                Ok(Box::new(executor))
            }
            
            _ => Err(UbiquityError::Configuration(format!(
                "Execution mode {:?} not available on this platform",
                mode
            ))),
        }
    }
    
    /// Switch to a different execution mode
    pub fn switch_mode(&mut self, mode: ExecutionMode) -> Result<(), UbiquityError> {
        self.config.mode = mode;
        self.executor = Arc::from(Self::create_executor(&self.config)?);
        Ok(())
    }
    
    /// Create a command builder
    pub fn command(&self, command: impl Into<String>) -> CommandBuilder {
        CommandBuilder::new(self, command)
    }
}

#[async_trait]
impl CommandExecutor for UnifiedCommandExecutor {
    async fn execute(
        &self,
        request: CommandRequest,
    ) -> Result<Pin<Box<dyn Stream<Item = CommandEvent> + Send>>, UbiquityError> {
        debug!("Executing command {} in {:?} mode", request.command, self.mode());
        self.executor.execute(request).await
    }

    async fn cancel(&self, command_id: Uuid) -> Result<(), UbiquityError> {
        debug!("Cancelling command {} in {:?} mode", command_id, self.mode());
        self.executor.cancel(command_id).await
    }

    async fn status(&self, command_id: Uuid) -> Result<Option<CommandResult>, UbiquityError> {
        self.executor.status(command_id).await
    }
}

/// Builder for creating and executing commands
pub struct CommandBuilder<'a> {
    executor: &'a UnifiedCommandExecutor,
    request: CommandRequest,
}

impl<'a> CommandBuilder<'a> {
    fn new(executor: &'a UnifiedCommandExecutor, command: impl Into<String>) -> Self {
        Self {
            executor,
            request: CommandRequest::new(command),
        }
    }
    
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        self.request.args.push(arg.into());
        self
    }
    
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.request.args.extend(args.into_iter().map(Into::into));
        self
    }
    
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.request.env.insert(key.into(), value.into());
        self
    }
    
    pub fn envs<I, K, V>(mut self, vars: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        for (k, v) in vars {
            self.request.env.insert(k.into(), v.into());
        }
        self
    }
    
    pub fn current_dir(mut self, dir: impl Into<String>) -> Self {
        self.request.working_dir = Some(dir.into());
        self
    }
    
    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
        self.request.timeout = Some(timeout);
        self
    }
    
    pub fn stdin(mut self, stdin: impl Into<String>) -> Self {
        self.request.stdin = Some(stdin.into());
        self
    }
    
    pub async fn execute(self) -> Result<Pin<Box<dyn Stream<Item = CommandEvent> + Send>>, UbiquityError> {
        self.executor.execute(self.request).await
    }
    
    pub async fn output(self) -> Result<CommandOutput, UbiquityError> {
        use futures::StreamExt;
        
        let request_id = self.request.id;
        let mut stream = self.execute().await?;
        let mut output = CommandOutput::new(request_id);
        
        while let Some(event) = stream.next().await {
            output.process_event(event);
        }
        
        Ok(output)
    }
}

/// Collected output from a command execution
#[derive(Debug, Clone)]
pub struct CommandOutput {
    pub id: Uuid,
    pub stdout: String,
    pub stderr: String,
    pub exit_code: Option<i32>,
    pub success: bool,
    pub duration_ms: Option<u64>,
    pub cancelled: bool,
}

impl CommandOutput {
    fn new(id: Uuid) -> Self {
        Self {
            id,
            stdout: String::new(),
            stderr: String::new(),
            exit_code: None,
            success: false,
            duration_ms: None,
            cancelled: false,
        }
    }
    
    fn process_event(&mut self, event: CommandEvent) {
        match event {
            CommandEvent::Stdout { data, .. } => {
                if !self.stdout.is_empty() {
                    self.stdout.push('\n');
                }
                self.stdout.push_str(&data);
            }
            CommandEvent::Stderr { data, .. } => {
                if !self.stderr.is_empty() {
                    self.stderr.push('\n');
                }
                self.stderr.push_str(&data);
            }
            CommandEvent::Completed { exit_code, duration_ms, .. } => {
                self.exit_code = Some(exit_code);
                self.success = exit_code == 0;
                self.duration_ms = Some(duration_ms);
            }
            CommandEvent::Failed { duration_ms, .. } => {
                self.success = false;
                self.duration_ms = Some(duration_ms);
            }
            CommandEvent::Cancelled { duration_ms, .. } => {
                self.cancelled = true;
                self.duration_ms = Some(duration_ms);
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    #[tokio::test]
    async fn test_unified_executor_auto_mode() {
        let executor = UnifiedCommandExecutor::auto().unwrap();
        
        #[cfg(not(target_arch = "wasm32"))]
        assert_eq!(executor.mode(), ExecutionMode::Local);
        
        #[cfg(target_arch = "wasm32")]
        assert_eq!(executor.mode(), ExecutionMode::Wasm);
    }

    #[tokio::test]
    async fn test_command_builder() {
        let executor = UnifiedCommandExecutor::auto().unwrap();
        
        let output = executor
            .command("echo")
            .arg("hello")
            .arg("world")
            .env("TEST", "value")
            .timeout(std::time::Duration::from_secs(5))
            .output()
            .await
            .unwrap();
        
        assert!(output.success);
        assert_eq!(output.stdout.trim(), "hello world");
    }

    #[tokio::test]
    async fn test_command_streaming() {
        let executor = UnifiedCommandExecutor::auto().unwrap();
        
        let mut stream = executor
            .command("echo")
            .args(["line1", "line2"])
            .execute()
            .await
            .unwrap();
        
        let mut events = Vec::new();
        while let Some(event) = stream.next().await {
            events.push(event);
        }
        
        assert!(!events.is_empty());
        assert!(events.iter().any(|e| matches!(e, CommandEvent::Started { .. })));
    }
}