theater 0.3.28

A WebAssembly actor system for AI agents
Documentation
//! # Actor Store
//!
//! This module provides an abstraction for sharing resources between an actor and the Theater runtime system.
//! The ActorStore serves as a container for state, event chains, and communication channels that are
//! needed for WASM host functions to interface with the Actor system.

use crate::actor::handle::ActorHandle;
use crate::chain::{ChainEvent, StateChain};
use crate::events::{ChainEventData, ChainEventPayload};
use crate::id::TheaterId;
use crate::messages::TheaterCommand;
use crate::pack_bridge::Value;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock as StdRwLock};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use wasmtime::component::ResourceTable;

/// # ActorStore
///
/// A container for sharing actor resources with WebAssembly host functions.
///
/// The ActorStore serves as a central repository for all resources related to a specific actor instance.
/// It provides access to:
/// - The actor's unique identifier
/// - Communication channels to the Theater runtime
/// - The event chain for audit and verification
/// - The actor's current state data
/// - A handle to interact with the actor
/// - A resource table for Component Model resources (pollables, file handles, etc.)
#[derive(Clone)]
pub struct ActorStore {
    /// Unique identifier for the actor
    pub id: TheaterId,

    /// Channel for sending commands to the Theater runtime
    pub theater_tx: Sender<TheaterCommand>,

    /// The event chain that records all actor operations for verification and audit
    pub chain: Arc<RwLock<StateChain>>,

    /// The current state of the actor, stored as a Pack Value
    pub state: Value,

    /// Handle to interact with the actor
    pub actor_handle: ActorHandle,

    /// Resource table for managing Component Model resources
    /// This table stores all resources (pollables, file handles, etc.) that are exposed to the actor
    pub resource_table: Arc<Mutex<ResourceTable>>,

    /// Extension storage for handlers to store arbitrary data
    /// Keyed by TypeId of the data type for type-safe retrieval
    /// This allows handlers to pass data from setup_host_functions to Host trait implementations
    pub extensions: Arc<StdRwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>,
}

impl ActorStore {
    /// # Create a new ActorStore
    ///
    /// Creates a new instance of the ActorStore with the given parameters.
    ///
    /// ## Parameters
    ///
    /// * `id` - Unique identifier for the actor
    /// * `theater_tx` - Channel for sending commands to the Theater runtime
    /// * `actor_handle` - Handle for interacting with the actor
    /// * `chain` - The event chain for this actor
    ///
    /// ## Returns
    ///
    /// A new ActorStore instance configured with the provided parameters.
    pub fn new(
        id: TheaterId,
        theater_tx: Sender<TheaterCommand>,
        actor_handle: ActorHandle,
        chain: Arc<RwLock<StateChain>>,
        initial_state: Value,
    ) -> Self {
        Self {
            id,
            theater_tx: theater_tx.clone(),
            chain,
            state: initial_state,
            actor_handle,
            resource_table: Arc::new(Mutex::new(ResourceTable::new())),
            extensions: Arc::new(StdRwLock::new(HashMap::new())),
        }
    }

    /// # Get the actor's ID
    ///
    /// Retrieves the unique identifier for this actor.
    ///
    /// ## Returns
    ///
    /// A clone of the actor's TheaterId.
    pub fn get_id(&self) -> TheaterId {
        self.id
    }

    /// # Get the Theater command channel
    ///
    /// Retrieves the channel used for sending commands to the Theater runtime.
    ///
    /// ## Returns
    ///
    /// A clone of the Sender<TheaterCommand> channel.
    pub fn get_theater_tx(&self) -> Sender<TheaterCommand> {
        self.theater_tx.clone()
    }

    /// # Get the actor's state
    ///
    /// Retrieves the current state data for this actor.
    ///
    /// ## Returns
    ///
    /// A clone of the actor's current state Value.
    pub fn get_state(&self) -> Value {
        self.state.clone()
    }

    /// # Set the actor's state
    ///
    /// Updates the current state data for this actor.
    ///
    /// ## Parameters
    ///
    /// * `state` - The new state Value
    pub fn set_state(&mut self, state: Value) {
        self.state = state;
    }

    /// # Record an event in the chain
    ///
    /// Adds a new event to the actor's event chain.
    ///
    /// ## Parameters
    ///
    /// * `event_data` - The event data to record
    ///
    /// ## Returns
    ///
    /// The ChainEvent that was created and added to the chain.
    pub async fn record_event(&self, event_data: ChainEventData) -> ChainEvent {
        let mut chain = self.chain.write().await;
        chain
            .add_typed_event(event_data)
            .await
            .expect("Failed to record event")
    }

    /// Record a WebAssembly execution event.
    ///
    /// This is used for recording WASM function calls, results, and errors.
    ///
    /// ## Parameters
    ///
    /// * `event_type` - A string identifier for this event (e.g., "wasm-call")
    /// * `data` - The WasmEventData containing the event details
    pub async fn record_wasm_event(
        &self,
        event_type: String,
        data: crate::events::wasm::WasmEventData,
    ) -> ChainEvent {
        self.record_event(ChainEventData {
            event_type,
            data: ChainEventPayload::Wasm(data),
        })
        .await
    }

