use std::fmt;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use tracing::{debug, warn};
use crate::events::ChainEventData;
use crate::messages::TheaterCommand;
use crate::store::ContentRef;
use crate::TheaterId;
pub use theater_chain::ChainEvent;
/// Newtype wrapper around an ordered list of [`ChainEvent`]s.
///
/// The inner vector is public, so callers can build or inspect the chain directly.
///
/// NOTE(review): the name suggests this is a recorded chain consumed when replaying HTTP
/// traffic — confirm against callers.
#[derive(Debug, Clone)]
pub struct HttpReplayChain(pub Vec<ChainEvent>);
impl HttpReplayChain {
    /// Returns references to all events whose `event_type` equals `event_type`.
    ///
    /// The comparison is an exact string match, and the returned events keep the order
    /// in which they appear in the chain. The result is empty when nothing matches.
    pub fn events_by_type(&self, event_type: &str) -> Vec<&ChainEvent> {
        let mut matching = Vec::new();
        for event in &self.0 {
            if event.event_type == event_type {
                matching.push(event);
            }
        }
        matching
    }

    /// Returns all events recorded for the WASI HTTP incoming handler.
    ///
    /// This is [`HttpReplayChain::events_by_type`] with the fixed event type
    /// `wasi:http/incoming-handler@0.2.0/handle`.
    pub fn http_incoming_events(&self) -> Vec<&ChainEvent> {
        const HTTP_INCOMING_HANDLER: &str = "wasi:http/incoming-handler@0.2.0/handle";
        self.events_by_type(HTTP_INCOMING_HANDLER)
    }
}
/// Handle for appending events to one actor's event chain.
///
/// Each new event is built from the current head hash, hashed, and then announced both to
/// the theater runtime (via `theater_tx`) and to any broadcast subscribers.
///
/// Cloning copies `current_hash` (so the clone's head advances independently afterwards)
/// while the two channel senders keep pointing at the same underlying channels.
#[derive(Clone)]
pub struct StateChain {
/// Hash of the most recently added event; `None` until the first event is added.
current_hash: Option<Vec<u8>>,
/// Channel on which `TheaterCommand::NewEvent` notifications are sent for each added event.
theater_tx: Sender<TheaterCommand>,
/// Identifier of the actor this chain belongs to; attached to every notification.
actor_id: TheaterId,
/// Broadcast sender to which every added event is published for `subscribe` receivers.
event_broadcast: broadcast::Sender<ChainEvent>,
}
/// Hand-written `Debug` that prints only `current_hash` and `actor_id`.
///
/// The two channel fields (`theater_tx`, `event_broadcast`) are intentionally left out of
/// the output.
impl fmt::Debug for StateChain {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("StateChain");
        builder.field("current_hash", &self.current_hash);
        builder.field("actor_id", &self.actor_id);
        builder.finish()
    }
}
impl StateChain {
    /// Capacity of the broadcast channel created for event subscribers.
    const EVENT_BROADCAST_CAPACITY: usize = 1024;

    /// Creates an empty chain for `actor_id`.
    ///
    /// The chain starts without a head hash. `theater_tx` is the channel on which
    /// `TheaterCommand::NewEvent` notifications are sent as events are added. A new
    /// broadcast channel is created for [`StateChain::subscribe`].
    pub fn new(actor_id: TheaterId, theater_tx: Sender<TheaterCommand>) -> Self {
        // The initial receiver is dropped on purpose; consumers obtain their own through
        // `subscribe`.
        let (event_broadcast, _) = broadcast::channel(Self::EVENT_BROADCAST_CAPACITY);
        Self {
            current_hash: None,
            theater_tx,
            actor_id,
            event_broadcast,
        }
    }

    /// Appends `event_data` to the chain and returns the resulting event.
    ///
    /// The event is built from the current head hash, serialized to JSON, and hashed via
    /// [`ContentRef::from_content`]. The decoded hash is stored on the event and becomes
    /// the new head. The event is then announced on a best-effort basis:
    ///
    /// * a `TheaterCommand::NewEvent` is sent with `try_send`; a failure is only logged as
    ///   a warning and does not fail the call;
    /// * the event is published on the broadcast channel; a send error is ignored.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` if the event cannot be serialized. In that case the
    /// head hash is left unchanged and nothing is sent.
    ///
    /// # Panics
    ///
    /// Panics if the string returned by `ContentRef::hash()` is not valid hex.
    pub fn add_typed_event(
        &mut self,
        event_data: ChainEventData,
    ) -> Result<ChainEvent, serde_json::Error> {
        let mut event = event_data.to_chain_event(self.current_hash.clone());

        // The hash covers the event as serialized here, i.e. before the computed hash is
        // stored on it below.
        let serialized_event = serde_json::to_vec(&event)?;
        let content_ref = ContentRef::from_content(&serialized_event);
        let hash_bytes = hex::decode(content_ref.hash())
            .expect("ContentRef::hash() must return a valid hex string");

        // One copy becomes the new head; the decoded bytes themselves move into the event.
        self.current_hash = Some(hash_bytes.clone());
        event.hash = hash_bytes;

        // Non-blocking notification: a failed send must not stall or fail event recording.
        if let Err(e) = self.theater_tx.try_send(TheaterCommand::NewEvent {
            actor_id: self.actor_id,
            event: event.clone(),
        }) {
            warn!("Failed to send event notification: {}", e);
        }

        // The send result is deliberately discarded so that a failed broadcast never
        // fails the append.
        let _ = self.event_broadcast.send(event.clone());

        debug!(
            "Emitted event {} for actor {}",
            content_ref.hash(),
            self.actor_id
        );
        Ok(event)
    }

    /// Returns the hash of the most recently added event, or `None` if no event has been
    /// added yet.
    pub fn head_hash(&self) -> Option<&[u8]> {
        self.current_hash.as_deref()
    }

    /// Returns a new receiver for events published by this chain's broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<ChainEvent> {
        self.event_broadcast.subscribe()
    }
}