Skip to main content

ecksport_rpc/
executor.rs

1//! Receives events from a worker and spawns tasks to execute handlers for it
2//! and stuff.  Does not handle listening for new connections.
3
4use std::sync::Arc;
5
6use tracing::*;
7
8use ecksport_net::{
9    channel::ChannelHandle,
10    worker::{ConnectionHandle, WorkerEvent},
11};
12
13use crate::{
14    constants::*,
15    endpoint::EndpointHandler,
16    errors::{RpcError, ServerError},
17    registry,
18};
19
20pub async fn executor_task(mut handle: ConnectionHandle, reg: Arc<registry::ProtocolRegistry>) {
21    let loc = handle.peer().location();
22    let proto = handle.protocol();
23
24    let Some(handler) = reg.get_protocol(proto) else {
25        error!(%proto, "tried to start RPC executor for connection on unsupported protocol");
26        return;
27    };
28
29    let conn_span = debug_span!("rpcexec", %loc);
30    debug!(parent: &conn_span, %proto, "starting RPC executor");
31
32    if let Err(e) = do_exec(&mut handle, handler.as_ref())
33        .instrument(conn_span)
34        .await
35    {
36        error!(err = %e, "executor task error");
37    }
38}
39
40async fn do_exec(
41    handle: &mut ConnectionHandle,
42    reg: &registry::HandlerRegistry,
43) -> Result<(), ServerError> {
44    let cur_span = Span::current();
45
46    while let Some(ev) = handle.wait_event().await? {
47        match ev {
48            WorkerEvent::NewChan(mut ch) => {
49                let chan_id = ch.chan_id();
50                let topic = ch.topic();
51
52                // Find the handler, if we don't have one then error.
53                let Some(eh) = reg.get_handler(topic) else {
54                    warn!(%topic, %chan_id, "remote opened channel on unknown topic");
55                    let err = RpcError::new_code(ERR_UNK_METHOD);
56                    ch.send_and_close(err.to_vec()).await?;
57                    return Ok(());
58                };
59
60                // Clone the stuff and spawn the task.
61                let eh = eh.clone();
62                let cur_span = cur_span.clone();
63
64                let span = debug_span!(parent: cur_span, "call", %chan_id);
65                let span2 = span.clone();
66                debug!(parent: &span, %topic, "handling call");
67                let handle_fut = async move {
68                    if let Err(e) = chan_worker(eh, ch).await {
69                        error!(parent: &span2, err = %e, "error handling call");
70                    }
71                };
72                tokio::spawn(handle_fut.instrument(span));
73            }
74            WorkerEvent::Notification(topic, msg) => {
75                warn!(%topic, len = %msg.len(), "received notification, not supported yet, ignoring");
76                // TODO
77            }
78        }
79    }
80
81    Ok(())
82}
83
84async fn chan_worker(
85    handler: Arc<EndpointHandler>,
86    mut channel: ChannelHandle,
87) -> Result<(), ServerError> {
88    match handler.as_ref() {
89        EndpointHandler::AsyncFunc(h) => {
90            let Some(m) = channel.recv_msg().await? else {
91                error!("channel closed without message?");
92                return Ok(());
93            };
94
95            // If the error flag is set then we don't know how to handle it so
96            // we should respond immediately.
97            if m.flags().err {
98                trace!("initial frame had error, wtf?");
99                let e = RpcError::new_code(ERR_MALFORMED_REQ);
100                channel.close_with_err(e.to_vec()).await?;
101                return Ok(());
102            }
103
104            let pdata = channel.peer_data().clone();
105
106            match h.invoke(pdata, m.into_payload()).await {
107                Ok(res) => {
108                    trace!(len = %res.len(), "sending response");
109                    channel.send_and_close(res).await?
110                }
111                Err(e) => {
112                    trace!(err = %e, "RPC error");
113                    channel.close_with_err(e.to_vec()).await?
114                }
115            }
116        }
117
118        EndpointHandler::Ignore => {
119            channel.close().await?;
120        }
121    }
122
123    Ok(())
124}