qrpc 0.1.2

qrpc is a small QUIC + mTLS messaging library
Documentation
use std::sync::Arc;
use std::time::Duration;

use qrpc::{Ctx, PeerConfig, QrpcInstance, QrpcResult, State};

#[path = "common/demo_support.rs"]
mod demo_support;

use demo_support::{
    free_port, generate_ca_and_identity, wait_for_contains, DemoEnvelope, NodeState,
};

#[tokio::main]
async fn main() -> QrpcResult<()> {
    let (ca, cert, key) = generate_ca_and_identity("three_nodes_broadcast");

    let port_a = free_port();
    let port_b = free_port();
    let port_c = free_port();

    let state_a = Arc::new(NodeState::default());
    let state_b = Arc::new(NodeState::default());
    let state_c = Arc::new(NodeState::default());

    let a = QrpcInstance::builder(
        |State(state): State<Arc<NodeState>>,
         _ctx: Ctx<DemoEnvelope>,
         source_peer_id: String,
         msg: DemoEnvelope| async move {
            state
                .received
                .lock()
                .await
                .push(format!("from={} {:?}", source_peer_id, msg));
            Ok(())
        },
    )
        .with_state(Arc::clone(&state_a))
        .with_id("node-a")
        .with_ca_cert(ca.clone())
        .with_identity(cert.clone(), key.clone())
        .with_port(port_a)
        .add_peer(PeerConfig {
            address: "127.0.0.1".to_string(),
            port: port_b,
            server_name: "localhost".to_string(),
            ca_cert_path: Some(ca.clone()),
            expected_id: Some("node-b".to_string()),
        })
        .add_peer(PeerConfig {
            address: "127.0.0.1".to_string(),
            port: port_c,
            server_name: "localhost".to_string(),
            ca_cert_path: Some(ca.clone()),
            expected_id: Some("node-c".to_string()),
        })
        .build()?;

    let b = QrpcInstance::builder(
        |State(state): State<Arc<NodeState>>,
         _ctx: Ctx<DemoEnvelope>,
         source_peer_id: String,
         msg: DemoEnvelope| async move {
            state
                .received
                .lock()
                .await
                .push(format!("from={} {:?}", source_peer_id, msg));
            Ok(())
        },
    )
        .with_state(Arc::clone(&state_b))
        .with_id("node-b")
        .with_ca_cert(ca.clone())
        .with_identity(cert.clone(), key.clone())
        .with_port(port_b)
        .build()?;

    let c = QrpcInstance::builder(
        |State(state): State<Arc<NodeState>>,
         _ctx: Ctx<DemoEnvelope>,
         source_peer_id: String,
         msg: DemoEnvelope| async move {
            state
                .received
                .lock()
                .await
                .push(format!("from={} {:?}", source_peer_id, msg));
            Ok(())
        },
    )
        .with_state(Arc::clone(&state_c))
        .with_id("node-c")
        .with_ca_cert(ca)
        .with_identity(cert, key)
        .with_port(port_c)
        .build()?;

    c.start().await;
    b.start().await;
    a.start().await;

    wait_for_contains(&a, &["node-b", "node-c"], Duration::from_secs(8)).await;
    wait_for_contains(&b, &["node-a"], Duration::from_secs(8)).await;
    wait_for_contains(&c, &["node-a"], Duration::from_secs(8)).await;

    let sent_from_a = a
        .broadcast(&DemoEnvelope::ping("broadcast ping from a"))
        .await?;
    let sent_from_c = c
        .broadcast(&DemoEnvelope::pong("broadcast pong from c"))
        .await?;

    tokio::time::sleep(Duration::from_millis(400)).await;

    println!("sent_from_a={sent_from_a}, sent_from_c={sent_from_c}");
    println!(
        "node-a received={:?}",
        state_a.received.lock().await.as_slice()
    );
    println!(
        "node-b received={:?}",
        state_b.received.lock().await.as_slice()
    );
    println!(
        "node-c received={:?}",
        state_c.received.lock().await.as_slice()
    );

    a.shutdown().await;
    b.shutdown().await;
    c.shutdown().await;

    Ok(())
}