use std::sync::Arc;
use std::time::Duration;
use qrpc::{Ctx, PeerConfig, QrpcInstance, QrpcResult, State};
#[path = "common/demo_support.rs"]
mod demo_support;
use demo_support::{
free_port, generate_ca_and_identity, wait_for_contains, DemoEnvelope, NodeState,
};
/// Three-node broadcast demo.
///
/// Builds three `QrpcInstance`s (`node-a`, `node-b`, `node-c`) on separate
/// local ports, all using the same CA and the same cert/key pair. Only
/// `node-a` is given explicit peer entries (for `node-b` and `node-c`).
/// After the `wait_for_contains` checks return, `node-a` and `node-c` each
/// broadcast one `DemoEnvelope`, and the messages every node recorded in its
/// `NodeState` are printed before all three instances are shut down.
///
/// Returns early with the error if building an instance or broadcasting fails.
#[tokio::main]
async fn main() -> QrpcResult<()> {
    // One CA and one identity (cert + key), reused by every node below.
    let (ca_cert, node_cert, node_key) = generate_ca_and_identity("three_nodes_broadcast");

    // Ports are obtained in a, b, c order.
    let a_port = free_port();
    let b_port = free_port();
    let c_port = free_port();

    // Per-node storage that each message handler appends to.
    let state_a = Arc::<NodeState>::default();
    let state_b = Arc::<NodeState>::default();
    let state_c = Arc::<NodeState>::default();

    // node-a is the only node configured with outgoing peer entries.
    let link_to_b = PeerConfig {
        address: String::from("127.0.0.1"),
        port: b_port,
        server_name: String::from("localhost"),
        ca_cert_path: Some(ca_cert.clone()),
        expected_id: Some(String::from("node-b")),
    };
    let link_to_c = PeerConfig {
        address: String::from("127.0.0.1"),
        port: c_port,
        server_name: String::from("localhost"),
        ca_cert_path: Some(ca_cert.clone()),
        expected_id: Some(String::from("node-c")),
    };

    let node_a = QrpcInstance::builder(
        |State(inbox): State<Arc<NodeState>>,
         _ctx: Ctx<DemoEnvelope>,
         source_peer_id: String,
         msg: DemoEnvelope| async move {
            // Record the sender id together with the envelope's Debug form.
            let mut received = inbox.received.lock().await;
            received.push(format!("from={} {:?}", source_peer_id, msg));
            Ok(())
        },
    )
    .with_state(Arc::clone(&state_a))
    .with_id("node-a")
    .with_ca_cert(ca_cert.clone())
    .with_identity(node_cert.clone(), node_key.clone())
    .with_port(a_port)
    .add_peer(link_to_b)
    .add_peer(link_to_c)
    .build()?;

    let node_b = QrpcInstance::builder(
        |State(inbox): State<Arc<NodeState>>,
         _ctx: Ctx<DemoEnvelope>,
         source_peer_id: String,
         msg: DemoEnvelope| async move {
            let mut received = inbox.received.lock().await;
            received.push(format!("from={} {:?}", source_peer_id, msg));
            Ok(())
        },
    )
    .with_state(Arc::clone(&state_b))
    .with_id("node-b")
    .with_ca_cert(ca_cert.clone())
    .with_identity(node_cert.clone(), node_key.clone())
    .with_port(b_port)
    .build()?;

    // Last user of the CA/identity material, so it is moved rather than cloned.
    let node_c = QrpcInstance::builder(
        |State(inbox): State<Arc<NodeState>>,
         _ctx: Ctx<DemoEnvelope>,
         source_peer_id: String,
         msg: DemoEnvelope| async move {
            let mut received = inbox.received.lock().await;
            received.push(format!("from={} {:?}", source_peer_id, msg));
            Ok(())
        },
    )
    .with_state(Arc::clone(&state_c))
    .with_id("node-c")
    .with_ca_cert(ca_cert)
    .with_identity(node_cert, node_key)
    .with_port(c_port)
    .build()?;

    // Start the nodes without peer entries first, then node-a.
    node_c.start().await;
    node_b.start().await;
    node_a.start().await;

    // NOTE(review): `wait_for_contains` is defined in demo_support; presumably it
    // waits until the instance reports the listed peer ids or the timeout elapses.
    let link_timeout = Duration::from_secs(8);
    wait_for_contains(&node_a, &["node-b", "node-c"], link_timeout).await;
    wait_for_contains(&node_b, &["node-a"], link_timeout).await;
    wait_for_contains(&node_c, &["node-a"], link_timeout).await;

    let ping = DemoEnvelope::ping("broadcast ping from a");
    let sent_from_a = node_a.broadcast(&ping).await?;
    let pong = DemoEnvelope::pong("broadcast pong from c");
    let sent_from_c = node_c.broadcast(&pong).await?;

    // Fixed pause before reading the recorded messages, giving handlers time to run.
    let settle = Duration::from_millis(400);
    tokio::time::sleep(settle).await;

    println!("sent_from_a={sent_from_a}, sent_from_c={sent_from_c}");
    // Each guard lives only inside its own scope, so no lock is held during shutdown.
    {
        let seen = state_a.received.lock().await;
        println!("node-a received={:?}", seen.as_slice());
    }
    {
        let seen = state_b.received.lock().await;
        println!("node-b received={:?}", seen.as_slice());
    }
    {
        let seen = state_c.received.lock().await;
        println!("node-c received={:?}", seen.as_slice());
    }

    node_a.shutdown().await;
    node_b.shutdown().await;
    node_c.shutdown().await;
    Ok(())
}