// run-kit 0.7.1
//
// Universal multi-language runner and smart REPL
// Documentation
use super::*;
use crate::v2::{Error, Result};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// Settings that govern how asynchronous component calls are executed.
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Maximum duration allowed for a single call; `None` disables the timeout.
    pub timeout: Option<Duration>,
    /// Used to size the semaphore in `AsyncBatchExecutor::new`.
    pub max_concurrent: usize,
    /// Fuel-metering switch. Not read anywhere in this file; presumably consumed
    /// by the runtime — TODO confirm.
    pub enable_fuel: bool,
    /// Fuel budget. Not read anywhere in this file; presumably only meaningful
    /// when `enable_fuel` is set — TODO confirm.
    pub fuel_limit: u64,
}

impl Default for AsyncConfig {
    /// Defaults: 30-second timeout, 100 concurrent calls, fuel enabled with a
    /// limit of 1_000_000_000.
    fn default() -> Self {
        let timeout = Some(Duration::from_secs(30));
        AsyncConfig {
            fuel_limit: 1_000_000_000,
            enable_fuel: true,
            max_concurrent: 100,
            timeout,
        }
    }
}

/// Outcome of one asynchronous call together with its timing measurements.
#[derive(Debug)]
pub struct AsyncCallResult {
    /// Value produced by the underlying runtime call.
    pub result: ExecutionResult,
    /// Milliseconds spent waiting before execution began. `call_async` always
    /// reports 0; `AsyncBatchExecutor::execute` reports the time spent acquiring
    /// its semaphore permit.
    pub wait_time_ms: u64,
    /// Elapsed milliseconds measured around the call itself.
    pub exec_time_ms: u64,
}

/// Background task that increments a wasmtime engine's epoch at a fixed interval.
///
/// The task is cancelled by [`EpochTicker::stop`], which also runs when the
/// ticker is dropped.
#[cfg(feature = "v2")]
pub struct EpochTicker {
    // Shutdown flag shared with the spawned task.
    running: Arc<AtomicBool>,
    // Handle of the spawned task; `None` once `stop` has taken it.
    handle: Option<tokio::task::JoinHandle<()>>,
}

#[cfg(feature = "v2")]
impl EpochTicker {
    /// Spawns a tokio task that calls `engine.increment_epoch()` after every
    /// tick of a `tokio::time::interval` with period `interval`.
    ///
    /// The task keeps looping for as long as the shared `running` flag is set.
    pub fn start(engine: wasmtime::Engine, interval: Duration) -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let keep_ticking = Arc::clone(&running);

        let task = tokio::spawn(async move {
            let mut timer = tokio::time::interval(interval);
            loop {
                if !keep_ticking.load(Ordering::SeqCst) {
                    break;
                }
                timer.tick().await;
                engine.increment_epoch();
            }
        });

        EpochTicker {
            running,
            handle: Some(task),
        }
    }

    /// Clears the running flag and aborts the background task.
    ///
    /// Calling it again is a no-op for the task, because the handle has
    /// already been taken.
    pub fn stop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        let taken = self.handle.take();
        if let Some(task) = taken {
            task.abort();
        }
    }
}

#[cfg(feature = "v2")]
impl Drop for EpochTicker {
    /// Stops the background task so it does not outlive the ticker.
    fn drop(&mut self) {
        EpochTicker::stop(self);
    }
}

pub async fn call_async(
    runtime: &RuntimeEngine,
    handle: &InstanceHandle,
    function: &str,
    args: Vec<ComponentValue>,
    config: AsyncConfig,
) -> Result<AsyncCallResult> {
    let start = std::time::Instant::now();

    #[cfg(feature = "v2")]
    let _ticker = if config.timeout.is_some() {
        Some(EpochTicker::start(
            runtime.wasmtime_engine().clone(),
            Duration::from_millis(10),
        ))
    } else {
        None
    };

    let result = if let Some(timeout) = config.timeout {
        tokio::time::timeout(timeout, async { runtime.call(handle, function, args) })
            .await
            .map_err(|_| Error::ExecutionFailed {
                component: handle.component_id.clone(),
                reason: format!("Call timed out after {:?}", timeout),
            })?
    } else {
        runtime.call(handle, function, args)
    };

    let exec_time_ms = start.elapsed().as_millis() as u64;

    result.map(|r| AsyncCallResult {
        result: r,
        wait_time_ms: 0,
        exec_time_ms,
    })
}

