use std::pin::Pin;
use futures::{Stream, StreamExt};
use serde_json::Value;
use async_trait::async_trait;
use crate::error::{Result, SandboxError, InstanceId};
/// A request to invoke a single function.
///
/// Bundles the function's name, its arguments as JSON values and an optional
/// per-call timeout.
#[derive(Clone, Debug)]
pub struct FunctionCall {
    /// Name of the function to invoke.
    pub function_name: String,
    /// Arguments passed to the function, as JSON values.
    pub parameters: Vec<Value>,
    /// Optional time limit for this call.
    ///
    /// NOTE(review): presumably `None` means "no per-call limit" — confirm
    /// against the code that actually enforces timeouts.
    pub timeout: Option<std::time::Duration>,
}
/// The outcome of executing one [`FunctionCall`].
#[derive(Debug)]
pub struct FunctionResult {
    /// The call this result was produced for.
    pub call: FunctionCall,
    /// The value returned by the function, or the error that occurred.
    pub result: Result<Value>,
    /// Time measured for the execution of the call.
    pub execution_time: std::time::Duration,
    /// Detailed resource usage for the call, when it was collected.
    pub resource_usage: Option<crate::monitoring::DetailedResourceUsage>,
}
/// Stream-oriented function execution.
///
/// Every method hands back either a collected `Vec` or a pinned, boxed,
/// `Send` stream, so callers do not need to name the concrete stream type.
#[async_trait]
pub trait StreamingExecution {
    /// Consumes a stream of [`FunctionCall`]s and returns a stream of
    /// [`FunctionResult`]s.
    async fn execute_stream<S>(
        &self,
        input: S,
    ) -> Pin<Box<dyn Stream<Item = FunctionResult> + Send>>
    where
        S: Stream<Item = FunctionCall> + Send + 'static;

    /// Executes a finite collection of calls and returns all of their results.
    async fn execute_batch<I>(&self, calls: I) -> Vec<FunctionResult>
    where
        I: IntoIterator<Item = FunctionCall> + Send,
        I::IntoIter: Send;

    /// Takes a function name and a stream of JSON input values and returns a
    /// stream of `Result<Value>` items.
    async fn execute_with_streaming_input<S>(
        &self,
        function_name: &str,
        input_stream: S,
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>>
    where
        S: Stream<Item = Value> + Send + 'static;

    /// Takes a function name and a slice of JSON parameters and returns a
    /// stream of `Result<Value>` items.
    async fn execute_with_streaming_output(
        &self,
        function_name: &str,
        parameters: &[Value],
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>>;
}
/// Tunable settings for streaming execution.
#[derive(Clone, Debug)]
pub struct StreamingConfig {
    /// Upper bound on the number of calls that are in flight at the same time.
    pub max_concurrency: usize,
    /// Buffer size setting.
    ///
    /// NOTE(review): presumably the capacity of an internal buffer or channel —
    /// confirm against the code that consumes it.
    pub buffer_size: usize,
    /// Timeout setting for a single operation.
    pub operation_timeout: std::time::Duration,
    /// Fail-fast flag.
    ///
    /// NOTE(review): presumably stops processing after the first error —
    /// confirm against the code that consumes it.
    pub fail_fast: bool,
    /// Whether resource monitoring is requested for executed calls.
    pub monitor_resources: bool,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
max_concurrency: 10,
buffer_size: 1000,
operation_timeout: std::time::Duration::from_secs(30),
fail_fast: false,
monitor_resources: true,
}
}
}
/// Executor that implements [`StreamingExecution`] for one sandbox instance.
pub struct StreamingExecutor {
    /// Identifier of the instance this executor was created for.
    instance_id: InstanceId,
    /// Settings applied to the streams this executor builds.
    config: StreamingConfig,
}
impl StreamingExecutor {
pub fn new(instance_id: InstanceId, config: StreamingConfig) -> Self {
Self {
instance_id,
config,
}
}
pub fn with_defaults(instance_id: InstanceId) -> Self {
Self::new(instance_id, StreamingConfig::default())
}
}
#[async_trait]
impl StreamingExecution for StreamingExecutor {
    /// Maps every incoming [`FunctionCall`] to a [`FunctionResult`].
    ///
    /// The execution is simulated: no sandbox is invoked. A call whose
    /// `function_name` is `"error_function"` yields
    /// `SandboxError::FunctionCall`; every other call yields the JSON string
    /// `"Result for <function_name>"`.
    ///
    /// Up to `config.max_concurrency` calls (at least one) are driven at the
    /// same time via `buffer_unordered`, so results are yielded in completion
    /// order rather than input order.
    ///
    /// NOTE(review): `FunctionCall::timeout`, `config.operation_timeout` and
    /// `config.fail_fast` are not applied here.
    async fn execute_stream<S>(
        &self,
        input: S,
    ) -> Pin<Box<dyn Stream<Item = FunctionResult> + Send>>
    where
        S: Stream<Item = FunctionCall> + Send + 'static,
    {
        // The simulated execution below does not need the instance id yet; it
        // is still read so the field is not reported as dead code.
        let _instance_id = self.instance_id;
        // `buffer_unordered(0)` would never poll `input` and the returned
        // stream would stay pending forever, so always allow one call.
        let concurrency = self.config.max_concurrency.max(1);

        let stream = input
            .map(|call| async move {
                let start_time = std::time::Instant::now();
                let result = if call.function_name == "error_function" {
                    Err(SandboxError::FunctionCall {
                        function_name: call.function_name.clone(),
                        reason: "Simulated error".to_string(),
                    })
                } else {
                    Ok(Value::String(format!("Result for {}", call.function_name)))
                };
                let execution_time = start_time.elapsed();
                FunctionResult {
                    call,
                    result,
                    execution_time,
                    // No usage data is collected yet, whatever the value of
                    // `config.monitor_resources`.
                    resource_usage: None,
                }
            })
            .buffer_unordered(concurrency);
        Box::pin(stream)
    }

