mod config;
mod error;
mod instance;
mod message;
mod protocol;
mod tls;
pub use config::PeerConfig;
pub use error::{QrpcError, QrpcResult};
pub use instance::{
ConnectionFailureKind, OutboundCmd, PeerConnectionEvent, QrpcInstance, QrpcInstanceBuilder,
WithState, WithoutState,
};
pub use message::{Ctx, FromRef, QrpcCallback, QrpcMessage, State};
#[cfg(test)]
mod tests {
use std::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use crate::{
Ctx, OutboundCmd, PeerConfig, QrpcError, QrpcInstance, QrpcMessage, QrpcResult, State,
};
#[derive(Debug, Clone)]
struct TestMsg {
cmd: u32,
body: Vec<u8>,
}
impl TestMsg {
fn new(cmd: u32, body: impl Into<Vec<u8>>) -> Self {
Self {
cmd,
body: body.into(),
}
}
}
impl QrpcMessage for TestMsg {
fn cmd_id(&self) -> u32 {
self.cmd
}
fn encode_vec(&self) -> Vec<u8> {
self.body.clone()
}
fn decode_vec(cmd_id: u32, data: &[u8]) -> QrpcResult<Self> {
Ok(Self::new(cmd_id, data.to_vec()))
}
}
#[derive(Default)]
struct SharedState {
received: Mutex<Vec<(String, u32, Vec<u8>)>>,
}
fn free_port() -> u16 {
TcpListener::bind("127.0.0.1:0")
.expect("bind temp listener")
.local_addr()
.expect("read local addr")
.port()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn full_service_flow_register_send_broadcast_shutdown() {
let ca = "tests/certs/ca.crt";
let cert_client = "tests/certs/client.crt";
let key_client = "tests/certs/client.key";
let cert_server = "tests/certs/server.crt";
let key_server = "tests/certs/server.key";
let a_state = Arc::new(SharedState::default());
let b_state = Arc::new(SharedState::default());
let a_port = free_port();
let b_port = free_port();
let a = QrpcInstance::builder(
|State(state): State<Arc<SharedState>>,
_ctx: Ctx<TestMsg>,
source_peer_id: String,
msg: TestMsg| async move {
state.received.lock().await.push((
source_peer_id.to_string(),
msg.cmd_id(),
msg.encode_vec(),
));
Ok(())
},
)
.with_state(Arc::clone(&a_state))
.with_id("node-a")
.with_ca_cert(ca)
.with_identity(cert_client, key_client)
.with_port(a_port)
.add_peer(PeerConfig {
address: "127.0.0.1".to_string(),
port: b_port,
server_name: "localhost".to_string(),
ca_cert_path: Some(ca.to_string()),
expected_id: Some("node-b".to_string()),
})
.build()
.expect("build a");
let b = QrpcInstance::builder(
|State(state): State<Arc<SharedState>>,
_ctx: Ctx<TestMsg>,
source_peer_id: String,
msg: TestMsg| async move {
state.received.lock().await.push((
source_peer_id.to_string(),
msg.cmd_id(),
msg.encode_vec(),
));
Ok(())
},
)
.with_state(Arc::clone(&b_state))
.with_id("node-b")
.with_ca_cert(ca)
.with_identity(cert_server, key_server)
.with_port(b_port)
.build()
.expect("build b");
b.start().await;
a.start().await;
let peer_start = tokio::time::Instant::now();
loop {
let a_peers = a.peer_ids().await;
let b_peers = b.peer_ids().await;
if a_peers.contains(&"node-b".to_string()) && b_peers.contains(&"node-a".to_string()) {
break;
}
assert!(
peer_start.elapsed() < Duration::from_secs(8),
"peer register timeout"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
a.send_to("node-b", &TestMsg::new(11, b"to-b".to_vec()))
.await
.expect("a -> b send");
let recv_b_start = tokio::time::Instant::now();
loop {
let got = b_state
.received
.lock()
.await
.iter()
.any(|(peer, cmd, body)| peer == "node-a" && *cmd == 11 && body == b"to-b");
if got {
break;
}
assert!(
recv_b_start.elapsed() < Duration::from_secs(5),
"b receive timeout"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
let sent = b
.broadcast(&TestMsg::new(22, b"hello-all".to_vec()))
.await
.expect("broadcast from b");
assert_eq!(sent, 1);
let recv_a_start = tokio::time::Instant::now();
loop {
let got = a_state
.received
.lock()
.await
.iter()
.any(|(peer, cmd, body)| peer == "node-b" && *cmd == 22 && body == b"hello-all");
if got {
break;
}
assert!(
recv_a_start.elapsed() < Duration::from_secs(5),
"a receive timeout"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
a.shutdown().await;
b.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn callback_ctx_reply_works() {
let ca = "tests/certs/ca.crt";
let cert_client = "tests/certs/client.crt";
let key_client = "tests/certs/client.key";
let cert_server = "tests/certs/server.crt";
let key_server = "tests/certs/server.key";
let a_state = Arc::new(SharedState::default());
let b_state = Arc::new(SharedState::default());
let a_port = free_port();
let b_port = free_port();
let a = QrpcInstance::builder(
|State(state): State<Arc<SharedState>>,
_ctx: Ctx<TestMsg>,
source_peer_id: String,
msg: TestMsg| async move {
state.received.lock().await.push((
source_peer_id.to_string(),
msg.cmd_id(),
msg.encode_vec(),
));
Ok(())
},
)
.with_state(Arc::clone(&a_state))
.with_id("node-a")
.with_ca_cert(ca)
.with_identity(cert_client, key_client)
.with_port(a_port)
.add_peer(PeerConfig {
address: "127.0.0.1".to_string(),
port: b_port,
server_name: "localhost".to_string(),
ca_cert_path: Some(ca.to_string()),
expected_id: Some("node-b".to_string()),
})
.build()
.expect("build a");
let b = QrpcInstance::builder(
|State(state): State<Arc<SharedState>>,
ctx: Ctx<TestMsg>,
source_peer_id: String,
msg: TestMsg| async move {
state.received.lock().await.push((
source_peer_id.to_string(),
msg.cmd_id(),
msg.encode_vec(),
));
if msg.cmd_id() == 31 {
ctx.send_to(&source_peer_id, &TestMsg::new(32, msg.encode_vec()))
.await?;
}
Ok(())
},
)
.with_state(Arc::clone(&b_state))
.with_id("node-b")
.with_ca_cert(ca)
.with_identity(cert_server, key_server)
.with_port(b_port)
.build()
.expect("build b");
b.start().await;
a.start().await;
a.wait_for_peer("node-b", Duration::from_secs(8))
.await
.expect("a sees b");
b.wait_for_peer("node-a", Duration::from_secs(8))
.await
.expect("b sees a");
a.send_to("node-b", &TestMsg::new(31, b"need-reply".to_vec()))
.await
.expect("a -> b");
let recv_a_start = tokio::time::Instant::now();
loop {
let got = a_state
.received
.lock()
.await
.iter()
.any(|(peer, cmd, body)| peer == "node-b" && *cmd == 32 && body == b"need-reply");
if got {
break;
}
assert!(
recv_a_start.elapsed() < Duration::from_secs(5),
"a reply receive timeout"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
a.shutdown().await;
b.shutdown().await;
}
#[tokio::test]
async fn send_to_unknown_peer_returns_error() {
let ca = "tests/certs/ca.crt";
let cert = "tests/certs/server.crt";
let key = "tests/certs/server.key";
let ins = QrpcInstance::builder(
|_state: State<Arc<SharedState>>,
_ctx: Ctx<TestMsg>,
_source_peer_id: String,
_msg: TestMsg| async move { Ok(()) },
)
.with_state(Arc::new(SharedState::default()))
.with_ca_cert(ca)
.with_identity(cert, key)
.with_port(free_port())
.build()
.expect("build instance");
let err = ins
.send_to("not-exist", &TestMsg::new(1, vec![]))
.await
.expect_err("must fail");
assert!(matches!(err, QrpcError::PeerNotFound(_)));
ins.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn serve_with_rx_send_to_waits_for_peer_connection() {
let ca = "tests/certs/ca.crt";
let cert_client = "tests/certs/client.crt";
let key_client = "tests/certs/client.key";
let cert_server = "tests/certs/server.crt";
let key_server = "tests/certs/server.key";
let b_state = Arc::new(SharedState::default());
let a_port = free_port();
let b_port = free_port();
let a = Arc::new(
QrpcInstance::builder(
|_state: State<()>,
_ctx: Ctx<TestMsg>,
_source_peer_id: String,
_msg: TestMsg| async move { Ok(()) },
)
.with_state(())
.with_id("node-a")
.with_ca_cert(ca)
.with_identity(cert_server, key_server)
.with_port(a_port)
.build()
.expect("build a"),
);
let b = QrpcInstance::builder(
|State(state): State<Arc<SharedState>>,
_ctx: Ctx<TestMsg>,
source_peer_id: String,
msg: TestMsg| async move {
state.received.lock().await.push((
source_peer_id.to_string(),
msg.cmd_id(),
msg.encode_vec(),
));
Ok(())
},
)
.with_state(Arc::clone(&b_state))
.with_id("node-b")
.with_ca_cert(ca)
.with_identity(cert_client, key_client)
.with_client_identity(cert_client, key_client)
.with_port(b_port)
.add_peer(PeerConfig {
address: "127.0.0.1".to_string(),
port: a_port,
server_name: "localhost".to_string(),
ca_cert_path: Some(ca.to_string()),
expected_id: Some("node-a".to_string()),
})
.build()
.expect("build b");
let (tx, rx) = tokio::sync::mpsc::channel(8);
let serve_instance = Arc::clone(&a);
let serve_task = tokio::spawn(async move { serve_instance.serve_with_rx(rx).await });
tx.send(OutboundCmd::SendTo {
peer_id: "node-b".to_string(),
message: TestMsg::new(41, b"from-rx".to_vec()),
})
.await
.expect("enqueue send command");
tokio::time::sleep(Duration::from_millis(120)).await;
b.start().await;
let recv_start = tokio::time::Instant::now();
loop {
let got = b_state
.received
.lock()
.await
.iter()
.any(|(peer, cmd, body)| peer == "node-a" && *cmd == 41 && body == b"from-rx");
if got {
break;
}
assert!(
recv_start.elapsed() < Duration::from_secs(5),
"b did not receive send_to command message"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
a.shutdown().await;
b.shutdown().await;
tokio::time::timeout(Duration::from_secs(1), serve_task)
.await
.expect("serve task should exit after shutdown")
.expect("serve task join ok")
.expect("serve_with_rx should return Ok");
}
#[tokio::test]
async fn serve_with_rx_broadcast_without_peers_is_noop() {
let ca = "tests/certs/ca.crt";
let cert = "tests/certs/server.crt";
let key = "tests/certs/server.key";
let ins = Arc::new(
QrpcInstance::builder(
|_state: State<()>,
_ctx: Ctx<TestMsg>,
_source_peer_id: String,
_msg: TestMsg| async move { Ok(()) },
)
.with_state(())
.with_id("node-solo")
.with_ca_cert(ca)
.with_identity(cert, key)
.with_port(free_port())
.build()
.expect("build instance"),
);
let (tx, rx) = tokio::sync::mpsc::channel(4);
let serve_instance = Arc::clone(&ins);
let serve_task = tokio::spawn(async move { serve_instance.serve_with_rx(rx).await });
tx.send(OutboundCmd::Broadcast {
message: TestMsg::new(7, b"noop".to_vec()),
})
.await
.expect("enqueue broadcast command");
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(
!serve_task.is_finished(),
"serve_with_rx should keep serving after no-op broadcast"
);
ins.shutdown().await;
tokio::time::timeout(Duration::from_secs(1), serve_task)
.await
.expect("serve task should exit after shutdown")
.expect("serve task join ok")
.expect("serve_with_rx should return Ok");
}
}