#[cfg(not(feature = "v2"))]
pub async fn call_async_fallback(
    runtime: &RuntimeEngine,
    handle: &InstanceHandle,
    function: &str,
    args: Vec<ComponentValue>,
    config: AsyncConfig,
) -> Result<AsyncCallResult> {
    let start = std::time::Instant::now();

    let result = if let Some(timeout) = config.timeout {
        tokio::time::timeout(timeout, async { runtime.call(handle, function, args) })
            .await
            .map_err(|_| Error::ExecutionFailed {
                component: handle.component_id.clone(),
                reason: format!("Call timed out after {:?}", timeout),
            })?
    } else {
        runtime.call(handle, function, args)
    };

    let exec_time_ms = start.elapsed().as_millis() as u64;

    result.map(|r| AsyncCallResult {
        result: r,
        wait_time_ms: 0,
        exec_time_ms,
    })
}

/// Starts one [`call_async`] future per entry in `calls` and awaits them all
/// together via `futures::future::join_all`.
///
/// Each call receives its own clone of `config`. `config.max_concurrent` is not
/// consulted here, so every call is in flight at once.
///
/// Returns one `Result` per requested call; a failing call does not stop the
/// others.
pub async fn call_parallel<'a>(
    runtime: &'a RuntimeEngine,
    calls: Vec<(&'a InstanceHandle, &'a str, Vec<ComponentValue>)>,
    config: AsyncConfig,
) -> Vec<Result<AsyncCallResult>> {
    let mut pending = Vec::with_capacity(calls.len());
    for (handle, function, args) in calls {
        // Creating the future does not run it; work starts when `join_all` polls.
        pending.push(call_async(runtime, handle, function, args, config.clone()));
    }

    futures::future::join_all(pending).await
}

/// Runs calls through [`call_async`] while limiting how many execute at once.
pub struct AsyncBatchExecutor {
    // Per-call settings, cloned into every `call_async` invocation.
    config: AsyncConfig,
    // Execution slots; one permit is held for the duration of each call.
    semaphore: Arc<tokio::sync::Semaphore>,
}

impl AsyncBatchExecutor {
    /// Creates an executor whose semaphore has `config.max_concurrent` permits.
    ///
    /// A `max_concurrent` of 0 is treated as 1. A semaphore with no permits can
    /// never be acquired, and nothing else holds this semaphore to add permits,
    /// so every `execute` call would otherwise wait forever.
    pub fn new(config: AsyncConfig) -> Self {
        let permits = config.max_concurrent.max(1);
        let semaphore = Arc::new(tokio::sync::Semaphore::new(permits));
        Self { config, semaphore }
    }

    /// Waits for a free execution slot, then runs the call via [`call_async`].
    ///
    /// `wait_time_ms` in the result is the time spent acquiring the permit.
    /// `exec_time_ms` is measured from permit acquisition to completion and
    /// replaces the figure reported by `call_async`.
    ///
    /// # Errors
    ///
    /// Returns `Error::ExecutionFailed` if the permit cannot be acquired, and
    /// propagates any error from `call_async`.
    pub async fn execute(
        &self,
        runtime: &RuntimeEngine,
        handle: &InstanceHandle,
        function: &str,
        args: Vec<ComponentValue>,
    ) -> Result<AsyncCallResult> {
        let start = std::time::Instant::now();

        // Held until the end of this function so the slot stays occupied for
        // the whole call.
        let _permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|_| Error::ExecutionFailed {
                component: handle.component_id.clone(),
                reason: "Failed to acquire execution slot".to_string(),
            })?;

        let wait_time_ms = start.elapsed().as_millis() as u64;

        let exec_start = std::time::Instant::now();
        let result = call_async(runtime, handle, function, args, self.config.clone()).await?;

