boxcar_rpc/
executor.rs

1use rand::Rng;
2use std::collections::{BTreeMap, HashMap};
3use std::sync::Arc;
4use tokio::sync::{Notify, RwLock};
5
6use anyhow::bail;
7
8use crate::{Handler, RPCTask, RpcRequest, RpcResult, Transport};
9
10#[derive(Clone)]
11pub struct BoxcarExecutor {
12    /// List of all allocated slots
13    pub(crate) slots: Arc<RwLock<Vec<u16>>>,
14    /// Map that maps a Transport to all slots it is listening to
15    pub(crate) subscriber_map: HashMap<Transport, Vec<u16>>,
16    pub(crate) handlers: Arc<RwLock<Vec<Arc<Handler>>>>,
17    pub(crate) tasks: Arc<RwLock<BTreeMap<u16, Arc<RwLock<RPCTask>>>>>,
18}
19impl BoxcarExecutor {
20    pub fn new() -> Self {
21        BoxcarExecutor {
22            slots: Arc::new(Default::default()),
23            subscriber_map: Default::default(),
24            handlers: Default::default(),
25            tasks: Default::default(),
26        }
27    }
28
29    /// Add
30    pub async fn add_handler(&mut self, handle: Handler) {
31        self.handlers.write().await.push(Arc::new(handle))
32    }
33
34    /// Return the number of registered handlers
35    pub async fn num_handlers(&self) -> usize {
36        self.handlers.read().await.len()
37    }
38
39    pub async fn get_rpc(&self, slot: u16) -> Option<RPCTask> {
40        match self.tasks.read().await.get(&slot) {
41            None => None,
42            Some(task) => Some(task.read().await.clone()),
43        }
44    }
45
46    async fn assign_slot(&mut self) -> u16 {
47        let mut depth = 0;
48
49        loop {
50            // TODO Make this more efficient, as I think this is creating a new RNG each loop
51            let slot: u16 = rand::thread_rng().gen();
52            let mut task_handle = self.slots.write().await;
53
54            if !task_handle.contains(&slot) {
55                task_handle.push(slot);
56                return slot;
57            }
58            if depth > 1024 {
59                panic!("oh no")
60            }
61            depth += 1;
62        }
63    }
64
65    pub async fn execute_task(
66        &mut self,
67        request: RpcRequest,
68    ) -> anyhow::Result<(u16, Arc<Notify>)> {
69        let s_slot = self.assign_slot().await;
70        let task = RPCTask {
71            slot: s_slot,
72            request,
73            result: RpcResult::None,
74        };
75        tracing::debug!(s_slot = s_slot, "executing RPCTask {:?}", &task);
76        let delay = task.request.delay;
77
78        // search through registered handles to find the one that contains the requested method
79        let handle = self.handlers.read().await;
80
81        tracing::trace!(
82            s_slot = s_slot,
83            "number of registered handlers: {}",
84            handle.len()
85        );
86        let handle = handle
87            .iter()
88            .find(|v| v.contains(task.request.method.as_str()));
89
90        if handle.is_none() {
91            tracing::error!(
92                s_slot = s_slot,
93                method = task.request.method.as_str(),
94                "requested handler does not exist"
95            );
96            bail!("no such handler");
97        }
98
99        let notifier = Arc::new(Notify::new());
100
101        // build task arc, and insert it into our map
102        let task_ref = Arc::new(RwLock::new(task));
103        self.tasks.write().await.insert(s_slot, task_ref.clone());
104
105        // handler that will be moved into the closure
106        let handler = handle.unwrap().clone();
107        // notifier that will be moved into the closure
108        let closure_notify = notifier.clone();
109        let closure = async move {
110            let task_ref = task_ref.clone();
111
112            // pull out the operation information, as we don't want to move stuff into our handler
113            let task = task_ref.read().await;
114            let s_slot = task.slot;
115
116            // // run the handler
117            tracing::debug!(s_slot = s_slot, "---- entering task handler ----");
118            let result = handler
119                .call(task.request.method.as_str(), task.request.body.clone())
120                .await;
121            tracing::debug!(s_slot = s_slot, "---- leaving  task handler ----");
122            tracing::info!(s_slot = s_slot, "rpc returned {:?}", &result);
123
124            // why is there a drop here? I don't know, but if you remove it- you don't save `result`
125            drop(task);
126
127            // take the result of the handler and write it into the RPCTask
128            task_ref.write().await.result = result;
129            closure_notify.notify_one();
130        };
131
132        if delay {
133            tracing::trace!(s_slot = s_slot, "spawning task on a blocking thread");
134            tokio::task::spawn_blocking(move || closure);
135        } else {
136            tracing::trace!(s_slot = s_slot, "spawning task in a non-blocking fashion");
137            tokio::task::spawn(closure);
138        }
139
140        Ok((s_slot, notifier))
141    }
142}
143impl Default for BoxcarExecutor {
144    fn default() -> Self {
145        Self::new()
146    }
147}