use std::time::Duration;
use net_sdk::capabilities::CapabilitySet;
use net_sdk::mesh::MeshBuilder;
use net_sdk::mesh_rpc::{CallOptions, CallOptionsTyped, Codec, RoutingPolicy, RpcError};
use serde::{Deserialize, Serialize};
/// Request payload for the `echo` RPC service.
///
/// `main` registers the `echo` handler with `Codec::Json` and sends one of
/// these in the direct call to the server node.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EchoRequest {
/// Text the `echo` handler copies into `EchoResponse::echoed`.
message: String,
}
/// Response payload produced by the `echo` RPC handler.
#[derive(Debug, Serialize, Deserialize)]
struct EchoResponse {
/// The `message` taken from the incoming `EchoRequest`.
echoed: String,
/// Label set by the handler that answered; the handler in `main` always
/// sets it to `"primary"`.
server_label: String,
}
/// Request payload for the `add` RPC service: the two operands to sum.
///
/// The handler registered in `main` returns an error when either operand is
/// negative.
#[derive(Debug, Serialize, Deserialize)]
struct AddRequest {
/// First operand.
a: i64,
/// Second operand.
b: i64,
}
/// Response payload produced by the `add` RPC handler.
#[derive(Debug, Serialize, Deserialize)]
struct AddResponse {
/// `a + b` from the corresponding `AddRequest`.
sum: i64,
}
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> net_sdk::error::Result<()> {
let psk = [0x42u8; 32];
let server = MeshBuilder::new("127.0.0.1:0", &psk)?.build().await?;
let caller = MeshBuilder::new("127.0.0.1:0", &psk)?.build().await?;
println!("server: node_id={:#x}", server.inner().node_id());
println!("caller: node_id={:#x}", caller.inner().node_id());
let server_addr = server.inner().local_addr();
let server_pub = *server.inner().public_key();
let server_id = server.inner().node_id();
let caller_id = caller.inner().node_id();
let (accept_res, connect_res) = tokio::join!(server.inner().accept(caller_id), async {
tokio::time::sleep(Duration::from_millis(50)).await;
caller
.inner()
.connect(server_addr, &server_pub, server_id)
.await
});
accept_res.map_err(|e| net_sdk::error::SdkError::Config(format!("accept: {e}")))?;
connect_res.map_err(|e| net_sdk::error::SdkError::Config(format!("connect: {e}")))?;
server.inner().start();
caller.inner().start();
let _echo_handle = server
.serve_rpc_typed("echo", Codec::Json, |req: EchoRequest| async move {
Ok(EchoResponse {
echoed: req.message,
server_label: "primary".to_string(),
})
})
.map_err(|e| net_sdk::error::SdkError::Config(format!("serve echo: {e}")))?;
let _add_handle = server
.serve_rpc_typed("add", Codec::Json, |req: AddRequest| async move {
if req.a < 0 || req.b < 0 {
Err(format!(
"this demo refuses negative inputs: a={}, b={}",
req.a, req.b
))
} else {
Ok(AddResponse { sum: req.a + req.b })
}
})
.map_err(|e| net_sdk::error::SdkError::Config(format!("serve add: {e}")))?;
server
.inner()
.announce_capabilities(CapabilitySet::new())
.await
.map_err(|e| net_sdk::error::SdkError::Config(format!("announce: {e}")))?;
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
if !caller.find_service_nodes("echo").is_empty()
&& !caller.find_service_nodes("add").is_empty()
{
break;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
println!(
"discovered: echo on {} node(s), add on {} node(s)",
caller.find_service_nodes("echo").len(),
caller.find_service_nodes("add").len(),
);
let resp: EchoResponse = caller
.call_typed(
server.inner().node_id(),
"echo",
&EchoRequest {
message: "hello, mesh".to_string(),
},
CallOptionsTyped::default(),
)
.await
.map_err(|e| net_sdk::error::SdkError::Config(format!("call echo: {e}")))?;
println!(
"direct call -> echo: {} (from {})",
resp.echoed, resp.server_label
);
let resp: AddResponse = caller
.call_service_typed(
"add",
&AddRequest { a: 5, b: 7 },
CallOptionsTyped {
raw: CallOptions {
routing_policy: RoutingPolicy::RoundRobin,
..Default::default()
},
codec: Codec::Json,
},
)
.await
.map_err(|e| net_sdk::error::SdkError::Config(format!("call add: {e}")))?;
println!("discovery call -> add(5, 7) = {}", resp.sum);
let err = caller
.call_service_typed::<AddRequest, AddResponse>(
"add",
&AddRequest { a: -1, b: 7 },
CallOptionsTyped::default(),
)
.await
.expect_err("negative input must fail");
match err {
RpcError::ServerError { message, .. } => {
println!("expected error caught: {message}");
}
other => println!("unexpected error shape: {other:?}"),
}
println!("done.");
Ok(())
}