1use std::sync::Arc;
5
6use tracing::*;
7
8use ecksport_net::{
9 channel::ChannelHandle,
10 worker::{ConnectionHandle, WorkerEvent},
11};
12
13use crate::{
14 constants::*,
15 endpoint::EndpointHandler,
16 errors::{RpcError, ServerError},
17 registry,
18};
19
20pub async fn executor_task(mut handle: ConnectionHandle, reg: Arc<registry::ProtocolRegistry>) {
21 let loc = handle.peer().location();
22 let proto = handle.protocol();
23
24 let Some(handler) = reg.get_protocol(proto) else {
25 error!(%proto, "tried to start RPC executor for connection on unsupported protocol");
26 return;
27 };
28
29 let conn_span = debug_span!("rpcexec", %loc);
30 debug!(parent: &conn_span, %proto, "starting RPC executor");
31
32 if let Err(e) = do_exec(&mut handle, handler.as_ref())
33 .instrument(conn_span)
34 .await
35 {
36 error!(err = %e, "executor task error");
37 }
38}
39
40async fn do_exec(
41 handle: &mut ConnectionHandle,
42 reg: ®istry::HandlerRegistry,
43) -> Result<(), ServerError> {
44 let cur_span = Span::current();
45
46 while let Some(ev) = handle.wait_event().await? {
47 match ev {
48 WorkerEvent::NewChan(mut ch) => {
49 let chan_id = ch.chan_id();
50 let topic = ch.topic();
51
52 let Some(eh) = reg.get_handler(topic) else {
54 warn!(%topic, %chan_id, "remote opened channel on unknown topic");
55 let err = RpcError::new_code(ERR_UNK_METHOD);
56 ch.send_and_close(err.to_vec()).await?;
57 return Ok(());
58 };
59
60 let eh = eh.clone();
62 let cur_span = cur_span.clone();
63
64 let span = debug_span!(parent: cur_span, "call", %chan_id);
65 let span2 = span.clone();
66 debug!(parent: &span, %topic, "handling call");
67 let handle_fut = async move {
68 if let Err(e) = chan_worker(eh, ch).await {
69 error!(parent: &span2, err = %e, "error handling call");
70 }
71 };
72 tokio::spawn(handle_fut.instrument(span));
73 }
74 WorkerEvent::Notification(topic, msg) => {
75 warn!(%topic, len = %msg.len(), "received notification, not supported yet, ignoring");
76 }
78 }
79 }
80
81 Ok(())
82}
83
84async fn chan_worker(
85 handler: Arc<EndpointHandler>,
86 mut channel: ChannelHandle,
87) -> Result<(), ServerError> {
88 match handler.as_ref() {
89 EndpointHandler::AsyncFunc(h) => {
90 let Some(m) = channel.recv_msg().await? else {
91 error!("channel closed without message?");
92 return Ok(());
93 };
94
95 if m.flags().err {
98 trace!("initial frame had error, wtf?");
99 let e = RpcError::new_code(ERR_MALFORMED_REQ);
100 channel.close_with_err(e.to_vec()).await?;
101 return Ok(());
102 }
103
104 let pdata = channel.peer_data().clone();
105
106 match h.invoke(pdata, m.into_payload()).await {
107 Ok(res) => {
108 trace!(len = %res.len(), "sending response");
109 channel.send_and_close(res).await?
110 }
111 Err(e) => {
112 trace!(err = %e, "RPC error");
113 channel.close_with_err(e.to_vec()).await?
114 }
115 }
116 }
117
118 EndpointHandler::Ignore => {
119 channel.close().await?;
120 }
121 }
122
123 Ok(())
124}