use std::collections::HashMap;
use std::process::{Child, Command, Stdio};
use std::time::Duration;
use crate::events::EventStream;
use crate::protocol::test_node_service_client::TestNodeServiceClient;
use crate::protocol::*;
/// Handle to one spawned test-node process together with its control client.
pub struct TestNodeHandle {
/// Child process that was spawned for this node.
pub process: Child,
/// Identifier the node was launched with (exported to it as `DACTOR_NODE_ID`).
pub node_id: String,
/// Port exported to the node as `DACTOR_CONTROL_PORT`; the control client
/// connects to it on `127.0.0.1`.
pub control_port: u16,
/// Client for the node's control service. The RPC helpers on `TestCluster`
/// fail with "not connected" while this is `None`.
client: Option<TestNodeServiceClient<tonic::transport::Channel>>,
}
/// A set of spawned test nodes, addressable by node id.
///
/// Dropping the cluster kills and waits on every node process it still holds.
pub struct TestCluster {
/// Node handles keyed by node id.
nodes: HashMap<String, TestNodeHandle>,
}
/// Collects node launch specifications and turns them into a `TestCluster`.
pub struct TestClusterBuilder {
/// Pending nodes in registration order, each stored as
/// `(node_id, binary, args, control_port)`.
nodes: Vec<(String, String, Vec<String>, u16)>,
}
impl TestCluster {
pub fn builder() -> TestClusterBuilder {
TestClusterBuilder { nodes: Vec::new() }
}
pub async fn ping(
&mut self,
node_id: &str,
echo: &str,
) -> Result<PingResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.ping(PingRequest {
echo: echo.to_string(),
})
.await?;
Ok(response.into_inner())
}
pub async fn get_node_info(
&mut self,
node_id: &str,
) -> Result<NodeInfoResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client.get_node_info(Empty {}).await?;
Ok(response.into_inner())
}
pub async fn inject_fault(
&mut self,
node_id: &str,
fault_type: &str,
target: &str,
duration_ms: u64,
count: u32,
) -> Result<(), Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
client
.inject_fault(FaultRequest {
fault_type: fault_type.to_string(),
target: target.to_string(),
duration_ms,
count,
detail: String::new(),
})
.await?;
Ok(())
}
pub async fn clear_faults(&mut self, node_id: &str) -> Result<(), Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
client.clear_faults(Empty {}).await?;
Ok(())
}
pub async fn subscribe_events(
&mut self,
node_id: &str,
event_types: &[&str],
) -> Result<EventStream, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.subscribe_events(EventFilter {
event_types: event_types.iter().map(|s| s.to_string()).collect(),
})
.await?;
Ok(EventStream::new(response.into_inner()))
}
pub async fn custom(
&mut self,
node_id: &str,
command_type: &str,
payload: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.custom_command(CustomRequest {
command_type: command_type.to_string(),
payload: payload.to_vec(),
})
.await?;
Ok(response.into_inner().payload)
}
pub async fn spawn_actor(
&mut self,
node_id: &str,
actor_type: &str,
actor_name: &str,
args: &[u8],
) -> Result<SpawnActorResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.spawn_actor(SpawnActorRequest {
actor_type: actor_type.to_string(),
actor_name: actor_name.to_string(),
args: args.to_vec(),
})
.await?;
Ok(response.into_inner())
}
pub async fn tell_actor(
&mut self,
node_id: &str,
actor_name: &str,
message_type: &str,
payload: &[u8],
) -> Result<TellActorResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.tell_actor(TellActorRequest {
actor_name: actor_name.to_string(),
message_type: message_type.to_string(),
payload: payload.to_vec(),
})
.await?;
Ok(response.into_inner())
}
pub async fn ask_actor(
&mut self,
node_id: &str,
actor_name: &str,
message_type: &str,
payload: &[u8],
) -> Result<AskActorResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.ask_actor(AskActorRequest {
actor_name: actor_name.to_string(),
message_type: message_type.to_string(),
payload: payload.to_vec(),
timeout_ms: 0,
})
.await?;
Ok(response.into_inner())
}
pub async fn ask_actor_with_timeout(
&mut self,
node_id: &str,
actor_name: &str,
message_type: &str,
payload: &[u8],
timeout_ms: u64,
) -> Result<AskActorResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.ask_actor(AskActorRequest {
actor_name: actor_name.to_string(),
message_type: message_type.to_string(),
payload: payload.to_vec(),
timeout_ms,
})
.await?;
Ok(response.into_inner())
}
pub async fn stop_actor(
&mut self,
node_id: &str,
actor_name: &str,
) -> Result<StopActorResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.stop_actor(StopActorRequest {
actor_name: actor_name.to_string(),
})
.await?;
Ok(response.into_inner())
}
pub async fn watch_actor(
&mut self,
node_id: &str,
watcher_name: &str,
target_name: &str,
) -> Result<WatchActorResponse, Box<dyn std::error::Error>> {
let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
let client = handle.client.as_mut().ok_or("not connected")?;
let response = client
.watch_actor(WatchActorRequest {
watcher_name: watcher_name.to_string(),
target_name: target_name.to_string(),
})
.await?;
Ok(response.into_inner())
}
pub async fn shutdown_node(&mut self, node_id: &str) -> Result<(), Box<dyn std::error::Error>> {
if let Some(handle) = self.nodes.get_mut(node_id) {
if let Some(client) = handle.client.as_mut() {
let _ = client
.shutdown(ShutdownRequest {
graceful: true,
timeout_ms: 5000,
})
.await;
}
let _ = handle.process.kill();
let _ = handle.process.wait();
}
Ok(())
}
pub async fn shutdown(&mut self) {
let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
for node_id in node_ids {
let _ = self.shutdown_node(&node_id).await;
}
}
}
impl Drop for TestCluster {
    /// Kills and waits on every node process still held by the cluster.
    ///
    /// Results are ignored: there is nothing useful to do with a failure
    /// while dropping.
    fn drop(&mut self) {
        self.nodes.values_mut().for_each(|node| {
            let _ = node.process.kill();
            let _ = node.process.wait();
        });
    }
}
impl TestClusterBuilder {
pub fn node(mut self, node_id: &str, binary: &str, args: &[&str], port: u16) -> Self {
self.nodes.push((
node_id.to_string(),
binary.to_string(),
args.iter().map(|s| s.to_string()).collect(),
port,
));
self
}
pub async fn build(self) -> TestCluster {
let mut nodes: HashMap<String, TestNodeHandle> = HashMap::new();
for (node_id, binary, args, port) in self.nodes {
let mut process = Command::new(&binary)
.args(&args)
.env("DACTOR_NODE_ID", &node_id)
.env("DACTOR_CONTROL_PORT", port.to_string())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap_or_else(|e| {
for (_, handle) in nodes.iter_mut() {
let _ = handle.process.kill();
let _ = handle.process.wait();
}
panic!(
"Failed to launch test node '{}' ({}): {}",
node_id, binary, e
)
});
let addr = format!("http://127.0.0.1:{}", port);
let mut client = None;
for _ in 0..50 {
tokio::time::sleep(Duration::from_millis(100)).await;
match TestNodeServiceClient::connect(addr.clone()).await {
Ok(c) => {
client = Some(c);
break;
}
Err(_) => continue,
}
}
if client.is_none() {
let _ = process.kill();
let _ = process.wait();
for (_, handle) in nodes.iter_mut() {
let _ = handle.process.kill();
let _ = handle.process.wait();
}
panic!(
"Failed to connect to test node '{}' at {} after 5s of retries",
node_id, addr
);
}
nodes.insert(
node_id.clone(),
TestNodeHandle {
process,
node_id,
control_port: port,
client,
},
);
}
TestCluster { nodes }
}
}