        Ok(AsyncCallResult {
            result: result.result,
            wait_time_ms,
            exec_time_ms: exec_start.elapsed().as_millis() as u64,
        })
    }

    /// Runs every entry of `calls` through [`AsyncBatchExecutor::execute`] and
    /// awaits them together, so the semaphore limits how many run at once.
    ///
    /// Returns one `Result` per requested call.
    pub async fn execute_batch<'a>(
        &'a self,
        runtime: &'a RuntimeEngine,
        calls: Vec<(&'a InstanceHandle, &'a str, Vec<ComponentValue>)>,
    ) -> Vec<Result<AsyncCallResult>> {
        let handles: Vec<_> = calls
            .into_iter()
            .map(|(handle, function, args)| self.execute(runtime, handle, function, args))
            .collect();

        futures::future::join_all(handles).await
    }
}

/// Lifecycle notifications for an asynchronous call.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// consumes these events — confirm where they are emitted.
#[derive(Debug)]
pub enum AsyncEvent {
    /// A call has started.
    Started {
        /// Presumably identifies the instance handle being called — TODO confirm.
        handle_id: String,
        /// Name of the function being invoked.
        function: String,
    },
    /// A call finished; `duration_ms` carries its duration in milliseconds.
    Completed {
        handle_id: String,
        function: String,
        duration_ms: u64,
    },
    /// A call failed; `error` carries a textual description of the failure.
    Failed {
        handle_id: String,
        function: String,
        error: String,
    },
    /// A call timed out.
    Timeout {
        handle_id: String,
        function: String,
    },
}

/// Running counters describing the outcomes of asynchronous calls.
#[derive(Debug, Default, Clone)]
pub struct AsyncMetrics {
    /// Every recorded call, whatever its outcome.
    pub total_calls: u64,
    /// Calls recorded through `record_success`.
    pub successful_calls: u64,
    /// Calls recorded through `record_failure`.
    pub failed_calls: u64,
    /// Calls recorded through `record_timeout`.
    pub timed_out_calls: u64,
    /// Sum of the execution times of successful calls, in milliseconds.
    pub total_exec_time_ms: u64,
    /// Sum of the wait times of successful calls, in milliseconds.
    pub total_wait_time_ms: u64,
    /// Not updated by any method of this type; callers maintain it themselves.
    pub peak_concurrent: usize,
}

impl AsyncMetrics {
    /// Counts one successful call and adds its timings to the running totals.
    pub fn record_success(&mut self, exec_time_ms: u64, wait_time_ms: u64) {
        self.total_exec_time_ms += exec_time_ms;
        self.total_wait_time_ms += wait_time_ms;
        self.successful_calls += 1;
        self.total_calls += 1;
    }

    /// Counts one failed call.
    pub fn record_failure(&mut self) {
        self.failed_calls += 1;
        self.total_calls += 1;
    }

    /// Counts one timed-out call.
    pub fn record_timeout(&mut self) {
        self.timed_out_calls += 1;
        self.total_calls += 1;
    }

    /// Mean execution time of successful calls in milliseconds, or `0.0` when
    /// no call has succeeded yet.
    pub fn average_exec_time_ms(&self) -> f64 {
        match self.successful_calls {
            0 => 0.0,
            succeeded => self.total_exec_time_ms as f64 / succeeded as f64,
        }
    }

    /// Fraction of recorded calls that succeeded, or `1.0` when nothing has
    /// been recorded yet.
    pub fn success_rate(&self) -> f64 {
        if self.total_calls > 0 {
            let succeeded = self.successful_calls as f64;
            let attempted = self.total_calls as f64;
            succeeded / attempted
        } else {
            1.0
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration allows 100 concurrent calls with fuel enabled.
    #[test]
    fn test_async_config_default() {
        let AsyncConfig {
            max_concurrent,
            enable_fuel,
            ..
        } = AsyncConfig::default();

        assert_eq!(max_concurrent, 100);
        assert!(enable_fuel);
    }

    /// Two successes and one failure give three calls and a 150 ms average.
    #[test]
    fn test_async_metrics() {
        let mut recorded = AsyncMetrics::default();

        recorded.record_success(100, 10);
        recorded.record_success(200, 20);
        recorded.record_failure();

        assert_eq!(recorded.total_calls, 3);
        assert_eq!(recorded.successful_calls, 2);
        assert_eq!(recorded.failed_calls, 1);
        assert_eq!(recorded.average_exec_time_ms(), 150.0);
    }
}