theater 0.3.28

A WebAssembly actor system for AI agents
Documentation
//! # Actor Handle
//!
//! This module provides the `ActorHandle` type, which serves as the primary interface
//! for interacting with actors in the Theater system.

use anyhow::Result;

use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use tracing::error;

use crate::actor::types::{
    ActorError, ActorOperation, WasiHttpResponse, DEFAULT_OPERATION_TIMEOUT,
};
use crate::metrics::ActorMetrics;
use crate::pack_bridge::{self, InterfaceHash, Value};

use super::types::{ActorControl, ActorInfo};

/// # ActorHandle
///
/// A handle to an actor in the Theater system, providing methods to interact with the actor.
///
/// ## Purpose
///
/// ActorHandle provides a high-level interface for communicating with actors, managing their
/// lifecycle, and querying their state, metrics, and exported interface hashes. It encapsulates
/// the details of message passing and synchronization between the caller and the actor's
/// execution environment.
///
/// ## Channels
///
/// The handle is a bundle of three `mpsc` senders, one per request family. Each request
/// carries a `oneshot` sender on which the reply is delivered. The handle derives `Clone`,
/// so a clone holds clones of the same three senders.
#[derive(Clone, Debug)]
pub struct ActorHandle {
    /// Sends `ActorOperation` requests: Pack function calls and WASI HTTP requests.
    operation_tx: mpsc::Sender<ActorOperation>,
    /// Sends `ActorInfo` queries: state, metrics, and export hashes.
    info_tx: mpsc::Sender<ActorInfo>,
    /// Sends `ActorControl` commands; in this file only `Shutdown` is issued.
    control_tx: mpsc::Sender<ActorControl>,
}

impl ActorHandle {
    /// Creates a new ActorHandle with the given operation channel.
    ///
    /// ## Parameters
    ///
    /// * `operation_tx` - The sender side of a channel used to send operations to the actor.
    ///
    /// ## Returns
    ///
    /// A new ActorHandle instance.
    pub fn new(
        operation_tx: mpsc::Sender<ActorOperation>,
        info_tx: mpsc::Sender<ActorInfo>,
        control_tx: mpsc::Sender<ActorControl>,
    ) -> Self {
        Self {
            operation_tx,
            info_tx,
            control_tx,
        }
    }

