use super::*;
// NOTE(review): presumably selects "rpc" as the log facility used by the
// `veilid_log!` calls in this file — confirm against the macro definition.
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
/// Sends an `AppCallQ` question carrying `message` to `dest` and awaits the reply.
///
/// On success the answer payload is returned inside an `Answer`, together with the
/// answer context produced by the wait. A timed-out wait yields
/// `NetworkResult::Timeout`; a reply that is not an answer, or is an answer of a
/// different kind, yields an invalid-message result.
///
/// # Errors
/// Returns an `RPCError` when the startup lock cannot be entered ("not started up"),
/// or when building the question, sending it, or waiting for the reply fails.
#[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self, message), fields(message.len = message.len(), ret.latency, ret.len), err(level=Level::DEBUG)))]
pub async fn rpc_call_app_call(
    &self,
    dest: Destination,
    message: Bytes,
) -> RPCNetworkResult<Answer<Bytes>> {
    // Bail out with a try-again error if the startup lock cannot be entered;
    // the guard is kept alive until this function returns.
    let _guard = self
        .startup_context
        .startup_lock
        .enter()
        .map_err(RPCError::map_try_again("not started up"))?;

    // Build the description now: `message` and `dest` are both moved below.
    let message_len = message.len();
    let debug_string = format!("AppCall(message(len)={}) => {}", message_len, dest);

    // Assemble the question.
    let detail = RPCQuestionDetail::AppCallQ(Box::new(RPCOperationAppCallQ::new(message)?));
    let respond_to = network_result_try!(self.get_destination_respond_to(&dest).await?);
    let question = RPCQuestion::new(respond_to, detail);

    // Send it and wait for whatever comes back.
    let waitable_reply = network_result_try!(self.question(dest, question, None, None).await?);
    let reply = self.wait_for_reply(waitable_reply, debug_string).await?;
    let TimeoutOr::Value((msg, answer_context)) = reply else {
        return Ok(NetworkResult::Timeout);
    };

    // Only an `AppCallA` answer is acceptable here.
    let (_, _, kind) = msg.operation.destructure();
    let RPCOperationKind::Answer(answer) = kind else {
        return Ok(NetworkResult::invalid_message("not an answer"));
    };
    let RPCAnswerDetail::AppCallA(app_call_a) = answer.destructure() else {
        return Ok(NetworkResult::invalid_message("not an appcall answer"));
    };
    let a_message = app_call_a.destructure();

    // Fill in the span fields declared (empty) in the `instrument` attribute.
    #[cfg(feature = "verbose-tracing")]
    {
        let span = tracing::Span::current();
        span.record("ret.latency", answer_context.latency.as_u64());
        span.record("ret.len", a_message.len());
    }

    Ok(NetworkResult::value(Answer::new(answer_context, a_message)))
}
/// Handles an incoming `AppCallQ` question: hands it to the update callback as a
/// `VeilidUpdate::AppCall`, waits for the matching reply, and sends it back as an
/// `AppCallA` answer.
///
/// Early exits (all returned as `Ok` network results):
/// - service-unavailable when the published peer info for the message's routing
///   domain is missing or lacks `VEILID_CAPABILITY_APPMESSAGE`;
/// - invalid-message when the message carries a private route public key that the
///   route spec store has no route id for;
/// - invalid-message when a sender node ref is present and the `footgun` feature
///   is disabled;
/// - timeout when no reply completes the waiter in time (no answer is sent).
///
/// # Errors
/// Propagates `RPCError`s from waiting on the operation, building the answer, and
/// sending it.
///
/// # Panics
/// Panics if `msg` is not a question, or is a question other than `AppCallQ`.
#[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(super) async fn process_app_call_q(&self, msg: Message) -> RPCNetworkResult<()> {
    let routing_table = self.routing_table();

    // Refuse the call unless our published peer info advertises the capability.
    let app_message_enabled = routing_table
        .get_published_peer_info(msg.header.routing_domain())
        .is_some_and(|ppi| ppi.node_info().has_capability(VEILID_CAPABILITY_APPMESSAGE));
    if !app_message_enabled {
        return Ok(NetworkResult::service_unavailable(
            "app call is not available",
        ));
    }

    // Map the private route public key (if any) to its route id.
    let route_id = match msg.header.get_private_route_public_key() {
        None => None,
        Some(pr_pubkey) => {
            let route_spec_store = routing_table.route_spec_store();
            let found = route_spec_store.get_route_id_for_key(&pr_pubkey);
            if found.is_none() {
                return Ok(NetworkResult::invalid_message(format!(
                    "private route does not exist for key: {}",
                    pr_pubkey
                )));
            }
            found
        }
    };

    // Pull the question out of a clone of the operation kind; `msg` itself is
    // still needed intact for `answer` at the end.
    let op_id = msg.operation.op_id();
    let op_kind = msg.operation.kind().clone();
    let RPCOperationKind::Question(question) = op_kind else {
        panic!("not a question");
    };
    let (_, RPCQuestionDetail::AppCallQ(app_call_q)) = question.destructure() else {
        panic!("not an appcall question");
    };

    // Node id of the sender for the crypto kind of this message, when a sender
    // node ref is present.
    let crypto_kind = msg.header.crypto_kind();
    let sender = msg
        .opt_sender_nr
        .as_ref()
        .map(|sender_nr| sender_nr.node_ids().get(crypto_kind).unwrap_or_log());

    #[cfg(not(feature = "footgun"))]
    {
        if sender.is_some() {
            return Ok(NetworkResult::invalid_message(
                "Direct BareNodeId senders are not allowed for AppCall when footgun is disabled",
            ));
        }
    }

    // Register the waiter before emitting the update, so that a reply issued
    // from inside the callback already has a waiter to complete.
    let handle = self.waiting_app_call_table.add_op_waiter(op_id, ());
    let message = app_call_q.destructure();
    let app_call = VeilidAppCall::new(sender, route_id, message.to_vec(), op_id);
    (self.update_callback())(VeilidUpdate::AppCall(Box::new(app_call)));

    // Wait for the waiter to be completed (e.g. through `app_call_reply`).
    let wait_result = self
        .waiting_app_call_table
        .wait_for_op(handle, self.timeout)
        .await?;
    let TimeoutOr::Value((message_a, _latency)) = wait_result else {
        veilid_log!(self debug "App call timed out for id {}", op_id);
        return Ok(NetworkResult::timeout());
    };

    // Wrap the reply payload and send it back as the answer.
    let answer_detail =
        RPCAnswerDetail::AppCallA(Box::new(RPCOperationAppCallA::new(message_a)?));
    self.answer(msg, RPCAnswer::new(answer_detail), None).await
}
/// Completes the waiter registered under `call_id` in the waiting app-call table,
/// supplying `message` as its value.
///
/// # Errors
/// Returns a try-again `RPCError` ("not started up") when the startup lock cannot
/// be entered. A failure to complete the waiter is converted with
/// `RPCError::ignore`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "rpc", skip_all, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub fn app_call_reply(&self, call_id: OperationId, message: Bytes) -> Result<(), RPCError> {
    // The guard is held until the function returns.
    let startup_lock = &self.startup_context.startup_lock;
    let _guard = startup_lock
        .enter()
        .map_err(RPCError::map_try_again("not started up"))?;

    let completion = self
        .waiting_app_call_table
        .complete_op_waiter(call_id, message);
    completion.map_err(RPCError::ignore)
}
}