use futures_util::StreamExt as _;
use stop_token::future::FutureExt as _;
use super::*;
impl_veilid_log_facility!("rpc");
/// The kinds of work item that can be placed on the RPC worker queue.
#[derive(Debug)]
pub(super) enum RPCWorkerRequestKind {
/// An encoded RPC message (header plus body data) waiting to be processed.
Message { message_encoded: MessageEncoded },
}
/// A single entry on the RPC worker queue.
#[derive(Debug)]
pub(super) struct RPCWorkerRequest {
// Timestamp taken when the request was enqueued; the worker subtracts it from
// its dequeue timestamp to obtain the dequeue latency.
enqueued_ts: Timestamp,
// Span that was current when the request was enqueued; the worker's
// per-request span is marked as following from it.
span: Span,
// What the worker is being asked to do.
kind: RPCWorkerRequestKind,
}
impl RPCProcessor {
/// Creates the bounded RPC request queue and spawns the worker tasks that drain it.
///
/// The queue is created with a capacity of `self.queue_size`. Its sender and a
/// fresh `StopSource` are stored in `inner`, then `self.concurrency` tasks are
/// spawned, each running `rpc_worker` with its own receiver clone and stop
/// token. Every join handle is pushed onto `inner.rpc_worker_join_handles`.
///
/// Always returns `Ok(())`.
pub(super) fn startup_rpc_workers(&self) -> EyreResult<()> {
    let mut inner = self.inner.lock();

    let (sender, receiver) = flume::bounded(self.queue_size as usize);
    inner.rpc_send_channel = Some(sender);

    // Take one token before handing the source to `inner`, so each worker can
    // receive a clone without unwrapping the option that was just populated.
    let stop_source = StopSource::new();
    let stop_token = stop_source.token();
    inner.rpc_stop_source = Some(stop_source);

    veilid_log!(self debug "Starting {} RPC workers", self.concurrency);
    for task_n in 0..self.concurrency {
        let registry = self.registry();
        let receiver = receiver.clone();
        let stop_token = stop_token.clone();
        // Task name matches what the task runs (`rpc_worker`) and the log line above.
        let jh = spawn(&format!("rpc worker {}", task_n), async move {
            let this = registry.rpc_processor();
            Box::pin(this.rpc_worker(stop_token, receiver)).await
        });
        inner.rpc_worker_join_handles.push(jh);
    }

    Ok(())
}
pub(super) async fn shutdown_rpc_workers(&self) {
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
for h in inner.rpc_worker_join_handles.drain(..) {
unord.push(h);
}
drop(inner.rpc_stop_source.take());
}
veilid_log!(self debug "Stopping {} RPC workers", unord.len());
while unord.next().await.is_some() {}
}
/// Worker loop: pulls queued requests off `receiver`, processes each one, and
/// records dequeue and processing latency in `inner`.
///
/// The loop continues only while the stop-token-bounded receive yields
/// `Ok(Ok(request))`; any other outcome ends the worker.
/// NOTE(review): presumably the outer `Err` means the stop token fired and the
/// inner `Err` means the channel disconnected — confirm against `stop_token` and `flume`.
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
async fn rpc_worker(&self, stop_token: StopToken, receiver: flume::Receiver<RPCWorkerRequest>) {
while let Ok(Ok(request)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
// Per-request span, linked to the span captured when the request was enqueued.
let rpc_request_span = tracing::trace_span!("rpc request");
rpc_request_span.follows_from(request.span);
// Time the request spent waiting in the queue.
let dequeue_ts = Timestamp::now_non_decreasing();
let dequeue_latency = dequeue_ts.duration_since(request.enqueued_ts);
// Dispatch on the request kind. If processing returns `Err`, the error is
// logged and `continue` skips the latency accounting below for this request.
match request.kind {
RPCWorkerRequestKind::Message { message_encoded } => {
network_result_value_or_log!(self target:"network_result", match self
.process_rpc_message(message_encoded).instrument(rpc_request_span)
.await
{
Err(e) => {
veilid_log!(self error "couldn't process rpc message: {}", e);
continue;
}
Ok(v) => {
v
}
} => [ format!(": msg.header={:?}", message_encoded.header) ] {});
}
}
// Time spent processing, measured from the dequeue timestamp.
let process_ts = Timestamp::now_non_decreasing();
let process_latency = process_ts.duration_since(dequeue_ts);
// The lock is taken only after processing has been awaited, and the guard is
// released at the end of this iteration, before the next receive is awaited.
let mut inner = self.inner.lock();
inner.rpc_worker_dequeue_latency = inner
.rpc_worker_dequeue_latency_accounting
.record_latency(dequeue_latency);
inner.rpc_worker_process_latency = inner
.rpc_worker_process_latency_accounting
.record_latency(process_latency);
}
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub fn enqueue_direct_message(
&self,
envelope: Envelope,
sender_noderef: FilteredNodeRef,
flow: Flow,
routing_domain: RoutingDomain,
body: Bytes,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
if sender_noderef.routing_domain_set() != routing_domain {
bail!("routing domain should match peer noderef filter");
}
let header = MessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope,
sender_noderef,
flow,
routing_domain,
}),
timestamp: Timestamp::now_non_decreasing(),
body_len: ByteCount::new(body.len() as u64),
};
let message_encoded = MessageEncoded {
header,
data: MessageData::new(body),
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RPCWorkerRequest {
enqueued_ts: Timestamp::now_non_decreasing(),
span: Span::current(),
kind: RPCWorkerRequestKind::Message { message_encoded },
})
.map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?;
Ok(())
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub(super) fn enqueue_safety_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
sequencing: Sequencing,
route_op_id: OperationId,
body: Bytes,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let header = MessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
direct,
remote_safety_route,
sequencing,
route_op_id,
}),
timestamp: Timestamp::now_non_decreasing(),
body_len: (body.len() as u64).into(),
};
let message_encoded = MessageEncoded {
header,
data: MessageData::new(body),
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RPCWorkerRequest {
enqueued_ts: Timestamp::now_non_decreasing(),
span: Span::current(),
kind: RPCWorkerRequestKind::Message { message_encoded },
})
.map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?;
Ok(())
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub(super) fn enqueue_private_routed_message(
&self,
direct: RPCMessageHeaderDetailDirect,
remote_safety_route: PublicKey,
private_route: PublicKey,
safety_spec: SafetySpec,
route_op_id: OperationId,
body: Bytes,
) -> EyreResult<()> {
let _guard = self
.startup_context
.startup_lock
.enter()
.wrap_err("not started up")?;
let header = MessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted {
direct,
remote_safety_route,
private_route,
safety_spec,
route_op_id,
}),
timestamp: Timestamp::now_non_decreasing(),
body_len: (body.len() as u64).into(),
};
let message_encoded = MessageEncoded {
header,
data: MessageData::new(body),
};
let send_channel = {
let inner = self.inner.lock();
let Some(send_channel) = inner.rpc_send_channel.as_ref().cloned() else {
bail!("send channel is closed");
};
send_channel
};
send_channel
.try_send(RPCWorkerRequest {
enqueued_ts: Timestamp::now_non_decreasing(),
span: Span::current(),
kind: RPCWorkerRequestKind::Message { message_encoded },
})
.map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?;
Ok(())
}
}