wasm-sandbox 0.4.1

A secure WebAssembly sandbox with dead-simple ease of use, progressive complexity APIs, and comprehensive safety controls
Documentation
//! Streaming execution APIs for large datasets and batch operations

use std::pin::Pin;
use futures::{Stream, StreamExt};
use serde_json::Value;
use async_trait::async_trait;

use crate::error::{Result, SandboxError, InstanceId};

/// Function call for batch execution
#[derive(Debug, Clone)]
pub struct FunctionCall {
    pub function_name: String,
    pub parameters: Vec<Value>,
    pub timeout: Option<std::time::Duration>,
}

/// Result of a function call
#[derive(Debug)]
pub struct FunctionResult {
    pub call: FunctionCall,
    pub result: Result<Value>,
    pub execution_time: std::time::Duration,
    pub resource_usage: Option<crate::monitoring::DetailedResourceUsage>,
}

/// Streaming execution trait for processing large datasets
#[async_trait]
pub trait StreamingExecution {
    /// Execute a stream of function calls, yielding results as they complete
    async fn execute_stream<S>(&self, input: S) -> Pin<Box<dyn Stream<Item = FunctionResult> + Send>>
    where 
        S: Stream<Item = FunctionCall> + Send + 'static;
    
    /// Execute a batch of function calls
    async fn execute_batch<I>(&self, calls: I) -> Vec<FunctionResult>
    where 
        I: IntoIterator<Item = FunctionCall> + Send,
        I::IntoIter: Send;

    /// Execute a function with streaming input data
    async fn execute_with_streaming_input<S>(
        &self, 
        function_name: &str,
        input_stream: S
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>>
    where 
        S: Stream<Item = Value> + Send + 'static;

    /// Execute a function that produces streaming output
    async fn execute_with_streaming_output(
        &self,
        function_name: &str,
        parameters: &[Value]
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>>;
}

/// Configuration for streaming execution
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Maximum number of concurrent executions
    pub max_concurrency: usize,
    
    /// Buffer size for streaming operations
    pub buffer_size: usize,
    
    /// Timeout for individual operations
    pub operation_timeout: std::time::Duration,
    
    /// Whether to stop on first error or continue
    pub fail_fast: bool,
    
    /// Enable resource monitoring for each operation
    pub monitor_resources: bool,
}

impl Default for StreamingConfig {
    fn default() -> Self {
        Self {
            max_concurrency: 10,
            buffer_size: 1000,
            operation_timeout: std::time::Duration::from_secs(30),
            fail_fast: false,
            monitor_resources: true,
        }
    }
}

/// Streaming executor implementation
pub struct StreamingExecutor {
    instance_id: InstanceId,
    config: StreamingConfig,
}

impl StreamingExecutor {
    /// Create a new streaming executor
    pub fn new(instance_id: InstanceId, config: StreamingConfig) -> Self {
        Self {
            instance_id,
            config,
        }
    }

    /// Create with default configuration
    pub fn with_defaults(instance_id: InstanceId) -> Self {
        Self::new(instance_id, StreamingConfig::default())
    }
}

#[async_trait]
impl StreamingExecution for StreamingExecutor {
    async fn execute_stream<S>(&self, input: S) -> Pin<Box<dyn Stream<Item = FunctionResult> + Send>>
    where 
        S: Stream<Item = FunctionCall> + Send + 'static
    {
        let instance_id = self.instance_id;
        let config = self.config.clone();
        
        let stream = input
            .map(move |call| {
                let _instance_id = instance_id;
                let config = config.clone();
                async move {
                    let start_time = std::time::Instant::now();
                    
                    // Execute the function using the sandbox (simplified implementation)
                    // In a full implementation, this would integrate directly with the WasmInstance
                    let result = if call.function_name == "error_function" {
                        Err(SandboxError::FunctionCall {
                            function_name: call.function_name.clone(),
                            reason: "Simulated error".to_string(),
                        })
                    } else {
                        // For now, return a placeholder result
                        // TODO: Integrate with actual WasmSandbox.call_function
                        Ok(Value::String(format!("Result for {}", call.function_name)))
                    };
                    
                    let execution_time = start_time.elapsed();
                    
                    FunctionResult {
                        call,
                        result,
                        execution_time,
                        resource_usage: if config.monitor_resources {
                            // Placeholder for resource monitoring integration
                            // TODO: Integrate with actual ResourceMonitor from the sandbox instance
                            None
                        } else {
                            None
                        },
                    }
                }
            })
            .buffer_unordered(self.config.max_concurrency);

        Box::pin(stream)
    }

