mod node {
include!("../src/bin/node.rs");
}
mod node_group {
include!("../src/bin/node_group.rs");
}
use node::ImplNodeRpc;
use node_group::ImplNodeGroupRpc;
use remote_hash_map::common::client::Client;
use remote_hash_map::common::utils::{data_file, get_endpoint};
use remote_hash_map::rhm::rhm::Rhm;
use remote_hash_map::rpc::node_group_rpc::node_group_rpc_server::NodeGroupRpcServer;
use remote_hash_map::rpc::node_rpc::node_rpc_server::NodeRpcServer;
use std::fs;
use std::net::SocketAddr;
use tokio::sync::oneshot;
use tonic::transport::Server;
async fn create_node(node_ip_port: &str, ng_ip_port: &str) -> ImplNodeRpc {
let node_addr: SocketAddr = node_ip_port.parse().unwrap();
let rhm = Rhm::new(&node_addr.to_string()).await.unwrap();
let mut node_rpc = ImplNodeRpc::new(rhm, node_addr);
let endpoint = get_endpoint(ng_ip_port).unwrap();
node_rpc.ng = Some(endpoint.clone());
node_rpc.attach_to_group().await.unwrap();
node_rpc
}
const NG_IP_PORT: &str = "127.0.0.1:7000";
const NODE1_IP_PORT: &str = "127.0.0.1:7001";
const NODE2_IP_PORT: &str = "127.0.0.1:7002";
#[tokio::test]
async fn test_end_to_end() {
let _ = env_logger::builder().is_test(true).try_init();
let _cleanup = TestCleanup;
let ng_ip_port = NG_IP_PORT;
let node1_ip_port = NODE1_IP_PORT;
let node2_ip_port = NODE2_IP_PORT;
let node_group_addr: SocketAddr = ng_ip_port.parse().unwrap();
let node_group_rpc = ImplNodeGroupRpc::new();
let (node_group_tx, node_group_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
Server::builder()
.add_service(NodeGroupRpcServer::new(node_group_rpc))
.serve_with_shutdown(node_group_addr, async {
node_group_rx.await.ok();
})
.await
.unwrap();
});
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let node1_rpc = create_node(node1_ip_port, ng_ip_port).await;
let (node1_tx, node1_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
Server::builder()
.add_service(NodeRpcServer::new(node1_rpc))
.serve_with_shutdown(node1_ip_port.parse().unwrap(), async {
node1_rx.await.ok();
})
.await
.unwrap();
});
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let node2_rpc = create_node(node2_ip_port, ng_ip_port).await;
let (node2_tx, node2_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
Server::builder()
.add_service(NodeRpcServer::new(node2_rpc))
.serve_with_shutdown(node2_ip_port.parse().unwrap(), async {
node2_rx.await.ok();
})
.await
.unwrap();
});
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
let mut client = Client::connect(&format!("{}", ng_ip_port)).await.unwrap();
let key = "test_key";
let value = "test_value";
let set_result = client.set(key, value).await.unwrap();
assert_eq!(set_result, "Ok".to_string());
let key = "test_key";
let value = "test_value";
let set_result = client.set(key, value).await.unwrap();
assert_eq!(set_result, value);
let get_result = client.get(key).await.unwrap();
assert_eq!(get_result, value);
let node1_file = fs::read(data_file(node1_ip_port)).unwrap();
let node2_file = fs::read(data_file(node2_ip_port)).unwrap();
assert_eq!(node1_file, node2_file, "File contents do not match");
node1_tx.send(()).unwrap();
node2_tx.send(()).unwrap();
node_group_tx.send(()).unwrap();
}
struct TestCleanup;
impl Drop for TestCleanup {
fn drop(&mut self) {
println!("Running cleanup code...");
fs::remove_file(data_file(NODE1_IP_PORT)).unwrap();
fs::remove_file(data_file(NODE2_IP_PORT)).unwrap();
}
}