use std::collections::HashMap;
use std::sync::Arc;
use dactor::actor::{Actor, Handler};
use dactor::node::{ActorId, NodeId};
use dactor::remote::ClusterState;
use dactor::supervision::ChildTerminated;
use dactor::test_support::test_runtime::TestActorRef;
use crate::network::MockNetwork;
use crate::node::MockNode;
/// A mock cluster: a set of [`MockNode`]s addressed by [`NodeId`], together
/// with a [`MockNetwork`] handle.
///
/// Nodes can be looked up, removed, re-created and re-inserted through the
/// methods on this type.
pub struct MockCluster {
/// Nodes currently present in the cluster, keyed by their ID.
nodes: HashMap<NodeId, MockNode>,
/// Network object created when the cluster is built; exposed via `network()`.
network: Arc<MockNetwork>,
}
impl MockCluster {
/// Builds a cluster holding one [`MockNode`] per entry of `node_ids`.
///
/// Each node has `connect_peer` called once for every *other* ID in the
/// list, so every node is told about all of its peers but never about
/// itself. A fresh [`MockNetwork`] is created alongside the nodes.
///
/// If the same ID appears more than once, the node built last replaces the
/// earlier one in the map.
pub fn new(node_ids: &[&str]) -> Self {
    let ids: Vec<NodeId> = node_ids
        .iter()
        .map(|&name| NodeId(name.to_string()))
        .collect();

    let mut nodes = HashMap::new();
    for own_id in &ids {
        let mut member = MockNode::new(own_id.clone());
        // A node never lists itself as a peer.
        ids.iter()
            .filter(|candidate| *candidate != own_id)
            .for_each(|other| {
                member.connect_peer(other);
            });
        nodes.insert(own_id.clone(), member);
    }

    // The network is created only after all nodes exist, as before.
    let network = Arc::new(MockNetwork::new());
    Self { nodes, network }
}
/// Returns a shared reference to the node called `id`.
///
/// # Panics
///
/// Panics if no node with that ID is currently in the cluster.
pub fn node(&self, id: &str) -> &MockNode {
    let key = NodeId(id.to_string());
    match self.nodes.get(&key) {
        Some(found) => found,
        None => panic!("node '{}' not found in cluster", id),
    }
}
/// Returns an exclusive reference to the node called `id`.
///
/// # Panics
///
/// Panics if no node with that ID is currently in the cluster.
pub fn node_mut(&mut self, id: &str) -> &mut MockNode {
    let key = NodeId(id.to_string());
    match self.nodes.get_mut(&key) {
        Some(found) => found,
        None => panic!("node '{}' not found in cluster", id),
    }
}
/// Returns the [`MockNetwork`] that was created together with this cluster.
pub fn network(&self) -> &MockNetwork {
    // Borrow the value behind the `Arc` rather than handing out the `Arc`.
    self.network.as_ref()
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
/// Returns a copy of the ID of every node currently in the cluster.
///
/// The IDs are taken straight from the backing `HashMap`, so their order is
/// unspecified.
pub fn node_ids(&self) -> Vec<NodeId> {
    let mut ids = Vec::with_capacity(self.nodes.len());
    ids.extend(self.nodes.keys().cloned());
    ids
}
/// Removes node `id` from the cluster and tells every remaining node to
/// disconnect from it.
///
/// The removed node (if any) is dropped. An `id` that is not in the cluster
/// does not panic; `disconnect_peer` is still called on every remaining
/// node.
pub fn crash_node(&mut self, id: &str) {
    let crashed = NodeId(id.to_string());
    // Discard the node itself; nothing of it is kept.
    drop(self.nodes.remove(&crashed));
    self.nodes.values_mut().for_each(|survivor| {
        survivor.disconnect_peer(&crashed);
    });
}
/// Creates a fresh [`MockNode`] for `id` and wires it into the cluster.
///
/// The new node and every *other* node currently in the cluster have
/// `connect_peer` called for each other, after which the new node is stored
/// under `id`.
///
/// If an instance with the same ID is still present (the node was never
/// removed), it is replaced by the new one. That stale instance is skipped
/// while wiring peers, so the new node never registers itself as its own
/// peer — matching the self-exclusion done when the cluster is first built.
pub fn restart_node(&mut self, id: &str) {
    let node_id = NodeId(id.to_string());
    let mut new_node = MockNode::new(node_id.clone());
    for (peer_id, peer) in self.nodes.iter_mut() {
        // A leftover entry for the restarting node is about to be replaced;
        // connecting to it would make the new node its own peer.
        if *peer_id == node_id {
            continue;
        }
        new_node.connect_peer(peer_id);
        peer.connect_peer(&node_id);
    }
    self.nodes.insert(node_id, new_node);
}
/// Takes node `id` out of the cluster and hands it to the caller.
///
/// Unlike [`crash_node`](Self::crash_node), the remaining nodes are not
/// told to disconnect. Returns `None` when no node with that ID is present.
pub fn freeze_node(&mut self, id: &str) -> Option<MockNode> {
    let key = NodeId(id.to_string());
    self.nodes.remove(&key)
}
/// Puts `node` into the cluster under its own `node_id`.
///
/// Any node already stored under that ID is replaced and dropped. No peer
/// connections are changed here.
pub fn unfreeze_node(&mut self, node: MockNode) {
    let key = node.node_id.clone();
    self.nodes.insert(key, node);
}
/// Registers `watcher` as watching `target_id` on the runtime of the node
/// called `watcher_node`.
///
/// The call is forwarded unchanged to that node's `runtime.watch`. The
/// `Handler<ChildTerminated>` bound suggests the watcher is the one that
/// receives the termination message (NOTE(review): confirm against the
/// runtime implementation).
///
/// # Panics
///
/// Panics if `watcher_node` is not in the cluster.
pub fn watch<W>(&self, watcher_node: &str, watcher: &TestActorRef<W>, target_id: ActorId)
where
    W: Actor + Handler<ChildTerminated> + 'static,
{
    self.node(watcher_node).runtime.watch(watcher, target_id);
}
/// Forwards an unwatch request for the pair (`watcher_id`, `target_id`) to
/// the runtime of the node called `node_id`.
///
/// # Panics
///
/// Panics if `node_id` is not in the cluster.
pub fn unwatch(&self, node_id: &str, watcher_id: &ActorId, target_id: &ActorId) {
    self.node(node_id).runtime.unwatch(watcher_id, target_id);
}
/// Builds a [`ClusterState`] from the point of view of `local_node`.
///
/// The state is created from `local_node` plus the IDs of all nodes
/// currently in the cluster. For every node in the state other than the
/// local one, a `peer_versions` entry is added that carries the crate's own
/// wire version, no application version, and the adapter name `"mock"`.
///
/// Returns `None` when `local_node` is not in the cluster.
///
/// # Panics
///
/// Panics if `DACTOR_WIRE_VERSION` cannot be parsed while adding a peer
/// entry.
pub fn state_for(&self, local_node: &str) -> Option<ClusterState> {
    let local = NodeId(String::from(local_node));
    // Bail out with `None` when the requested node is not a member.
    self.nodes.get(&local)?;

    let members: Vec<NodeId> = self.nodes.keys().cloned().collect();
    let mut state = ClusterState::new(local.clone(), members);
    for node_id in &state.nodes {
        // The local node is not its own peer.
        if node_id == &local {
            continue;
        }
        let wire_version =
            dactor::version::WireVersion::parse(dactor::version::DACTOR_WIRE_VERSION).unwrap();
        let info = dactor::remote::PeerVersionInfo {
            wire_version,
            app_version: None,
            adapter: "mock".into(),
        };
        state.peer_versions.insert(node_id.clone(), info);
    }
    Some(state)
}
/// Builds a [`ClusterState`] from the point of view of the node whose ID
/// string is lexicographically smallest.
///
/// Choosing the smallest ID makes the result independent of `HashMap`
/// iteration order. Returns `None` when the cluster has no nodes.
pub fn state(&self) -> Option<ClusterState> {
    // One pass over the keys: no cloning, sorting or indexing is needed to
    // find the smallest ID.
    let first = self.nodes.keys().min_by(|a, b| a.0.cmp(&b.0))?;
    self.state_for(&first.0)
}
/// Registers a (`target`, `watcher`) pair with the watch manager of the
/// node called `target_node`.
///
/// Both IDs are moved into the watch manager's `watch` call.
///
/// # Panics
///
/// Panics if `target_node` is not in the cluster.
pub fn remote_watch(&mut self, target_node: &str, target: ActorId, watcher: ActorId) {
    // `target` is owned and not used afterwards, so it is moved, not cloned.
    self.node_mut(target_node).watch_manager.watch(target, watcher);
}
/// Forwards an unwatch request for the (`target`, `watcher`) pair to the
/// watch manager of the node called `target_node`.
///
/// # Panics
///
/// Panics if `target_node` is not in the cluster.
pub fn remote_unwatch(&mut self, target_node: &str, target: &ActorId, watcher: &ActorId) {
    self.node_mut(target_node)
        .watch_manager
        .unwatch(target, watcher);
}
/// Reports the termination of actor `terminated` to the watch manager of
/// the node called `node_id`.
///
/// Returns whatever notifications the watch manager's `on_terminated`
/// produces.
///
/// # Panics
///
/// Panics if `node_id` is not in the cluster.
pub fn notify_terminated(
    &mut self,
    node_id: &str,
    terminated: &ActorId,
) -> Vec<dactor::system_actors::WatchNotification> {
    self.node_mut(node_id)
        .watch_manager
        .on_terminated(terminated)
}
/// Registers `token` under `request_id` with the cancel manager of the node
/// called `node_id`.
///
/// # Panics
///
/// Panics if `node_id` is not in the cluster.
pub fn register_cancel(
    &mut self,
    node_id: &str,
    request_id: String,
    token: tokio_util::sync::CancellationToken,
) {
    self.node_mut(node_id)
        .cancel_manager
        .register(request_id, token);
}
/// Asks the cancel manager of the node called `node_id` to cancel the
/// request identified by `request_id`, returning the manager's response.
///
/// # Panics
///
/// Panics if `node_id` is not in the cluster.
pub fn cancel_request(
    &mut self,
    node_id: &str,
    request_id: &str,
) -> dactor::system_actors::CancelResponse {
    self.node_mut(node_id).cancel_manager.cancel(request_id)
}
}