    async fn execute_batch<I>(&self, calls: I) -> Vec<FunctionResult>
    where 
        I: IntoIterator<Item = FunctionCall> + Send,
        I::IntoIter: Send
    {
        let calls_vec: Vec<_> = calls.into_iter().collect();
        let stream = futures::stream::iter(calls_vec);
        let result_stream = self.execute_stream(stream).await;
        
        result_stream.collect().await
    }

    async fn execute_with_streaming_input<S>(
        &self,
        function_name: &str,
        input_stream: S
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>>
    where 
        S: Stream<Item = Value> + Send + 'static
    {
        let function_name = function_name.to_string();
        
        let stream = input_stream.map(move |_input_value| {
            let function_name = function_name.clone();
            async move {
                // Simplified implementation for streaming input processing
                // TODO: Integrate with actual WasmSandbox.call_function for streaming input
                Ok(Value::String(format!("Processed {} with input", function_name)))
            }
        })
        .buffer_unordered(self.config.max_concurrency);

        Box::pin(stream)
    }

    async fn execute_with_streaming_output(
        &self,
        function_name: &str,
        parameters: &[Value]
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>> {
        let function_name = function_name.to_string();
        let parameters = parameters.to_vec();
        
        // Simplified implementation for streaming output
        // TODO: Integrate with actual WasmSandbox for functions that support streaming output
        // This would require the WASM function to support streaming protocols or chunked results
        
        let stream = futures::stream::iter(0..5).map(move |i| {
            Ok(Value::Object({
                let mut obj = serde_json::Map::new();
                obj.insert("chunk".to_string(), Value::Number(i.into()));
                obj.insert("function".to_string(), Value::String(function_name.clone()));
                obj.insert("params".to_string(), Value::Array(parameters.clone()));
                obj
            }))
        });

        Box::pin(stream)
    }
}

/// Builder for streaming configuration
#[derive(Debug)]
pub struct StreamingConfigBuilder {
    config: StreamingConfig,
}

impl StreamingConfigBuilder {
    /// Create a new builder
    pub fn new() -> Self {
        Self {
            config: StreamingConfig::default(),
        }
    }

    /// Set maximum concurrency
    pub fn max_concurrency(mut self, max: usize) -> Self {
        self.config.max_concurrency = max;
        self
    }

    /// Set buffer size
    pub fn buffer_size(mut self, size: usize) -> Self {
        self.config.buffer_size = size;
        self
    }

    /// Set operation timeout
    pub fn operation_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.config.operation_timeout = timeout;
        self
    }

    /// Enable or disable fail-fast behavior
    pub fn fail_fast(mut self, fail_fast: bool) -> Self {
        self.config.fail_fast = fail_fast;
        self
    }

    /// Enable or disable resource monitoring
    pub fn monitor_resources(mut self, monitor: bool) -> Self {
        self.config.monitor_resources = monitor;
        self
    }

    /// Build the configuration
    pub fn build(self) -> StreamingConfig {
        self.config
    }
}

impl Default for StreamingConfigBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Extension trait for StreamingConfig
pub trait StreamingConfigExt {
    /// Create a new builder
    fn builder() -> StreamingConfigBuilder;
}

impl StreamingConfigExt for StreamingConfig {
    fn builder() -> StreamingConfigBuilder {
        StreamingConfigBuilder::new()
    }
}