mod chain_reader;
mod chain_writer;
pub use chain_reader::ChainReader;
pub use chain_writer::{ChainWriter, RunMeta};
use std::fmt;
use std::path::Path;
use std::sync::{Arc, Mutex};
use anyhow::Result;
use console::style;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::mpsc::Sender;
use tracing::{debug, info, warn};
use wasmtime::component::{ComponentType, Lift, Lower};
use crate::events::ChainEventData;
use crate::messages::TheaterCommand;
use crate::store::ContentRef;
use crate::TheaterId;
use theater_chain::event::EventType;
#[derive(Debug, Clone)]
pub struct HttpReplayChain(pub Vec<ChainEvent>);
impl HttpReplayChain {
pub fn events_by_type(&self, event_type: &str) -> Vec<&ChainEvent> {
self.0
.iter()
.filter(|e| e.event_type == event_type)
.collect()
}
pub fn http_incoming_events(&self) -> Vec<&ChainEvent> {
self.events_by_type("wasi:http/incoming-handler@0.2.0/handle")
}
}
#[derive(Debug, Clone, Serialize, Deserialize, ComponentType, Lift, Lower, Eq)]
#[component(record)]
pub struct ChainEvent {
pub hash: Vec<u8>,
#[component(name = "parent-hash")]
pub parent_hash: Option<Vec<u8>>,
#[component(name = "event-type")]
pub event_type: String,
pub data: Vec<u8>,
}
impl ChainEvent {}
impl fmt::Display for ChainEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hash_str = hex::encode(&self.hash);
let short_hash = if hash_str.len() > 7 {
&hash_str[0..7]
} else {
&hash_str
};
let parent_str = match &self.parent_hash {
Some(ph) => {
let ph_str = hex::encode(ph);
if ph_str.len() > 7 {
format!("(parent: {}...)", &ph_str[0..7])
} else {
format!("(parent: {})", ph_str)
}
}
None => "(root)".to_string(),
};
let content = if let Ok(text) = std::str::from_utf8(&self.data) {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
if json.is_object() && text.len() < 100 {
serde_json::to_string(&json).unwrap_or_else(|_| text.to_string())
} else {
let preview = if text.len() > 30 {
format!("{}...", &text[0..27])
} else {
text.to_string()
};
format!("'{}'", preview)
}
} else {
let preview = if text.len() > 30 {
format!("{}...", &text[0..27])
} else {
text.to_string()
};
format!("'{}'", preview)
}
} else {
format!("{} bytes of binary data", self.data.len())
};
write!(
f,
"Event[{}] {} {} {}",
short_hash,
parent_str,
style(&self.event_type).cyan(),
content
)
}
}
impl EventType for ChainEvent {
fn event_type(&self) -> String {
self.event_type.clone()
}
fn len(&self) -> usize {
self.data.len()
}
}
impl PartialEq for ChainEvent {
fn eq(&self, other: &Self) -> bool {
self.hash == other.hash
}
}
impl std::hash::Hash for ChainEvent {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.hash.hash(state);
}
}
#[derive(Clone, Serialize)]
pub struct StateChain {
events: Vec<ChainEvent>,
current_hash: Option<Vec<u8>>,
#[serde(skip)]
theater_tx: Sender<TheaterCommand>,
#[serde(skip)]
actor_id: TheaterId,
#[serde(skip)]
chain_writer: Arc<Mutex<Option<ChainWriter>>>,
#[serde(skip)]
event_broadcast: broadcast::Sender<ChainEvent>,
}
impl fmt::Debug for StateChain {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StateChain")
.field("events", &self.events)
.field("current_hash", &self.current_hash)
.field("actor_id", &self.actor_id)
.finish()
}
}
impl StateChain {
pub fn new(actor_id: TheaterId, theater_tx: Sender<TheaterCommand>) -> Self {
let chain_writer = match ChainWriter::new(&actor_id) {
Ok(log) => {
info!("Created run log at {:?}", log.path());
Some(log)
}
Err(e) => {
warn!("Failed to create run log: {}", e);
None
}
};
let (event_broadcast, _) = broadcast::channel(1024);
Self {
events: Vec::new(),
current_hash: None,
theater_tx,
actor_id,
chain_writer: Arc::new(Mutex::new(chain_writer)),
event_broadcast,
}
}
pub fn set_run_meta(&self, actor_name: Option<String>, manifest_path: Option<String>) {
let meta = RunMeta {
actor_id: self.actor_id.to_string(),
actor_name,
manifest_path,
started_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
};
if let Err(e) = ChainWriter::write_meta(&self.actor_id, &meta) {
warn!("Failed to write run meta: {}", e);
}
}
pub fn add_typed_event(
&mut self,
event_data: ChainEventData,
) -> Result<ChainEvent, serde_json::Error> {
let mut event = event_data.to_chain_event(self.current_hash.clone());
let serialized_event = serde_json::to_vec(&event)?;
let content_ref = ContentRef::from_content(&serialized_event);
let hash_bytes = hex::decode(content_ref.hash()).unwrap();
event.hash = hash_bytes.clone();
self.events.push(event.clone());
self.current_hash = Some(event.hash.clone());
if let Ok(mut guard) = self.chain_writer.lock() {
if let Some(ref mut log) = *guard {
if let Err(e) = log.append(&event) {
warn!("Failed to append to run log: {}", e);
}
}
}
if let Err(e) = self.theater_tx.try_send(TheaterCommand::NewEvent {
actor_id: self.actor_id,
event: event.clone(),
}) {
warn!("Failed to send event notification: {}", e);
}
let _ = self.event_broadcast.send(event.clone());
debug!(
"Stored event {} in content store for actor {}",
content_ref.hash(),
self.actor_id
);
Ok(event)
}
pub fn verify(&self) -> bool {
let mut prev_hash = None;
for event in &self.events {
let temp_event = ChainEvent {
hash: vec![],
parent_hash: prev_hash.clone(),
event_type: event.event_type.clone(),
data: event.data.clone(),
};
let serialized_event = match serde_json::to_vec(&temp_event) {
Ok(data) => data,
Err(_) => return false,
};
let content_ref = ContentRef::from_content(&serialized_event);
let hash_bytes = match hex::decode(content_ref.hash()) {
Ok(bytes) => bytes,
Err(_) => return false,
};
if hash_bytes != event.hash {
return false;
}
prev_hash = Some(event.hash.clone());
}
true
}
pub fn save_to_file(&self, path: &Path) -> Result<()> {
let json = serde_json::to_string_pretty(self)?;
std::fs::write(path, json)?;
Ok(())
}
pub fn save_chain(&self) -> Result<()> {
let theater_dir = std::env::var("THEATER_HOME").expect(
"THEATER_HOME environment variable must be set to the directory where events are stored",
);
let events_dir = format!("{}/events", theater_dir);
let chains_dir = format!("{}/chains", theater_dir);
std::fs::create_dir_all(&events_dir)?;
std::fs::create_dir_all(&chains_dir)?;
let chain_path = format!("{}/{}", chains_dir, self.actor_id);
for event in &self.events {
let event_path = format!("{}/{}", events_dir, hex::encode(&event.hash));
std::fs::write(
event_path,
serde_json::to_string(event).expect("Failed to serialize event"),
)
.expect("Failed to write event file");
}
std::fs::write(
chain_path,
serde_json::to_string(&self.current_hash).expect("Failed to serialize current hash"),
)
.expect("Failed to write chain file");
Ok(())
}
pub fn get_last_event(&self) -> Option<&ChainEvent> {
self.events.last()
}
pub fn get_events(&self) -> &[ChainEvent] {
&self.events
}
pub fn subscribe(&self) -> broadcast::Receiver<ChainEvent> {
self.event_broadcast.subscribe()
}
}