    /// Calls a function on the actor with Pack-native Value params and return.
    ///
    /// ## Parameters
    ///
    /// * `name` - The name of the function to call on the actor.
    /// * `params` - The parameters as a Pack `Value` (preserves structured type info).
    ///
    /// ## Returns
    ///
    /// * `Ok(Value)` - The return value from the function call as a Pack `Value`.
    /// * `Err(ActorError)` - An error occurred during the function call.
    pub async fn call_function(&self, name: String, params: Value) -> Result<Value, ActorError> {
        let (tx, rx) = oneshot::channel();

        let params_bytes = pack_bridge::encode_value(&params).map_err(|e| {
            error!("Failed to encode params: {}", e);
            ActorError::SerializationError
        })?;

        self.operation_tx
            .send(ActorOperation::CallFunctionPack {
                name,
                params: params_bytes,
                response_tx: tx,
            })
            .await
            .map_err(|e| {
                error!("Failed to send operation: {}", e);
                ActorError::ChannelClosed
            })?;

        match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
            Ok(result) => match result {
                Ok(inner) => {
                    let result_bytes = inner?;
                    if result_bytes.is_empty() {
                        Ok(pack_bridge::Value::Tuple(vec![]))
                    } else {
                        pack_bridge::decode_value(&result_bytes).map_err(|e| {
                            error!("Failed to decode result: {}", e);
                            ActorError::SerializationError
                        })
                    }
                }
                Err(e) => {
                    error!("Channel closed while waiting for response: {:?}", e);
                    Err(ActorError::ChannelClosed)
                }
            },
            Err(_) => {
                error!("Operation timed out after {:?}", DEFAULT_OPERATION_TIMEOUT);
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }

    /// Calls a named function with already-encoded Pack ABI params and ignores its return value.
    ///
    /// The bytes are forwarded untouched in an `ActorOperation::CallFunctionPack`; this
    /// method performs no encoding or decoding itself.
    ///
    /// ## Parameters
    ///
    /// * `name` - The name of the function to call; also used in error logs.
    /// * `params` - The Pack ABI encoded parameters.
    ///
    /// ## Returns
    ///
    /// * `Ok(())` - The call completed successfully; any returned bytes are discarded.
    /// * `Err(ActorError::ChannelClosed)` - The operation could not be sent, or the reply
    ///   sender was dropped before answering.
    /// * `Err(ActorError::OperationTimeout)` - No reply arrived within
    ///   `DEFAULT_OPERATION_TIMEOUT`.
    /// * Any other `Err` carried in the reply is logged and returned unchanged.
    pub async fn call_function_pack_void(
        &self,
        name: String,
        params: Vec<u8>,
    ) -> Result<(), ActorError> {
        let (response_tx, response_rx) = oneshot::channel();

        // The operation takes its own copy of the name so `name` stays available for logging.
        let operation = ActorOperation::CallFunctionPack {
            name: name.clone(),
            params,
            response_tx,
        };
        if let Err(e) = self.operation_tx.send(operation).await {
            error!("Failed to send operation: {}", e);
            return Err(ActorError::ChannelClosed);
        }

        match timeout(DEFAULT_OPERATION_TIMEOUT, response_rx).await {
            Ok(Ok(Ok(_))) => Ok(()),
            Ok(Ok(Err(e))) => {
                error!("Function call '{}' failed: {:?}", name, e);
                Err(e)
            }
            Ok(Err(_)) => {
                error!("Channel closed while waiting for function call '{}'", name);
                Err(ActorError::ChannelClosed)
            }
            Err(_) => {
                error!("Operation timed out after {:?}", DEFAULT_OPERATION_TIMEOUT);
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }

    /// Forwards an incoming WASI HTTP request to the actor and waits for its response.
    ///
    /// ## Purpose
    ///
    /// The request parts are packaged into an `ActorOperation::HandleWasiHttpRequest` and
    /// sent over the operation channel. The receiving side is expected to invoke the actor's
    /// exported `wasi:http/incoming-handler.handle` function; that part is not visible here.
    ///
    /// ## Parameters
    ///
    /// * `method` - HTTP method (GET, POST, etc.)
    /// * `scheme` - URL scheme (http, https, etc.), if any
    /// * `authority` - Authority component (host:port), if any
    /// * `path_with_query` - Path with optional query string, if any
    /// * `headers` - Request headers as (name, value) pairs
    /// * `body` - Request body bytes
    ///
    /// ## Returns
    ///
    /// * `Ok(WasiHttpResponse)` - The HTTP response carried in the reply.
    /// * `Err(ActorError::ChannelClosed)` - The operation could not be sent, or the reply
    ///   sender was dropped before answering.
    /// * `Err(ActorError::OperationTimeout)` - No reply arrived within
    ///   `DEFAULT_OPERATION_TIMEOUT`.
    /// * Any other `Err` carried in the reply is returned unchanged.
    pub async fn handle_wasi_http_request(
        &self,
        method: String,
        scheme: Option<String>,
        authority: Option<String>,
        path_with_query: Option<String>,
        headers: Vec<(String, Vec<u8>)>,
        body: Vec<u8>,
    ) -> Result<WasiHttpResponse, ActorError> {
        let (response_tx, response_rx) = oneshot::channel();

        let request = ActorOperation::HandleWasiHttpRequest {
            method,
            scheme,
            authority,
            path_with_query,
            headers,
            body,
            response_tx,
        };
        if let Err(e) = self.operation_tx.send(request).await {
            error!("Failed to send HandleWasiHttpRequest operation: {}", e);
            return Err(ActorError::ChannelClosed);
        }

        match timeout(DEFAULT_OPERATION_TIMEOUT, response_rx).await {
            // The reply is already a `Result<WasiHttpResponse, ActorError>`; pass it through.
            Ok(Ok(response)) => response,
            Ok(Err(e)) => {
                error!("Channel closed while waiting for HTTP response: {:?}", e);
                Err(ActorError::ChannelClosed)
            }
            Err(_) => {
                error!(
                    "HTTP request timed out after {:?}",
                    DEFAULT_OPERATION_TIMEOUT
                );
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }

    /// Asks the actor for its current state.
    ///
    /// ## Purpose
    ///
    /// Sends `ActorInfo::GetState` over the info channel and waits for the reply, which
    /// is useful for inspecting the actor's internal data or for backup purposes.
    ///
    /// ## Returns
    ///
    /// * `Ok(Value)` - The actor's current state as a Value.
    /// * `Err(ActorError::ChannelClosed)` - The query could not be sent, or the reply
    ///   sender was dropped before answering.
    /// * `Err(ActorError::OperationTimeout)` - No reply arrived within
    ///   `DEFAULT_OPERATION_TIMEOUT`.
    /// * Any other `Err` carried in the reply is returned unchanged.
    pub async fn get_state(&self) -> Result<Value, ActorError> {
        let (response_tx, response_rx) = oneshot::channel();

        let query = ActorInfo::GetState { response_tx };
        if let Err(e) = self.info_tx.send(query).await {
            error!("Failed to send GetState operation: {}", e);
            return Err(ActorError::ChannelClosed);
        }

        match timeout(DEFAULT_OPERATION_TIMEOUT, response_rx).await {
            Ok(Ok(state)) => state,
            Ok(Err(e)) => {
                error!(
                    "Channel closed while waiting for GetState response: {:?}",
                    e
                );
                Err(ActorError::ChannelClosed)
            }
            Err(_) => {
                error!(
                    "GetState operation timed out after {:?}",
                    DEFAULT_OPERATION_TIMEOUT
                );
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }

    /// Asks the actor for its performance metrics.
    ///
    /// ## Purpose
    ///
    /// Sends `ActorInfo::GetMetrics` over the info channel and waits for the reply, which
    /// is useful for monitoring, debugging, and performance analysis.
    ///
    /// ## Returns
    ///
    /// * `Ok(ActorMetrics)` - The metrics carried in the reply.
    /// * `Err(ActorError::ChannelClosed)` - The query could not be sent, or the reply
    ///   sender was dropped before answering.
    /// * `Err(ActorError::OperationTimeout)` - No reply arrived within
    ///   `DEFAULT_OPERATION_TIMEOUT`.
    /// * Any other `Err` carried in the reply is returned unchanged.
    pub async fn get_metrics(&self) -> Result<ActorMetrics, ActorError> {
        let (response_tx, response_rx) = oneshot::channel();

        let query = ActorInfo::GetMetrics { response_tx };
        if let Err(e) = self.info_tx.send(query).await {
            error!("Failed to send GetMetrics operation: {}", e);
            return Err(ActorError::ChannelClosed);
        }

        match timeout(DEFAULT_OPERATION_TIMEOUT, response_rx).await {
            Ok(Ok(metrics)) => metrics,
            Ok(Err(e)) => {
                error!(
                    "Channel closed while waiting for GetMetrics response: {:?}",
                    e
                );
                Err(ActorError::ChannelClosed)
            }
            Err(_) => {
                error!(
                    "GetMetrics operation timed out after {:?}",
                    DEFAULT_OPERATION_TIMEOUT
                );
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }

    /// Requests that the actor shut down and waits for the acknowledgement.
    ///
    /// ## Purpose
    ///
    /// Sends `ActorControl::Shutdown` over the control channel, then waits for the reply.
    /// How the actor winds down in-progress work is decided by the receiving side and is
    /// not visible from this method.
    ///
    /// ## Returns
    ///
    /// * `Ok(())` - The shutdown request was acknowledged as successful.
    /// * `Err(ActorError::ChannelClosed)` - The command could not be sent, or the reply
    ///   sender was dropped before answering.
    /// * `Err(ActorError::OperationTimeout)` - No reply arrived within
    ///   `DEFAULT_OPERATION_TIMEOUT`.
    /// * Any other `Err` carried in the reply is returned unchanged.
    pub async fn shutdown(&self) -> Result<(), ActorError> {
        let (response_tx, response_rx) = oneshot::channel();

        let command = ActorControl::Shutdown { response_tx };
        if let Err(e) = self.control_tx.send(command).await {
            error!("Failed to send Shutdown operation: {}", e);
            return Err(ActorError::ChannelClosed);
        }

        match timeout(DEFAULT_OPERATION_TIMEOUT, response_rx).await {
            Ok(Ok(outcome)) => outcome,
            Ok(Err(e)) => {
                error!(
                    "Channel closed while waiting for Shutdown response: {:?}",
                    e
                );
                Err(ActorError::ChannelClosed)
            }
            Err(_) => {
                error!(
                    "Shutdown operation timed out after {:?}",
                    DEFAULT_OPERATION_TIMEOUT
                );
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }

    /// Asks the actor for the interface hashes of its exported interfaces.
    ///
    /// ## Purpose
    ///
    /// Sends `ActorInfo::GetExportHashes` over the info channel and waits for the reply.
    /// The returned hashes are intended for compatibility checking between actors.
    ///
    /// ## Returns
    ///
    /// * `Ok(Vec<InterfaceHash>)` - The list of exported interface hashes.
    /// * `Err(ActorError::ChannelClosed)` - The query could not be sent, or the reply
    ///   sender was dropped before answering.
    /// * `Err(ActorError::OperationTimeout)` - No reply arrived within
    ///   `DEFAULT_OPERATION_TIMEOUT`.
    /// * Any other `Err` carried in the reply is returned unchanged.
    pub async fn get_export_hashes(&self) -> Result<Vec<InterfaceHash>, ActorError> {
        let (response_tx, response_rx) = oneshot::channel();

        let query = ActorInfo::GetExportHashes { response_tx };
        if let Err(e) = self.info_tx.send(query).await {
            error!("Failed to send GetExportHashes operation: {}", e);
            return Err(ActorError::ChannelClosed);
        }

        match timeout(DEFAULT_OPERATION_TIMEOUT, response_rx).await {
            Ok(Ok(hashes)) => hashes,
            Ok(Err(e)) => {
                error!(
                    "Channel closed while waiting for GetExportHashes response: {:?}",
                    e
                );
                Err(ActorError::ChannelClosed)
            }
            Err(_) => {
                error!(
                    "GetExportHashes operation timed out after {:?}",
                    DEFAULT_OPERATION_TIMEOUT
                );
                Err(ActorError::OperationTimeout(
                    DEFAULT_OPERATION_TIMEOUT.as_secs(),
                ))
            }
        }
    }
}