1use std::collections::{VecDeque, BTreeMap};
2use std::sync::{Arc, Mutex};
3use std::task::Poll;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::any::Any;
7
8use anyanymap::Map;
9
10use tokio::sync::{Mutex as TokioMutex, MutexGuard as TokioMutexGuard};
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::{oneshot, mpsc};
13
/// A request type together with the response type it is answered with.
///
/// Requests travel between commands and the [`Handler`], so they must be
/// `Send` and own all of their data (`'static`).
pub trait Request: 'static + Send {
    /// Value produced for a single request of this type.
    type Response: Debug + Send;
}
17
18impl<R: Request> Command<R> for R {
19 type Output = R::Response;
20 async fn run(self, mut ctx: Context<R>) -> Self::Output {
21 ctx.send(vec![self]).await.remove(0)
22 }
23}
24
25pub trait Command<R: Request>: Any + Send {
26 type Output: Any + Send + where Self: Sized;
27 fn run(self, ctx: Context<R>) -> impl Future<Output = Self::Output> + Send where Self: Sized;
28}
29
30pub trait Handler<R: Request>: Send {
31 fn handle(&mut self, store: &mut State, requests: Vec<R>) -> impl Future<Output = Vec<R::Response>> + Send;
32}
33
34pub trait AnyCommand<R: Request>: Send {fn run(self: Box<Self>, ctx: Context<R>) -> PBFut<Box<dyn Any + Send>>; }
35impl<R: Request, A: Any + Send + 'static, Self_: Command<R, Output = A> + Send> AnyCommand<R> for Self_ {
36 fn run(self: Box<Self>, ctx: Context<R>) -> PBFut<Box<dyn Any + Send>> {
37 Box::pin(async move {Box::new((*self).run(ctx).await) as Box<dyn Any + Send>})
38 }
39}
40impl<R: Request> Command<R> for Box<dyn AnyCommand<R>> {
41 type Output = Box<dyn Any + Send>;
42 fn run(self, ctx: Context<R>) -> impl Future<Output = Self::Output> {AnyCommand::run(self, ctx)}
43}
44
// Declares the `State` type through `anyanymap`'s `Map!` macro. In this file
// `State` is queried by value type (`get_mut::<A>()`, `get_mut_or_default()`)
// and shared behind the `Store` alias.
// NOTE(review): the exact meaning of the `std::any::Any, Send` arguments is
// defined by `anyanymap` — confirm against that crate's documentation.
Map!(State: std::any::Any, Send);
46
47pub struct Context<R: Request> {
48 callback: Callback<R>,
49 store: Store,
50}
51impl<R: Request> Clone for Context<R> {
52 fn clone(&self) -> Self {Context{callback: self.callback.clone(), store: self.store.clone()}}
53}
54
55impl<R: Request> Context<R> {
56 pub async fn send(&mut self, requests: Vec<R>) -> Vec<R::Response> {
57 let (tx, rx) = oneshot::channel();
58 self.callback.send((requests, tx)).await.unwrap();
59 rx.await.unwrap()
60 }
61
62 pub async fn store(&mut self) -> TokioMutexGuard<'_, State> {
63 self.store.lock().await
64 }
65
66 pub async fn get_mut<A: Any + Send>(&mut self) -> Option<tokio::sync::MappedMutexGuard<'_, A>> {
67 let mut guard = self.store.lock().await;
68 if guard.get_mut::<A>().is_some() {
69 Some(TokioMutexGuard::map(guard, |store| store.get_mut().unwrap()))
70 } else {None}
71 }
72
73 pub async fn get_mut_or_default<A: Any + Send + Default>(&mut self) -> tokio::sync::MappedMutexGuard<'_, A> {
74 TokioMutexGuard::map(self.store.lock().await, |store| store.get_mut_or_default())
75 }
76
77 pub async fn run<Output: Send + 'static, M: Command<R, Output = Output>>(&mut self, input: M) -> Output {
78 Box::new(input).run_m(self.clone()).await
79 }
80}
81
82impl<R: Request> Handler<R> for Context<R> {
83 async fn handle(&mut self, _store: &mut State, requests: Vec<R>) -> Vec<R::Response> {self.send(requests).await}
84}
85
86pub struct Compiler<R: Request, H: Handler<R>, Output> {
87 commands: Commands<R, Output>,
88 running: InProgress<R, Output>,
89 store: Store,
90 handler: H
91}
92impl<R: Request, H: Handler<R>, Output: Send + 'static> Compiler<R, H, Output> {
93 pub fn new(
94 handler: H
95 ) -> Self {
96 Compiler{
97 commands: Commands::default(),
98 running: InProgress::default(),
99 store: Store::default(),
100 handler
101 }
102 }
103
104 pub async fn store(&mut self) -> TokioMutexGuard<'_, State> {
105 self.store.lock().await
106 }
107
108 pub fn add_task(&mut self, id: u64, command: impl Command<R, Output = Output>) {
109 self.commands.lock().unwrap().insert(id, Box::new(command));
110 }
111
112 pub fn is_empty(&self) -> bool {
113 self.commands.lock().unwrap().is_empty() && self.running.lock().unwrap().is_empty()
114 }
115
116 pub async fn tick(&mut self) -> BTreeMap<u64, Output> {
117 CompilerTick::<R, Output>::new(self.commands.clone(), self.running.clone(), self.store.clone(),
118 Box::new(|req: Vec<R>| {
119 Box::pin(async move {
120 let mut state = self.store.lock().await;
121 self.handler.handle(&mut state, req).await
122 })
123 })
124 ).run().await
125 }
126
127 pub async fn run(mut self) -> BTreeMap<u64, Output> {
128 let mut results = BTreeMap::new();
129 while !self.is_empty() {
130 results.extend(self.tick().await);
131 }
132 results
133 }
134
135 async fn run_in_order(
136 commands: Vec<Box<dyn _Command<R, Output>>>,
137 store: Store,
138 handler: H
139 ) -> Vec<Output> {
140 let (order, commands): (Vec<u64>, _) = commands.into_iter().enumerate().map(|(id, c)| {(id as u64, (id as u64, c))}).unzip();
141 let mut results = Compiler{
142 commands: Arc::new(Mutex::new(commands)),
143 running: InProgress::default(),
144 store,
145 handler
146 }.run().await;
147 order.into_iter().map(|i| results.remove(&i).unwrap()).collect()
148 }
149
150
151}
152
153type HandlerOnce<'a, R> = Box<dyn FnOnce(Vec<R>) -> Pin<Box<dyn Future<Output = Vec<<R as Request>::Response>> + Send + 'a>> + Send + 'a>;
154type Commands<R, Output> = Arc<Mutex<BTreeMap<u64, Box<dyn _Command<R, Output>>>>>;
155type InProgress<R, Output> = Arc<Mutex<BTreeMap<u64, (Pin<Box<Running<R, Output>>>, Vec<R>, Responder<R>)>>>;
156type Responder<R> = oneshot::Sender<Vec<<R as Request>::Response>>;
157type Callback<R> = Sender<(Vec<R>, Responder<R>)>;
158type Store = Arc<TokioMutex<State>>;
159type PBFut<Output> = Pin<Box<dyn Future<Output = Output> + Send>>;
160
161struct CompilerTick<'a, R: Request, Output>{
162 commands: Commands<R, Output>,
163 running: InProgress<R, Output>,
164 store: Store,
165 handler: HandlerOnce<'a, R>,
166}
167impl<'a, R: Request, Output: Send + 'static> CompilerTick<'a, R, Output> {
168 pub fn new(
169 commands: Commands<R, Output>,
170 running: InProgress<R, Output>,
171 store: Store,
172 handler: HandlerOnce<'a, R>
173 ) -> Self {
174 CompilerTick{commands, running, store, handler}
175 }
176
177 pub async fn run(self) -> BTreeMap<u64, Output> {
178 let mut results = BTreeMap::default();
179 let commands = std::mem::take(&mut *self.commands.lock().unwrap());
180 let store = self.store.clone();
181 for (id, c) in commands {
182 match Running::new(c, &store).await {
183 RunningResult::Ready(result) => {results.insert(id, result);},
184 RunningResult::Requesting(future, requests, responder) => {self.running.lock().unwrap().insert(id, (future, requests, responder));}
185 }
186 }
187 let running = std::mem::take(&mut *self.running.lock().unwrap());
188 let (running, batch): (BTreeMap<_, _>, Vec<_>) = running.into_iter().map(
189 |(id, (future, requests, responder))| ((id, (future, responder)), requests)
190 ).unzip();
191 let counts: Vec<_> = batch.iter().map(|b| b.len()).collect();
192 let mut responses: VecDeque<R::Response> = (self.handler)(
193 batch.into_iter().flatten().collect()
194 ).await.into();
195 for ((id, (future, responder)), count) in running.into_iter().zip(counts) {
196 responder.send({
197 let mut r = Vec::new();
198 for _ in 0..count {
199 r.push(responses.pop_front().unwrap());
200 }
201 r
202 }).unwrap();
203 match future.await {
204 RunningResult::Ready(result) => {results.insert(id, result);},
205 RunningResult::Requesting(future, requests, responder) => {self.running.lock().unwrap().insert(id, (future, requests, responder));}
206 }
207 }
208 results
209 }
210}
211enum RunningResult<R: Request, Output> {
212 Ready(Output),
213 Requesting(Pin<Box<Running<R, Output>>>, Vec<R>, Responder<R>)
214}
215
216struct Running<R: Request, Output>(Option<PBFut<Output>>, Option<Receiver<(Vec<R>, Responder<R>)>>);
217impl<R: Request, Output: Send + 'static> Running<R, Output> {
218 pub fn new(m: Box<dyn _Command<R, Output>>, store: &Store) -> Self {
219 let (callback, rx) = mpsc::channel(1);
220 let context = Context{store: store.clone(), callback};
221 Running(Some(Box::pin(m.run_m(context))), Some(rx))
222 }
223}
224
225impl<R: Request, Output: 'static> Future for Running<R, Output> {
226 type Output = RunningResult<R, Output>;
227 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
228 match self.0.as_mut().unwrap().as_mut().poll(cx) {
229 Poll::Ready(r) => Poll::Ready(RunningResult::Ready(r)),
230 Poll::Pending => {
231 if let Ok(tx) = self.1.as_mut().unwrap().try_recv() {
232 Poll::Ready(RunningResult::Requesting(Box::pin(Running(self.0.take(), self.1.take())), tx.0, tx.1))
233 } else {Poll::Pending}
234 }
235 }
236 }
237}
238
239trait _Command<R: Request, Output: Send + 'static>: Send {
240 fn run_m(self: Box<Self>, ctx: Context<R>) -> PBFut<Output>;
241}
242
243impl<R: Request, C: Command<R>> Command<R> for Vec<C> {
244 type Output = Vec<C::Output>;
245 async fn run(self, ctx: Context<R>) -> Self::Output {
246 Compiler::run_in_order(
247 self.into_iter().map(|c| Box::new(c) as Box<dyn _Command<R, C::Output>>).collect(),
248 ctx.store.clone(),
249 ctx,
250 ).await
251 }
252}
253
254impl<R: Request, C: Command<R>> _Command<R, C::Output> for C {
255 fn run_m(self: Box<Self>, ctx: Context<R>) -> PBFut<C::Output> {
256 Box::pin(Command::run(*self, ctx))
257 }
258}
259
260impl<R: Request, Output: Any + Send> Command<R> for Box<dyn _Command<R, Output>> {
261 type Output = Box<dyn Any + Send>;
262 async fn run(self, ctx: Context<R>) -> Self::Output {
263 Box::new(self.run_m(ctx).await) as Box<dyn Any + Send>
264 }
265}
266
267trait AnyRequest<R: Request, Output> {fn any(self) -> Box<dyn _Command<R, Box<dyn Any + Send>>>;}
268impl<R: Request, Output: Send + 'static, M: _Command<R, Output> + 'static> AnyRequest<R, Output> for M {
269 fn any(self) -> Box<dyn _Command<R, Box<dyn Any + Send>>> {
270 Box::new(Box::new(self) as Box<dyn _Command<R, Output>>)
271 }
272}
273
274macro_rules! impl_result_tuple {
275 (
276 ($t_head:ident, $( $t:ident ),+);
277 ($tt_head:ident, $($tt:ident),+);
278 ($i_head:tt, $( $i:tt ),+)
279 ) => {
280 impl<
281 R: Request,
282 $t_head: Any + Send, $( $t: Any + Send ),+,
283 $tt_head: Command<R, Output = $t_head> + 'static, $( $tt: Command<R, Output = $t> + 'static ),+
284 > Command<R> for ($tt_head, $( $tt ),+) {
285 type Output = ($t_head, $( $t ),+);
286 async fn run(self, ctx: Context<R>) -> ($t_head, $( $t ),+) {
287 let mut results = Compiler::run_in_order(vec![
288 $( self.$i.any() ),+, self.$i_head.any()
289 ], ctx.store.clone(), ctx).await;
290 (
291 *results.remove(0).downcast::<$t_head>().unwrap(),
292 $( {*results.remove(0).downcast::<$t>().unwrap()} ),+
293 )
294 }
295 }
296 impl_result_tuple!(($($t),+); ($($tt),+); ($($i),+));
297 };
298
299 (
300 ($t:ident);
301 ($tt:ident);
302 ($i:tt)
303 ) => {}
304}
305impl_result_tuple!((T0, T1, T2, T3, T4, T5, T6, T7); (M0, M1, M2, M3, M4, M5, M6, M7); (7, 6, 5, 4, 3, 2, 1, 0));