theater 0.3.6

A WebAssembly actor system for AI agents
Documentation
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::oneshot::{Receiver, Sender};
use tracing::debug;

/// Default timeout (5 seconds) for waiting for a component to shut down gracefully.
///
/// NOTE(review): nothing in this file applies this timeout; it is presumably
/// consumed by callers elsewhere in the crate — confirm.
pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// A signal indicating that a component should shut down.
///
/// A signal carries the requested [`ShutdownType`] and, optionally, a one-shot
/// response channel. When the signal is dropped, its `Drop` implementation
/// sends `()` on that channel (if present) to acknowledge that shutdown
/// handling is complete. This ensures the shutdown controller doesn't wait
/// indefinitely for responses: a holder only has to let the signal go out of
/// scope once it has finished shutting down.
#[derive(Debug)]
pub struct ShutdownSignal {
    /// Type of shutdown to perform
    pub shutdown_type: ShutdownType,
    /// Response channel - automatically sent when signal is dropped.
    ///
    /// `None` when there is nobody to acknowledge to, e.g. for the fallback
    /// signal produced when the shutdown channel closes without a signal.
    sender: Option<Sender<()>>,
}

impl Drop for ShutdownSignal {
    /// Acknowledge the shutdown as the signal goes out of scope.
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the acknowledgement is sent at most once.
        let Some(responder) = self.sender.take() else {
            // No response channel attached: nothing to acknowledge.
            return;
        };
        // A failed send only means the receiving side is already gone; ignore it.
        responder.send(()).ok();
    }
}

/// Type of shutdown to perform.
///
/// The value is carried unchanged from the controller to every subscriber
/// inside a `ShutdownSignal`; how each variant is handled is up to the
/// receiving component.
///
/// The type is `Copy` and comparable, so callers can write
/// `signal.shutdown_type == ShutdownType::Force` instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownType {
    /// A graceful shutdown was requested. Also used for the fallback signal
    /// produced when the shutdown channel closes without a signal being sent.
    Graceful,
    /// A forced shutdown was requested.
    Force,
}

/// Controller that can broadcast shutdown signals to multiple receivers.
///
/// This type is Clone-able and all clones share the same subscriber list
/// (the list lives behind an `Arc<Mutex<..>>`), so a receiver obtained from
/// one clone is signalled when any clone broadcasts a shutdown.
#[derive(Clone)]
pub struct ShutdownController {
    /// One-shot senders, one per subscriber, shared between all clones.
    subscribers: Arc<Mutex<Vec<Sender<ShutdownSignal>>>>,
}

impl std::fmt::Debug for ShutdownController {
    /// Formats the controller as `ShutdownController { subscriber_count: N }`.
    ///
    /// Formatting never panics: if the subscriber mutex was poisoned by a
    /// panic in another lock holder, the guard is recovered and the current
    /// length is reported anyway.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Read the count first so the lock is released before any formatting
        // (and any writer code behind `f`) runs.
        let subscriber_count = self
            .subscribers
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .len();
        f.debug_struct("ShutdownController")
            .field("subscriber_count", &subscriber_count)
            .finish()
    }
}

impl Default for ShutdownController {
    fn default() -> Self {
        Self::new()
    }
}

impl ShutdownController {
    /// Create a new ShutdownController and a ShutdownReceiver
    pub fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Get a new receiver for this controller
    pub fn subscribe(&mut self) -> ShutdownReceiver {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        self.subscribers.lock().unwrap().push(sender);
        ShutdownReceiver { receiver }
    }

    /// Signal all receivers to shutdown
    pub async fn signal_shutdown(self, shutdown_type: ShutdownType) {
        debug!("Signaling shutdown to all subscribers");
        let subscribers = {
            let mut guard = self.subscribers.lock().unwrap();
            std::mem::take(&mut *guard)
        };
        let mut receivers = Vec::new();
        for sender in subscribers {
            let (responder, receiver) = tokio::sync::oneshot::channel();
            receivers.push(receiver);
            match sender.send(ShutdownSignal {
                shutdown_type,
                sender: Some(responder),
            }) {
                Ok(_) => {
                    debug!("Shutdown signal sent");
                }
                Err(e) => {
                    debug!("Failed to send shutdown signal: {:?}", e);
                }
            }
        }

        // Wait for all receivers to finish
        for receiver in receivers {
            if let Err(e) = receiver.await {
                debug!("Failed to receive shutdown signal: {:?}", e);
            }
        }
    }
}

/// Receiver that can wait for shutdown signals.
///
/// Wraps the receiving half of the one-shot channel whose sending half is
/// held in the controller's subscriber list. Implements `Debug` like the
/// other public types in this module so it can be logged or embedded in
/// `#[derive(Debug)]` structs.
#[derive(Debug)]
pub struct ShutdownReceiver {
    /// Receiving half of the one-shot channel carrying the shutdown signal.
    pub receiver: Receiver<ShutdownSignal>,
}

impl ShutdownReceiver {
    /// Wait for a shutdown signal to be received
    pub async fn wait_for_shutdown(self) -> ShutdownSignal {
        debug!("Waiting for shutdown signal");
        match self.receiver.await {
            Ok(signal) => {
                debug!("Received shutdown signal");
                signal
            }
            Err(e) => {
                debug!("Shutdown channel error: {}, using default signal", e);
                ShutdownSignal {
                    sender: None,
                    shutdown_type: ShutdownType::Graceful,
                }
            }
        }
    }
}