request_compiler/
lib.rs

1use std::collections::{VecDeque, BTreeMap};
2use std::sync::{Arc, Mutex};
3use std::task::Poll;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::any::Any;
7
8use anyanymap::Map;
9
10use tokio::sync::{Mutex as TokioMutex, MutexGuard as TokioMutexGuard};
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::{oneshot, mpsc};
13
14pub trait Request: Send + 'static {
15    type Response: Send + Debug;
16}
17
18impl<R: Request> Command<R> for R {
19    type Output = R::Response;
20    async fn run(self, mut ctx: Context<R>) -> Self::Output {
21        ctx.send(vec![self]).await.remove(0)
22    }
23}
24
25pub trait Command<R: Request>: Any + Send {
26    type Output: Any + Send + where Self: Sized;
27    fn run(self, ctx: Context<R>) -> impl Future<Output = Self::Output> + Send where Self: Sized;
28}
29
30pub trait Handler<R: Request>: Send {
31    fn handle(&mut self, store: &mut State, requests: Vec<R>) -> impl Future<Output = Vec<R::Response>> + Send;
32}
33
34pub trait AnyCommand<R: Request>: Send {fn run(self: Box<Self>, ctx: Context<R>) -> PBFut<Box<dyn Any + Send>>; }
35impl<R: Request, A: Any + Send + 'static, Self_: Command<R, Output = A> + Send> AnyCommand<R> for Self_ {
36    fn run(self: Box<Self>, ctx: Context<R>) -> PBFut<Box<dyn Any + Send>> {
37        Box::pin(async move {Box::new((*self).run(ctx).await) as Box<dyn Any + Send>})
38    }
39}
40impl<R: Request> Command<R> for Box<dyn AnyCommand<R>> {
41    type Output = Box<dyn Any + Send>;
42    fn run(self, ctx: Context<R>) -> impl Future<Output = Self::Output> {AnyCommand::run(self, ctx)}
43}
44
45Map!(State: std::any::Any, Send);
46
47pub struct Context<R: Request> {
48    callback: Callback<R>,
49    store: Store,
50}
51impl<R: Request> Clone for Context<R> {
52    fn clone(&self) -> Self {Context{callback: self.callback.clone(), store: self.store.clone()}}
53}
54    
55impl<R: Request> Context<R> {
56    pub async fn send(&mut self, requests: Vec<R>) -> Vec<R::Response> {
57        let (tx, rx) = oneshot::channel();
58        self.callback.send((requests, tx)).await.unwrap();
59        rx.await.unwrap()
60    }
61
62    pub async fn store(&mut self) -> TokioMutexGuard<'_, State> {
63        self.store.lock().await
64    }
65
66    pub async fn get_mut<A: Any + Send>(&mut self) -> Option<tokio::sync::MappedMutexGuard<'_, A>> {
67        let mut guard = self.store.lock().await;
68        if guard.get_mut::<A>().is_some() {
69            Some(TokioMutexGuard::map(guard, |store| store.get_mut().unwrap()))
70        } else {None}
71    }
72
73    pub async fn get_mut_or_default<A: Any + Send + Default>(&mut self) -> tokio::sync::MappedMutexGuard<'_, A> {
74        TokioMutexGuard::map(self.store.lock().await, |store| store.get_mut_or_default())
75    }
76
77    pub async fn run<Output: Send + 'static, M: Command<R, Output = Output>>(&mut self, input: M) -> Output {
78        Box::new(input).run_m(self.clone()).await
79    }
80}
81
82impl<R: Request> Handler<R> for Context<R> {
83    async fn handle(&mut self, _store: &mut State, requests: Vec<R>) -> Vec<R::Response> {self.send(requests).await}
84}
85
86pub struct Compiler<R: Request, H: Handler<R>, Output> {
87    commands: Commands<R, Output>,
88    running: InProgress<R, Output>,
89    store: Store,
90    handler: H
91}
92impl<R: Request, H: Handler<R>, Output: Send + 'static> Compiler<R, H, Output> {
93    pub fn new(
94        handler: H
95    ) -> Self {
96        Compiler{
97            commands: Commands::default(),
98            running: InProgress::default(),
99            store: Store::default(),
100            handler
101        }
102    }
103
104    pub async fn store(&mut self) -> TokioMutexGuard<'_, State> {
105        self.store.lock().await
106    }
107
108    pub fn add_task(&mut self, id: u64, command: impl Command<R, Output = Output>) {
109        self.commands.lock().unwrap().insert(id, Box::new(command));
110    }
111
112    pub fn is_empty(&self) -> bool {
113        self.commands.lock().unwrap().is_empty() && self.running.lock().unwrap().is_empty()
114    }
115
116    pub async fn tick(&mut self) -> BTreeMap<u64, Output> {
117        CompilerTick::<R, Output>::new(self.commands.clone(), self.running.clone(), self.store.clone(), 
118            Box::new(|req: Vec<R>| {
119                Box::pin(async move {
120                    let mut state = self.store.lock().await;
121                    self.handler.handle(&mut state, req).await
122                })
123            })
124        ).run().await
125    }
126
127    pub async fn run(mut self) -> BTreeMap<u64, Output> {
128        let mut results = BTreeMap::new();
129        while !self.is_empty() {
130            results.extend(self.tick().await);
131        }
132        results
133    }
134
135    async fn run_in_order(
136        commands: Vec<Box<dyn _Command<R, Output>>>,
137        store: Store,
138        handler: H
139    ) -> Vec<Output> {
140        let (order, commands): (Vec<u64>, _) = commands.into_iter().enumerate().map(|(id, c)| {(id as u64, (id as u64, c))}).unzip();
141        let mut results = Compiler{
142            commands: Arc::new(Mutex::new(commands)),
143            running: InProgress::default(),
144            store,
145            handler
146        }.run().await;
147        order.into_iter().map(|i| results.remove(&i).unwrap()).collect()
148    }
149
150
151}
152
153type HandlerOnce<'a, R> = Box<dyn FnOnce(Vec<R>) -> Pin<Box<dyn Future<Output = Vec<<R as Request>::Response>> + Send + 'a>> + Send + 'a>;
154type Commands<R, Output> = Arc<Mutex<BTreeMap<u64, Box<dyn _Command<R, Output>>>>>;
155type InProgress<R, Output> = Arc<Mutex<BTreeMap<u64, (Pin<Box<Running<R, Output>>>, Vec<R>, Responder<R>)>>>;
156type Responder<R> = oneshot::Sender<Vec<<R as Request>::Response>>;
157type Callback<R> = Sender<(Vec<R>, Responder<R>)>;
158type Store = Arc<TokioMutex<State>>;
159type PBFut<Output> = Pin<Box<dyn Future<Output = Output> + Send>>;
160
161struct CompilerTick<'a, R: Request, Output>{
162    commands: Commands<R, Output>,
163    running: InProgress<R, Output>,
164    store: Store,
165    handler: HandlerOnce<'a, R>,
166}
167impl<'a, R: Request, Output: Send + 'static> CompilerTick<'a, R, Output> {
168    pub fn new(
169        commands: Commands<R, Output>,
170        running: InProgress<R, Output>,
171        store: Store,
172        handler: HandlerOnce<'a, R> 
173    ) -> Self {
174        CompilerTick{commands, running, store, handler}
175    }
176
177    pub async fn run(self) -> BTreeMap<u64, Output> {
178        let mut results = BTreeMap::default();
179        let commands = std::mem::take(&mut *self.commands.lock().unwrap());
180        let store = self.store.clone();
181        for (id, c) in commands {
182            match Running::new(c, &store).await {
183                RunningResult::Ready(result) => {results.insert(id, result);},
184                RunningResult::Requesting(future, requests, responder) => {self.running.lock().unwrap().insert(id, (future, requests, responder));}
185            }
186        }
187        let running = std::mem::take(&mut *self.running.lock().unwrap());
188        let (running, batch): (BTreeMap<_, _>, Vec<_>) = running.into_iter().map(
189            |(id, (future, requests, responder))| ((id, (future, responder)), requests) 
190        ).unzip();
191        let counts: Vec<_> = batch.iter().map(|b| b.len()).collect();
192        let mut responses: VecDeque<R::Response> = (self.handler)(
193            batch.into_iter().flatten().collect()
194        ).await.into();
195        for ((id, (future, responder)), count) in running.into_iter().zip(counts) {
196            responder.send({
197                let mut r = Vec::new();
198                for _ in 0..count {
199                    r.push(responses.pop_front().unwrap());
200                }
201                r
202            }).unwrap();
203            match future.await {
204                RunningResult::Ready(result) => {results.insert(id, result);},
205                RunningResult::Requesting(future, requests, responder) => {self.running.lock().unwrap().insert(id, (future, requests, responder));}
206            }
207        }
208        results
209    }
210}
211enum RunningResult<R: Request, Output> {
212    Ready(Output),
213    Requesting(Pin<Box<Running<R, Output>>>, Vec<R>, Responder<R>)
214}
215
216struct Running<R: Request, Output>(Option<PBFut<Output>>, Option<Receiver<(Vec<R>, Responder<R>)>>);
217impl<R: Request, Output: Send + 'static> Running<R, Output> {
218    pub fn new(m: Box<dyn _Command<R, Output>>, store: &Store) -> Self {
219        let (callback, rx) = mpsc::channel(1);
220        let context = Context{store: store.clone(), callback};
221        Running(Some(Box::pin(m.run_m(context))), Some(rx))
222    }
223}
224
225impl<R: Request, Output: 'static> Future for Running<R, Output> {
226    type Output = RunningResult<R, Output>;
227    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
228        match self.0.as_mut().unwrap().as_mut().poll(cx) {
229            Poll::Ready(r) => Poll::Ready(RunningResult::Ready(r)),
230            Poll::Pending => {
231                if let Ok(tx) = self.1.as_mut().unwrap().try_recv() {
232                    Poll::Ready(RunningResult::Requesting(Box::pin(Running(self.0.take(), self.1.take())), tx.0, tx.1))
233                } else {Poll::Pending}
234            }
235        }
236    }
237}
238
239trait _Command<R: Request, Output: Send + 'static>: Send {
240    fn run_m(self: Box<Self>, ctx: Context<R>) -> PBFut<Output>;
241}
242
243impl<R: Request, C: Command<R>> Command<R> for Vec<C> {
244    type Output = Vec<C::Output>;
245    async fn run(self, ctx: Context<R>) -> Self::Output {
246        Compiler::run_in_order(
247            self.into_iter().map(|c| Box::new(c) as Box<dyn _Command<R, C::Output>>).collect(),
248            ctx.store.clone(),
249            ctx,
250        ).await
251    }
252}
253
254impl<R: Request, C: Command<R>> _Command<R, C::Output> for C {
255    fn run_m(self: Box<Self>, ctx: Context<R>) -> PBFut<C::Output> {
256        Box::pin(Command::run(*self, ctx))
257    }
258}
259
260impl<R: Request, Output: Any + Send> Command<R> for Box<dyn _Command<R, Output>> {
261    type Output = Box<dyn Any + Send>;
262    async fn run(self, ctx: Context<R>) -> Self::Output {
263        Box::new(self.run_m(ctx).await) as Box<dyn Any + Send>
264    }
265}
266
267trait AnyRequest<R: Request, Output> {fn any(self) -> Box<dyn _Command<R, Box<dyn Any + Send>>>;}
268impl<R: Request, Output: Send + 'static, M: _Command<R, Output> + 'static> AnyRequest<R, Output> for M {
269    fn any(self) -> Box<dyn _Command<R, Box<dyn Any + Send>>> {
270        Box::new(Box::new(self) as Box<dyn _Command<R, Output>>)
271    }
272}
273
274macro_rules! impl_result_tuple {
275    (
276        ($t_head:ident, $( $t:ident ),+);
277        ($tt_head:ident, $($tt:ident),+);
278        ($i_head:tt, $( $i:tt ),+)
279    ) => {
280        impl<
281            R: Request,
282            $t_head: Any + Send, $( $t: Any + Send ),+,
283            $tt_head: Command<R, Output = $t_head> + 'static, $( $tt: Command<R, Output = $t> + 'static ),+
284        > Command<R> for ($tt_head, $( $tt ),+) {
285            type Output = ($t_head, $( $t ),+);
286            async fn run(self, ctx: Context<R>) -> ($t_head, $( $t ),+) {
287                let mut results = Compiler::run_in_order(vec![
288                    $( self.$i.any() ),+, self.$i_head.any()
289                ], ctx.store.clone(), ctx).await;
290                (
291                    *results.remove(0).downcast::<$t_head>().unwrap(),
292                    $( {*results.remove(0).downcast::<$t>().unwrap()} ),+
293                )
294            }
295        }
296        impl_result_tuple!(($($t),+); ($($tt),+); ($($i),+));
297    };
298
299    (
300        ($t:ident);
301        ($tt:ident);
302        ($i:tt)
303    ) => {}
304}
305impl_result_tuple!((T0, T1, T2, T3, T4, T5, T6, T7); (M0, M1, M2, M3, M4, M5, M6, M7); (7, 6, 5, 4, 3, 2, 1, 0));