use crate::actor::handle::ActorHandle;
use crate::actor::store::ActorStore;
use crate::chain::ChainEvent;
use crate::config::actor_manifest::HandlerConfig;
use crate::id::TheaterId;
use crate::pack_bridge::{HostLinkerBuilder, LinkerError, PackInstance, TypeHash};
use crate::shutdown::{ShutdownController, ShutdownReceiver};
use anyhow::Result;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
pub type SharedActorInstance = Arc<RwLock<Option<PackInstance>>>;
#[derive(Debug, Clone)]
pub struct HandlerContext {
pub satisfied_imports: HashSet<String>,
pub actor_id: Option<TheaterId>,
pub shutdown_controller: Option<ShutdownController>,
}
impl Default for HandlerContext {
fn default() -> Self {
Self::new()
}
}
impl HandlerContext {
pub fn new() -> Self {
Self {
satisfied_imports: HashSet::new(),
actor_id: None,
shutdown_controller: None,
}
}
pub fn with_shutdown_controller(shutdown_controller: ShutdownController) -> Self {
Self {
satisfied_imports: HashSet::new(),
actor_id: None,
shutdown_controller: Some(shutdown_controller),
}
}
pub fn subscribe_shutdown(&mut self) -> Option<ShutdownReceiver> {
self.shutdown_controller.as_mut().map(|c| c.subscribe())
}
pub fn is_satisfied(&self, import: &str) -> bool {
self.satisfied_imports.contains(import)
}
pub fn mark_satisfied(&mut self, import: &str) {
self.satisfied_imports.insert(import.to_string());
}
pub fn mark_all_satisfied(&mut self, imports: &[String]) {
for import in imports {
self.satisfied_imports.insert(import.clone());
}
}
}
pub struct HandlerRegistry {
handlers: Vec<Box<dyn Handler>>,
replay_chain: Option<Vec<ChainEvent>>,
}
impl Default for HandlerRegistry {
fn default() -> Self {
Self::new()
}
}
impl HandlerRegistry {
pub fn new() -> Self {
Self {
handlers: Vec::new(),
replay_chain: None,
}
}
pub fn set_replay_chain(&mut self, chain: Vec<ChainEvent>) {
self.replay_chain = Some(chain);
}
pub fn replay_chain(&self) -> Option<&Vec<ChainEvent>> {
self.replay_chain.as_ref()
}
pub fn take_replay_chain(&mut self) -> Option<Vec<ChainEvent>> {
self.replay_chain.take()
}
pub fn is_replay_mode(&self) -> bool {
self.replay_chain.is_some()
}
pub fn register<H: Handler>(&mut self, handler: H) {
self.handlers.push(Box::new(handler));
}
pub fn prepend<H: Handler>(&mut self, handler: H) {
self.handlers.insert(0, Box::new(handler));
}
}
impl Clone for HandlerRegistry {
fn clone(&self) -> Self {
let mut new_registry = HandlerRegistry::new();
for handler in &self.handlers {
new_registry.handlers.push(handler.create_instance(None));
}
if let Some(chain) = &self.replay_chain {
new_registry.replay_chain = Some(chain.clone());
}
new_registry
}
}
impl HandlerRegistry {
pub fn get_handlers(&self) -> Vec<Box<dyn Handler>> {
self.handlers
.iter()
.map(|h| h.create_instance(None))
.collect()
}
pub fn clone_with_configs(&self, configs: &[HandlerConfig]) -> Self {
let mut new_registry = HandlerRegistry::new();
for handler in &self.handlers {
let matching_config = configs.iter().find(|c| c.handler_name() == handler.name());
new_registry
.handlers
.push(handler.create_instance(matching_config));
}
if let Some(chain) = &self.replay_chain {
new_registry.replay_chain = Some(chain.clone());
}
new_registry
}
}
pub trait Handler: Send + Sync + 'static {
fn create_instance(&self, config: Option<&HandlerConfig>) -> Box<dyn Handler>;
fn init(&mut self, _actor_handle: ActorHandle, _actor_instance: SharedActorInstance) {
}
fn run(
&mut self,
shutdown_receiver: ShutdownReceiver,
_event_rx: broadcast::Receiver<ChainEvent>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
Box::pin(async move {
shutdown_receiver.wait_for_shutdown().await;
Ok(())
})
}
fn setup(
&mut self,
actor_handle: ActorHandle,
actor_instance: SharedActorInstance,
shutdown_receiver: ShutdownReceiver,
event_rx: broadcast::Receiver<ChainEvent>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
self.init(actor_handle, actor_instance);
self.run(shutdown_receiver, event_rx)
}
fn setup_host_functions_composite(
&mut self,
_builder: &mut HostLinkerBuilder<'_, ActorStore>,
_ctx: &mut HandlerContext,
) -> Result<(), LinkerError> {
Ok(())
}
fn name(&self) -> &str;
fn imports(&self) -> Option<Vec<String>>;
fn exports(&self) -> Option<Vec<String>>;
fn interface_hashes(&self) -> Vec<(String, TypeHash)> {
vec![]
}
fn interfaces(&self) -> Vec<crate::pack_bridge::InterfaceImpl> {
vec![]
}
fn supports_composite(&self) -> bool {
false
}
}