    /// Record a theater runtime event (for debugging/audit purposes).
    ///
    /// Note: These events are recorded as Wasm events with a special event type
    /// since they're primarily for debugging and not essential for replay.
    ///
    /// ## Parameters
    ///
    /// * `event_type` - A string identifier for this event
    /// * `data` - The TheaterRuntimeEventData
    pub async fn record_theater_runtime_event(
        &self,
        event_type: String,
        data: crate::events::theater_runtime::TheaterRuntimeEventData,
    ) -> ChainEvent {
        // Convert theater runtime events to Wasm events for storage
        // These are primarily for debugging/audit and not essential for replay
        let wasm_data = crate::events::wasm::WasmEventData::WasmCall {
            function_name: event_type.clone(),
            params: Value::String(serde_json::to_string(&data).unwrap_or_default()),
        };
        self.record_event(ChainEventData {
            event_type: format!("theater-runtime/{}", event_type),
            data: ChainEventPayload::Wasm(wasm_data),
        })
        .await
    }

    /// # Get the hash of the most recently emitted event.
    ///
    /// Used by callers that want to know the chain head without subscribing
    /// to the stream. Returns `None` if no events have been emitted yet.
    pub async fn head_hash(&self) -> Option<Vec<u8>> {
        let chain = self.chain.read().await;
        chain.head_hash().map(|h| h.to_vec())
    }

    /// Subscribe to events as they are recorded to the chain.
    ///
    /// Registers `tx` as a subscriber on the actor's chain. Each new event
    /// is dispatched as `(actor_id, event)` via `.send().await` — see
    /// [`crate::chain::StateChain`] docs for the back-pressure / best-effort
    /// tradeoff.
    pub async fn subscribe_to_events(
        &self,
        tx: tokio::sync::mpsc::Sender<(crate::TheaterId, ChainEvent)>,
    ) {
        let mut chain = self.chain.write().await;
        chain.add_subscriber(tx);
    }

    pub fn get_actor_handle(&self) -> ActorHandle {
        self.actor_handle.clone()
    }

    // =========================================================================
    // Extension Methods
    // =========================================================================

    /// Store extension data of a specific type
    ///
    /// Handlers use this to store data during setup that can be retrieved
    /// later in Host trait implementations. Each type can only have one value;
    /// calling this again with the same type will overwrite the previous value.
    ///
    /// ## Type Parameters
    ///
    /// * `T` - The type of data to store. Must be Send + Sync + 'static.
    ///
    /// ## Parameters
    ///
    /// * `value` - The value to store
    ///
    /// ## Example
    ///
    /// ```rust,ignore
    /// #[derive(Clone)]
    /// struct MyHandlerConfig { path: PathBuf }
    ///
    /// // In setup_host_functions:
    /// actor_store.set_extension(MyHandlerConfig { path: "/tmp".into() });
    /// ```
    pub fn set_extension<T: Send + Sync + 'static>(&self, value: T) {
        let mut extensions = self.extensions.write().unwrap();
        extensions.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Retrieve extension data of a specific type
    ///
    /// Returns a clone of the stored value if it exists and matches the requested type.
    ///
    /// ## Type Parameters
    ///
    /// * `T` - The type of data to retrieve. Must be Clone + Send + Sync + 'static.
    ///
    /// ## Returns
    ///
    /// * `Some(T)` - A clone of the stored value
    /// * `None` - If no value of this type was stored
    ///
    /// ## Example
    ///
    /// ```rust,ignore
    /// // In Host trait implementation:
    /// if let Some(config) = self.get_extension::<MyHandlerConfig>() {
    ///     println!("Using path: {:?}", config.path);
    /// }
    /// ```
    pub fn get_extension<T: Clone + Send + Sync + 'static>(&self) -> Option<T> {
        let extensions = self.extensions.read().unwrap();
        extensions
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
    }

    /// Check if extension data of a specific type exists
    ///
    /// ## Type Parameters
    ///
    /// * `T` - The type to check for
    ///
    /// ## Returns
    ///
    /// `true` if a value of this type is stored, `false` otherwise
    pub fn has_extension<T: Send + Sync + 'static>(&self) -> bool {
        let extensions = self.extensions.read().unwrap();
        extensions.contains_key(&TypeId::of::<T>())
    }

    /// Remove and return extension data of a specific type
    ///
    /// ## Type Parameters
    ///
    /// * `T` - The type of data to remove
    ///
    /// ## Returns
    ///
    /// * `Some(T)` - The removed value
    /// * `None` - If no value of this type was stored
    pub fn remove_extension<T: Send + Sync + 'static>(&self) -> Option<T> {
        let mut extensions = self.extensions.write().unwrap();
        extensions
            .remove(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast::<T>().ok())
            .map(|b| *b)
    }
}