use std::sync::Arc;
use std::time::Duration;
use net_sdk::mesh_rpc::{
CallOptions, CallOptionsTyped, Codec, RpcCallEvent, RpcCallStatus, RpcError, RpcObserver,
ServeHandle,
};
use net_sdk::testing::ClusterHarness;
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use crate::streams::{NrpcCall, NrpcStatus, NrpcTail};
/// Request payload for the echo RPC; `run_requester_loop` builds and sends one per call.
#[derive(Debug, Serialize, Deserialize)]
struct EchoRequest {
/// Per-requester call counter: starts at 0 and is incremented (wrapping) after every call.
tick: u64,
}
/// Response payload produced by the echo handler registered in `install_responders`.
#[derive(Debug, Serialize, Deserialize)]
struct EchoResponse {
/// The `tick` value copied from the request.
tick: u64,
/// Human-readable text of the form `echoed tick=<tick>`.
note: String,
}
/// Service name under which the echo handler is both served and called.
const ECHO_SERVICE: &str = "demo.echo";
/// Pause each requester loop takes before issuing every call.
const CALL_INTERVAL: Duration = Duration::from_millis(250);
/// Time budget per call; added to `Instant::now()` to form the call deadline.
const CALL_DEADLINE: Duration = Duration::from_millis(2_000);
/// Number of leading harness nodes that act as responders; the nodes after
/// them run requester loops.
const RESPONDER_COUNT: usize = 2;
/// `RpcObserver` implementation that converts each observed call into an
/// `NrpcCall` and pushes it onto an `NrpcTail`.
struct NrpcTailObserver {
/// Destination for the converted call records.
tail: NrpcTail,
}
impl RpcObserver for NrpcTailObserver {
fn on_call(&self, evt: RpcCallEvent) {
let status = match evt.status {
RpcCallStatus::Ok => NrpcStatus::Ok,
RpcCallStatus::Error(msg) => NrpcStatus::Error(msg),
RpcCallStatus::Timeout => NrpcStatus::Timeout,
RpcCallStatus::Canceled => NrpcStatus::Error("canceled".to_string()),
};
let call = NrpcCall {
ts_ms: evt.ts_unix_ms,
caller: evt.caller,
callee: evt.callee,
method: evt.method,
latency_ms: evt.latency_ms,
status,
request_bytes: evt.request_bytes,
response_bytes: evt.response_bytes,
};
self.tail.push(call);
}
}
/// Attaches an [`NrpcTailObserver`] to the mesh of every node in `harness`.
///
/// Each node gets its own observer instance holding a clone of `tail`.
pub fn install_observers(harness: &ClusterHarness, tail: NrpcTail) {
    harness.nodes().iter().for_each(|member| {
        let observer = NrpcTailObserver { tail: tail.clone() };
        member
            .mesh()
            .set_rpc_observer(Some(Arc::new(observer) as Arc<dyn RpcObserver>));
    });
}
/// Registers the JSON echo handler for `ECHO_SERVICE` on the first
/// `RESPONDER_COUNT` nodes of `harness` (looked up via `harness.nth(idx)`).
///
/// The handler replies with the request's `tick` and a note of the form
/// `echoed tick=<tick>`.
///
/// # Errors
///
/// Stops at the first node whose `serve_rpc_typed` call fails and returns a
/// report naming that node's index; handles collected so far are dropped.
pub fn install_responders(
    harness: &ClusterHarness,
) -> Result<Vec<ServeHandle>, color_eyre::Report> {
    (0..RESPONDER_COUNT)
        .map(|idx| {
            let node = harness.nth(idx);
            let served = node.mesh().serve_rpc_typed(
                ECHO_SERVICE,
                Codec::Json,
                |req: EchoRequest| async move {
                    let tick = req.tick;
                    Ok::<_, String>(EchoResponse {
                        tick,
                        note: format!("echoed tick={}", tick),
                    })
                },
            );
            served.map_err(|e| color_eyre::eyre::eyre!("serve_rpc_typed on node[{idx}]: {e:?}"))
        })
        .collect()
}
/// Spawns one background requester task for every node after the first
/// `RESPONDER_COUNT` nodes of `harness`.
///
/// The ids of the first `RESPONDER_COUNT` nodes are collected as call targets,
/// and each task receives its own copy of that list plus the node's index
/// within `harness.nodes()`. Returns the join handles of the spawned tasks;
/// if the harness has no nodes, nothing is spawned and the result is empty.
pub fn spawn_requester_loops(harness: &ClusterHarness) -> Vec<JoinHandle<()>> {
    let mut targets: Vec<u64> = Vec::new();
    for responder in harness.nodes().iter().take(RESPONDER_COUNT) {
        targets.push(responder.node_id());
    }
    if targets.is_empty() {
        return Vec::new();
    }

    let mut tasks = Vec::new();
    for (position, requester) in harness.nodes().iter().enumerate().skip(RESPONDER_COUNT) {
        let mesh = requester.mesh().clone();
        let task_targets = targets.clone();
        tasks.push(tokio::spawn(run_requester_loop(position, mesh, task_targets)));
    }
    tasks
}
/// Periodically calls the echo service on one of `responders`, forever.
///
/// Each iteration sleeps for `CALL_INTERVAL`, picks a target in round-robin
/// order (offset by `requester_idx` so different requesters start on
/// different responders), and issues a typed call with a deadline of
/// `CALL_DEADLINE` from the moment the call is made.
///
/// * `requester_idx` - index of the calling node; only used to offset the
///   round-robin start position.
/// * `mesh` - mesh handle the calls are issued through.
/// * `responders` - node ids to call. If empty, the function returns
///   immediately instead of looping.
///
/// With a non-empty `responders` list this future never completes on its own;
/// it ends only when the owning task is aborted or dropped.
async fn run_requester_loop(
    requester_idx: usize,
    mesh: Arc<net_sdk::mesh::Mesh>,
    responders: Vec<u64>,
) {
    // Without targets the modulo below would divide by zero and panic.
    if responders.is_empty() {
        return;
    }
    let mut tick: u64 = 0;
    loop {
        tokio::time::sleep(CALL_INTERVAL).await;
        // Only the position within the rotation matters, so truncating `tick`
        // and wrapping the sum is fine; a plain `+` could panic on overflow in
        // debug builds.
        let slot = (tick as usize).wrapping_add(requester_idx) % responders.len();
        let target = responders[slot];
        let req = EchoRequest { tick };
        let opts = CallOptionsTyped {
            raw: CallOptions {
                deadline: Some(std::time::Instant::now() + CALL_DEADLINE),
                ..CallOptions::default()
            },
            ..CallOptionsTyped::default()
        };
        // The outcome is deliberately ignored here so a failed call never stops the loop.
        // NOTE(review): presumably results are surfaced through the installed
        // `RpcObserver` instead — confirm.
        let _resp: Result<EchoResponse, RpcError> =
            mesh.call_typed(target, ECHO_SERVICE, &req, opts).await;
        tick = tick.wrapping_add(1);
    }
}