use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tokio::sync::{broadcast, oneshot};
use tracing::{error, info};
use crate::actor::handle::ActorHandle;
use crate::actor::store::ActorStore;
use crate::chain::ChainEvent;
use crate::events::wasm::WasmEventData;
use crate::events::ChainEventPayload;
use crate::handler::{Handler, HandlerContext, SharedActorInstance};
use crate::pack_bridge::{HostLinkerBuilder, InterfaceImpl, LinkerError, TypeHash};
use crate::shutdown::ShutdownReceiver;
/// Shared view over a recorded event chain plus a cursor into it.
///
/// Every field is behind an `Arc`, so clones are cheap and all clones observe
/// the same cursor: calling [`ReplayState::advance`] on one clone is visible
/// through every other clone.
#[derive(Clone)]
pub struct ReplayState {
    /// The recorded chain, in the order it was passed to [`ReplayState::new`].
    events: Arc<Vec<ChainEvent>>,
    /// Index of the current event; only moved by [`ReplayState::advance`].
    position: Arc<Mutex<usize>>,
    /// Distinct interface names derived from the event types, sorted ascending.
    interfaces: Arc<Vec<String>>,
}

impl ReplayState {
    /// Builds a replay state positioned at the first event of `events`.
    ///
    /// The interface list is derived from each event's `event_type`: everything
    /// before the last `/` is treated as the interface name (for example
    /// `"theater:simple/runtime/log"` yields `"theater:simple/runtime"`).
    /// Event types without a `/` contribute nothing. Duplicates are removed and
    /// the result is sorted so that the list is identical from run to run
    /// (iteration order of a `HashSet` is not).
    pub fn new(events: Vec<ChainEvent>) -> Self {
        let distinct: HashSet<&str> = events
            .iter()
            .filter_map(|event| {
                event
                    .event_type
                    .rsplit_once('/')
                    .map(|(interface, _function)| interface)
            })
            .collect();
        let mut interfaces: Vec<String> = distinct.into_iter().map(str::to_string).collect();
        interfaces.sort_unstable();

        Self {
            events: Arc::new(events),
            position: Arc::new(Mutex::new(0)),
            interfaces: Arc::new(interfaces),
        }
    }

    /// Locks the cursor.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. another thread panicked while
    /// holding the cursor lock.
    fn lock_position(&self) -> std::sync::MutexGuard<'_, usize> {
        self.position
            .lock()
            .expect("replay position mutex poisoned")
    }

    /// Returns a clone of the event under the cursor, or `None` once the cursor
    /// has moved past the last event.
    pub fn current_event(&self) -> Option<ChainEvent> {
        let pos = self.current_position();
        self.events.get(pos).cloned()
    }

    /// Returns the recorded `output` of the current event when its data decodes
    /// as a host function call.
    ///
    /// Yields `None` when there is no current event or when
    /// `decode_host_function_call` returns `None` for the event data.
    pub fn current_output(&self) -> Option<packr::abi::Value> {
        let event = self.current_event()?;
        crate::events::decode_host_function_call(&event.data).map(|call| call.output)
    }

    /// Moves the cursor forward by one event.
    ///
    /// The cursor is not clamped, so it can move past the end of the chain;
    /// [`ReplayState::is_complete`] reports `true` from that point on.
    pub fn advance(&self) {
        *self.lock_position() += 1;
    }

    /// Returns the current cursor index (zero-based).
    pub fn current_position(&self) -> usize {
        *self.lock_position()
    }

    /// Returns the number of events in the recorded chain.
    pub fn total_events(&self) -> usize {
        self.events.len()
    }

    /// Returns `true` once the cursor is at or beyond the end of the chain.
    pub fn is_complete(&self) -> bool {
        self.current_position() >= self.total_events()
    }

    /// Compares `actual_hash` with the hash of the event under the cursor.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when there is no event at the current
    /// position, or when the hashes differ (both hashes are hex-encoded in the
    /// message).
    pub fn verify_hash(&self, actual_hash: &[u8]) -> Result<(), String> {
        let pos = self.current_position();
        let expected = self
            .events
            .get(pos)
            .ok_or_else(|| format!("No expected event at position {}", pos))?;

        if actual_hash == expected.hash {
            return Ok(());
        }

        Err(format!(
            "Hash mismatch at position {}: expected {}, got {}",
            pos,
            hex::encode(&expected.hash),
            hex::encode(actual_hash)
        ))
    }

    /// Returns a copy of the distinct interface names found in the chain,
    /// sorted ascending.
    pub fn interfaces(&self) -> Vec<String> {
        self.interfaces.to_vec()
    }
}
/// Handler that wraps a [`ReplayState`] built from a previously recorded chain.
///
/// Cloning the handler clones the state, which shares its data with the
/// original (the state's fields are `Arc`s).
#[derive(Clone)]
pub struct ReplayHandler {
    /// Recorded chain and cursor used by this handler.
    state: ReplayState,
}

