1use rand::Rng;
2use std::collections::{BTreeMap, HashMap};
3use std::sync::Arc;
4use tokio::sync::{Notify, RwLock};
5
6use anyhow::bail;
7
8use crate::{Handler, RPCTask, RpcRequest, RpcResult, Transport};
9
/// Executor that assigns slots to incoming RPC requests and runs them against
/// registered handlers.
///
/// `slots`, `handlers` and `tasks` sit behind `Arc`s, so clones of the executor
/// share them.
// NOTE(review): `subscriber_map` is NOT behind an `Arc`; `#[derive(Clone)]` gives
// every clone its own independent copy of that map — confirm this is intended.
#[derive(Clone)]
pub struct BoxcarExecutor {
    /// Slot ids that have been handed out to tasks.
    pub(crate) slots: Arc<RwLock<Vec<u16>>>,
    /// Slot ids keyed by transport. Only initialised (empty) in this file;
    /// presumably maintained by subscription code elsewhere — TODO confirm.
    pub(crate) subscriber_map: HashMap<Transport, Vec<u16>>,
    /// Registered handlers, searched in insertion order when a request arrives.
    pub(crate) handlers: Arc<RwLock<Vec<Arc<Handler>>>>,
    /// Tasks keyed by their slot id; each task carries its own lock.
    pub(crate) tasks: Arc<RwLock<BTreeMap<u16, Arc<RwLock<RPCTask>>>>>,
}
19impl BoxcarExecutor {
20 pub fn new() -> Self {
21 BoxcarExecutor {
22 slots: Arc::new(Default::default()),
23 subscriber_map: Default::default(),
24 handlers: Default::default(),
25 tasks: Default::default(),
26 }
27 }
28
29 pub async fn add_handler(&mut self, handle: Handler) {
31 self.handlers.write().await.push(Arc::new(handle))
32 }
33
34 pub async fn num_handlers(&self) -> usize {
36 self.handlers.read().await.len()
37 }
38
39 pub async fn get_rpc(&self, slot: u16) -> Option<RPCTask> {
40 match self.tasks.read().await.get(&slot) {
41 None => None,
42 Some(task) => Some(task.read().await.clone()),
43 }
44 }
45
46 async fn assign_slot(&mut self) -> u16 {
47 let mut depth = 0;
48
49 loop {
50 let slot: u16 = rand::thread_rng().gen();
52 let mut task_handle = self.slots.write().await;
53
54 if !task_handle.contains(&slot) {
55 task_handle.push(slot);
56 return slot;
57 }
58 if depth > 1024 {
59 panic!("oh no")
60 }
61 depth += 1;
62 }
63 }
64
65 pub async fn execute_task(
66 &mut self,
67 request: RpcRequest,
68 ) -> anyhow::Result<(u16, Arc<Notify>)> {
69 let s_slot = self.assign_slot().await;
70 let task = RPCTask {
71 slot: s_slot,
72 request,
73 result: RpcResult::None,
74 };
75 tracing::debug!(s_slot = s_slot, "executing RPCTask {:?}", &task);
76 let delay = task.request.delay;
77
78 let handle = self.handlers.read().await;
80
81 tracing::trace!(
82 s_slot = s_slot,
83 "number of registered handlers: {}",
84 handle.len()
85 );
86 let handle = handle
87 .iter()
88 .find(|v| v.contains(task.request.method.as_str()));
89
90 if handle.is_none() {
91 tracing::error!(
92 s_slot = s_slot,
93 method = task.request.method.as_str(),
94 "requested handler does not exist"
95 );
96 bail!("no such handler");
97 }
98
99 let notifier = Arc::new(Notify::new());
100
101 let task_ref = Arc::new(RwLock::new(task));
103 self.tasks.write().await.insert(s_slot, task_ref.clone());
104
105 let handler = handle.unwrap().clone();
107 let closure_notify = notifier.clone();
109 let closure = async move {
110 let task_ref = task_ref.clone();
111
112 let task = task_ref.read().await;
114 let s_slot = task.slot;
115
116 tracing::debug!(s_slot = s_slot, "---- entering task handler ----");
118 let result = handler
119 .call(task.request.method.as_str(), task.request.body.clone())
120 .await;
121 tracing::debug!(s_slot = s_slot, "---- leaving task handler ----");
122 tracing::info!(s_slot = s_slot, "rpc returned {:?}", &result);
123
124 drop(task);
126
127 task_ref.write().await.result = result;
129 closure_notify.notify_one();
130 };
131
132 if delay {
133 tracing::trace!(s_slot = s_slot, "spawning task on a blocking thread");
134 tokio::task::spawn_blocking(move || closure);
135 } else {
136 tracing::trace!(s_slot = s_slot, "spawning task in a non-blocking fashion");
137 tokio::task::spawn(closure);
138 }
139
140 Ok((s_slot, notifier))
141 }
142}
143impl Default for BoxcarExecutor {
144 fn default() -> Self {
145 Self::new()
146 }
147}