    /// Runs all `calls` through [`StreamingExecution::execute_stream`] and
    /// collects the results into a `Vec`.
    ///
    /// Because the calls run through `buffer_unordered`, the order of the
    /// returned results is not guaranteed to match the order of `calls`.
    async fn execute_batch<I>(&self, calls: I) -> Vec<FunctionResult>
    where
        I: IntoIterator<Item = FunctionCall> + Send,
        I::IntoIter: Send,
    {
        // `execute_stream` needs a `'static` stream, which `I::IntoIter` is
        // not guaranteed to be, so the calls are moved into an owned `Vec`.
        let calls_vec: Vec<FunctionCall> = calls.into_iter().collect();
        let result_stream = self.execute_stream(futures::stream::iter(calls_vec)).await;
        result_stream.collect().await
    }

    /// Yields one `Ok` item per value of `input_stream`.
    ///
    /// The execution is simulated: the input value itself is ignored and each
    /// item is the JSON string `"Processed <function_name> with input"`. Up to
    /// `config.max_concurrency` items (at least one) are driven at the same
    /// time.
    async fn execute_with_streaming_input<S>(
        &self,
        function_name: &str,
        input_stream: S,
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>>
    where
        S: Stream<Item = Value> + Send + 'static,
    {
        // The message is identical for every item, so it is formatted once.
        let message = format!("Processed {} with input", function_name);
        // See `execute_stream`: a concurrency of 0 would stall the stream.
        let concurrency = self.config.max_concurrency.max(1);

        let stream = input_stream
            .map(move |_input_value| {
                let message = message.clone();
                async move { Ok(Value::String(message)) }
            })
            .buffer_unordered(concurrency);
        Box::pin(stream)
    }

    /// Yields five `Ok` chunks for the given function.
    ///
    /// The execution is simulated: each chunk is a JSON object with the keys
    /// `"chunk"` (index 0 through 4), `"function"` (the function name) and
    /// `"params"` (a copy of `parameters`).
    async fn execute_with_streaming_output(
        &self,
        function_name: &str,
        parameters: &[Value],
    ) -> Pin<Box<dyn Stream<Item = Result<Value>> + Send>> {
        // Owned copies are required because the returned stream outlives the
        // borrowed arguments.
        let function_name = function_name.to_string();
        let parameters = parameters.to_vec();

        let stream = futures::stream::iter(0..5).map(move |i| {
            let mut obj = serde_json::Map::new();
            obj.insert("chunk".to_string(), Value::Number(i.into()));
            obj.insert("function".to_string(), Value::String(function_name.clone()));
            obj.insert("params".to_string(), Value::Array(parameters.clone()));
            Ok(Value::Object(obj))
        });
        Box::pin(stream)
    }
}
/// Builder for [`StreamingConfig`].
#[derive(Debug)]
pub struct StreamingConfigBuilder {
    /// The configuration being assembled.
    config: StreamingConfig,
}
impl StreamingConfigBuilder {
pub fn new() -> Self {
Self {
config: StreamingConfig::default(),
}
}
pub fn max_concurrency(mut self, max: usize) -> Self {
self.config.max_concurrency = max;
self
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.config.buffer_size = size;
self
}
pub fn operation_timeout(mut self, timeout: std::time::Duration) -> Self {
self.config.operation_timeout = timeout;
self
}
pub fn fail_fast(mut self, fail_fast: bool) -> Self {
self.config.fail_fast = fail_fast;
self
}
pub fn monitor_resources(mut self, monitor: bool) -> Self {
self.config.monitor_resources = monitor;
self
}
pub fn build(self) -> StreamingConfig {
self.config
}
}
impl Default for StreamingConfigBuilder {
fn default() -> Self {
Self::new()
}
}
/// Extension trait that exposes a `builder()` constructor.
pub trait StreamingConfigExt {
    /// Returns a new [`StreamingConfigBuilder`].
    fn builder() -> StreamingConfigBuilder;
}
impl StreamingConfigExt for StreamingConfig {
    /// Returns a builder that starts from the default configuration.
    fn builder() -> StreamingConfigBuilder {
        // `Default` for the builder delegates to `StreamingConfigBuilder::new`.
        StreamingConfigBuilder::default()
    }
}