impl ReplayHandler {
    /// Creates a handler whose state is built from `expected_chain`.
    pub fn new(expected_chain: Vec<ChainEvent>) -> Self {
        let state = ReplayState::new(expected_chain);
        Self { state }
    }

    /// Borrows the underlying replay state.
    pub fn state(&self) -> &ReplayState {
        &self.state
    }

    /// Returns `(current_position, total_events)` as reported by the state.
    pub fn progress(&self) -> (usize, usize) {
        let position = self.state.current_position();
        let total = self.state.total_events();
        (position, total)
    }

    /// Returns the interface implementations provided by this handler, which
    /// is always an empty list.
    pub fn interfaces(&self) -> Vec<InterfaceImpl> {
        Vec::new()
    }
}
impl Handler for ReplayHandler {
fn create_instance(
&self,
_config: Option<&crate::config::actor_manifest::HandlerConfig>,
) -> Box<dyn Handler> {
Box::new(self.clone())
}
fn setup(
&mut self,
actor_handle: ActorHandle,
_actor_instance: SharedActorInstance,
shutdown_receiver: ShutdownReceiver,
mut event_rx: broadcast::Receiver<ChainEvent>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
let expected_events = (*self.state.events).clone();
Box::pin(async move {
let total_expected = expected_events.len();
let mut shutdown_rx = shutdown_receiver.receiver;
let calls_to_replay: Vec<(usize, String, Vec<u8>)> = expected_events
.iter()
.enumerate()
.filter_map(|(idx, event)| {
let payload = crate::events::decode_chain_event_payload(&event.data)?;
match payload {
ChainEventPayload::Wasm(WasmEventData::WasmCall {
function_name,
params,
}) => Some((idx, function_name, params)),
_ => None,
}
})
.collect();
info!(
"Replay: found {} WasmCall events to replay out of {} total events",
calls_to_replay.len(),
total_expected
);
let (divergence_tx, mut divergence_rx) = oneshot::channel::<String>();
let expected_events_for_verify = expected_events.clone();
let verification_task = tokio::spawn(async move {
let mut verified_position = 0usize;
loop {
match event_rx.recv().await {
Ok(actual_event) => {
if verified_position >= expected_events_for_verify.len() {
let msg = format!(
"Divergence: received event {} but expected chain has only {} events\n extra event type: {}",
verified_position + 1,
expected_events_for_verify.len(),
actual_event.event_type
);
let _ = divergence_tx.send(msg);
return verified_position;
}
let expected_event = &expected_events_for_verify[verified_position];
if actual_event.hash != expected_event.hash {
let expected_hex = hex::encode(&expected_event.hash);
let actual_hex = hex::encode(&actual_event.hash);
let truncate_hash = |h: &str| {
if h.len() > 16 {
format!("{}..{}", &h[..8], &h[h.len() - 8..])
} else {
h.to_string()
}
};
let msg = format!(
"Divergence at event {} [{}]\n expected: {}\n actual: {}",
verified_position,
expected_event.event_type,
truncate_hash(&expected_hex),
truncate_hash(&actual_hex)
);
let _ = divergence_tx.send(msg);
return verified_position;
}
verified_position += 1;
if verified_position % 10000 == 0 {
info!(
"Replay: streaming verify progress {}/{}",
verified_position,
expected_events_for_verify.len()
);
}
if verified_position >= expected_events_for_verify.len() {
info!("Replay: streaming verification complete - all {} events verified", verified_position);
return verified_position;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
error!("Replay verification lagged by {} events - verification may be incomplete", n);
}
Err(broadcast::error::RecvError::Closed) => {
return verified_position;
}
}
}
});
let total_calls = calls_to_replay.len();
for (call_num, (_idx, function_name, params)) in calls_to_replay.into_iter().enumerate()
{
if call_num % 1000 == 0 {
info!("Replay: progress {}/{} calls", call_num, total_calls);
}
match divergence_rx.try_recv() {
Ok(msg) => {
return Err(anyhow::anyhow!("Replay stopped: {}", msg));
}
Err(oneshot::error::TryRecvError::Closed) => {
}
Err(oneshot::error::TryRecvError::Empty) => {
}
}
let call_result = tokio::select! {
result = actor_handle.call_function_pack_void(function_name.clone(), params) => result,
_ = &mut shutdown_rx => {
info!("Replay: shutdown received, stopping at call {}", call_num);
return Ok(());
}
};
if let Err(e) = call_result {
return Err(anyhow::anyhow!(
"Replay failed at {} (call {}): {:?}",
function_name,
call_num,
e
));
}
}
let verified_count = verification_task
.await
.map_err(|e| anyhow::anyhow!("Verification task panicked: {:?}", e))?;
if let Ok(msg) = divergence_rx.try_recv() {
return Err(anyhow::anyhow!("Replay failed: {}", msg));
}
if verified_count != total_expected {
return Err(anyhow::anyhow!(
"Replay produced {} events, expected {}",
verified_count,
total_expected
));
}
info!(
"Replay complete: {}/{} events verified via streaming",
total_expected, total_expected
);
Ok(())
})
}
fn setup_host_functions_composite(
&mut self,
_builder: &mut HostLinkerBuilder<'_, ActorStore>,
_ctx: &mut HandlerContext,
) -> Result<(), LinkerError> {
Ok(())
}
fn name(&self) -> &str {
"replay"
}
fn imports(&self) -> Option<Vec<String>> {
None
}
fn exports(&self) -> Option<Vec<String>> {
None
}
fn interface_hashes(&self) -> Vec<(String, TypeHash)> {
vec![]
}
fn supports_composite(&self) -> bool {
true
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a parentless event with hash `[1, 2, 3, 4]`, empty data and the
    /// given event type.
    fn root_event(event_type: &str) -> ChainEvent {
        ChainEvent {
            hash: vec![1, 2, 3, 4],
            parent_hash: None,
            event_type: event_type.to_string(),
            data: vec![],
        }
    }

    /// A fresh state starts at position 0 and derives the interface name from
    /// the event type.
    #[test]
    fn test_replay_state_creation() {
        let state = ReplayState::new(vec![root_event("theater:simple/runtime/log")]);

        assert_eq!(state.current_position(), 0);
        assert_eq!(state.total_events(), 1);
        assert!(!state.is_complete());
        assert!(state
            .interfaces()
            .iter()
            .any(|name| name == "theater:simple/runtime"));
    }

    /// Advancing moves the cursor one step at a time until the chain is
    /// exhausted.
    #[test]
    fn test_replay_state_advance() {
        let child = ChainEvent {
            hash: vec![5, 6, 7, 8],
            parent_hash: Some(vec![1, 2, 3, 4]),
            event_type: "test2".to_string(),
            data: vec![],
        };
        let state = ReplayState::new(vec![root_event("test"), child]);

        assert_eq!(state.current_position(), 0);
        for expected_position in 1..=2 {
            state.advance();
            assert_eq!(state.current_position(), expected_position);
        }
        assert!(state.is_complete());
    }

    /// A new handler exposes the state and reports zero progress.
    #[test]
    fn test_replay_handler_creation() {
        let handler = ReplayHandler::new(vec![root_event("theater:simple/runtime/log")]);

        assert_eq!(handler.state().total_events(), 1);
        assert_eq!(handler.progress(), (0, 1));
    }

    /// The replay handler declares neither interface hashes nor interfaces.
    #[test]
    fn test_replay_handler_interface_hashes() {
        let handler = ReplayHandler::new(vec![root_event("theater:simple/runtime/log")]);

        assert!(handler.interface_hashes().is_empty());
        assert!(handler.interfaces().is_empty());
    }
}