use std::fmt;
use std::time::Instant;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error};
use crate::events::ChainEventData;
use crate::store::ContentRef;
use crate::TheaterId;
pub use theater_chain::ChainEvent;
/// Newtype around an ordered list of [`ChainEvent`]s with helpers for
/// selecting events by their `event_type`.
// NOTE(review): the name suggests this is the chain fed to HTTP replay — confirm against callers.
#[derive(Debug, Clone)]
pub struct HttpReplayChain(pub Vec<ChainEvent>);

impl HttpReplayChain {
    /// Returns references to every event whose `event_type` equals
    /// `event_type`, in the same order they appear in the chain.
    ///
    /// The result is empty when nothing matches.
    pub fn events_by_type(&self, event_type: &str) -> Vec<&ChainEvent> {
        let mut matching = Vec::new();
        for candidate in &self.0 {
            if candidate.event_type == event_type {
                matching.push(candidate);
            }
        }
        matching
    }

    /// Returns the events recorded under the WASI HTTP incoming-handler
    /// `handle` event type.
    pub fn http_incoming_events(&self) -> Vec<&ChainEvent> {
        const INCOMING_HANDLER_EVENT: &str = "wasi:http/incoming-handler@0.2.0/handle";
        self.events_by_type(INCOMING_HANDLER_EVENT)
    }
}
/// Per-actor chain state: the hash of the latest appended event plus the
/// channels that are sent every newly appended event.
pub struct StateChain {
/// Hash of the most recently appended event; `None` until the first append.
/// It is cloned and handed to `to_chain_event` when the next event is built.
current_hash: Option<Vec<u8>>,
/// Identifier of the actor this chain belongs to; sent alongside every
/// event delivered to subscribers.
actor_id: TheaterId,
/// Senders that receive `(actor_id, event)` on each append. A sender whose
/// `send` fails is removed during that append.
subscribers: Vec<Sender<(TheaterId, ChainEvent)>>,
}
/// Manual `Debug` implementation: the subscriber list is rendered as its
/// length rather than by formatting the individual senders.
impl fmt::Debug for StateChain {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let subscriber_count = self.subscribers.len();

        let mut repr = f.debug_struct("StateChain");
        repr.field("current_hash", &self.current_hash);
        repr.field("actor_id", &self.actor_id);
        repr.field("subscribers", &subscriber_count);
        repr.finish()
    }
}
impl StateChain {
    /// Creates an empty chain for `actor_id`: no head hash and no subscribers.
    pub fn new(actor_id: TheaterId) -> Self {
        Self {
            current_hash: None,
            actor_id,
            subscribers: Vec::new(),
        }
    }

    /// Appends `event_data` to the chain and delivers the resulting event to
    /// every subscriber.
    ///
    /// Steps, in order:
    /// 1. Build a [`ChainEvent`] from `event_data`, passing the current head
    ///    hash to `to_chain_event`.
    /// 2. Serialize that event to JSON and derive a [`ContentRef`] from the
    ///    bytes; its hex hash, decoded to bytes, becomes `event.hash`. The
    ///    event is serialized *before* the hash is assigned.
    /// 3. Record the new hash as the chain head.
    /// 4. Send `(actor_id, event.clone())` to each subscriber, awaiting each
    ///    send in turn. Subscribers whose send fails are evicted.
    ///
    /// Returns the event with its hash populated.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` if the event cannot be serialized. In
    /// that case the head hash is left unchanged and nothing is dispatched.
    ///
    /// # Panics
    ///
    /// Panics if `ContentRef::hash()` is not a valid hex string, which would
    /// indicate a broken invariant in the content store.
    pub async fn add_typed_event(
        &mut self,
        event_data: ChainEventData,
    ) -> Result<ChainEvent, serde_json::Error> {
        let start = Instant::now();

        let mut event = event_data.to_chain_event(self.current_hash.clone());
        let serialized_event = serde_json::to_vec(&event)?;
        let content_ref = ContentRef::from_content(&serialized_event);

        // Move the decoded bytes straight into the event; the only copy that
        // is actually needed is the one kept as the chain head.
        event.hash = hex::decode(content_ref.hash())
            .expect("ContentRef::hash() must return a valid hex string");
        self.current_hash = Some(event.hash.clone());

        let actor_id = self.actor_id;
        let dispatch_start = Instant::now();
        // Captured before eviction so the log reports how many sends were attempted.
        let subscribers = self.subscribers.len();

        let mut closed_indices: Vec<usize> = Vec::new();
        for (index, subscriber) in self.subscribers.iter().enumerate() {
            if subscriber.send((actor_id, event.clone())).await.is_err() {
                error!("Subscriber for actor {:?} closed; evicting", self.actor_id);
                closed_indices.push(index);
            }
        }

        // Evict from the highest index down: `swap_remove` fills the hole with
        // the last element, which at that point always lies beyond every index
        // still waiting to be removed, so the remaining indices stay valid.
        for index in closed_indices.into_iter().rev() {
            self.subscribers.swap_remove(index);
        }

        let dispatch_elapsed_ms = dispatch_start.elapsed().as_millis() as u64;
        debug!(
            phase = "chain.append",
            elapsed_ms = start.elapsed().as_millis() as u64,
            dispatch_ms = dispatch_elapsed_ms,
            subscribers,
            actor_id = %self.actor_id,
            event_hash = content_ref.hash(),
            "chain append complete",
        );

        Ok(event)
    }

    /// Returns the hash of the most recently appended event, or `None` if no
    /// event has been appended yet.
    pub fn head_hash(&self) -> Option<&[u8]> {
        self.current_hash.as_deref()
    }

    /// Registers `tx` to receive `(actor_id, event)` for every subsequent
    /// append. No de-duplication is performed.
    pub fn add_subscriber(&mut self, tx: Sender<(TheaterId, ChainEvent)>) {
        self.subscribers.push(tx);
    }

    /// Removes the first registered sender that belongs to the same channel as
    /// `tx`.
    ///
    /// Returns `true` if a sender was removed and `false` if none matched.
    /// Removal uses `swap_remove`, so the relative order of the remaining
    /// subscribers is not preserved.
    pub fn remove_subscriber(&mut self, tx: &Sender<(TheaterId, ChainEvent)>) -> bool {
        if let Some(index) = self.subscribers.iter().position(|s| s.same_channel(tx)) {
            self.subscribers.swap_remove(index);
            true
        } else {
            false
        }
    }
}