lolraft/process/
query_queue.rs1use super::*;
2
3use std::collections::BTreeMap;
4
5pub struct Query {
6 pub message: Bytes,
7 pub user_completion: completion::UserCompletion,
8}
9
10pub struct Inner {
11 app: Ref<App>,
12 q: spin::Mutex<Impl>,
13}
14
15#[derive(shrinkwraprs::Shrinkwrap, Clone)]
16pub struct QueryQueue(Arc<Inner>);
17impl QueryQueue {
18 pub fn new(app: Ref<App>) -> Self {
19 let inner = Inner {
20 app,
21 q: Impl::new().into(),
22 };
23 Self(inner.into())
24 }
25}
26
27impl QueryQueue {
28 pub fn register(&self, read_index: Index, query: Query) {
31 let mut q = self.q.lock();
32 q.register(read_index, query);
33 }
34
35 pub fn execute(&self, index: Index) -> bool {
37 let mut q = self.q.lock();
38 q.execute(index, &self.app)
39 }
40}
41
42pub struct Impl {
43 reserved: BTreeMap<Index, Vec<Query>>,
44}
45
46impl Impl {
47 fn new() -> Self {
48 Self {
49 reserved: BTreeMap::new(),
50 }
51 }
52
53 fn register(&mut self, read_index: Index, query: Query) {
54 self.reserved
55 .entry(read_index)
56 .or_insert(vec![])
57 .push(query);
58 }
59
60 fn execute(&mut self, index: Index, app: &App) -> bool {
61 let queries = {
62 let mut out = vec![];
63 let tgt_indexes: Vec<Index> = self.reserved.range(..=index).map(|(k, _)| *k).collect();
64 for idx in tgt_indexes {
65 if let Some(queries) = self.reserved.remove(&idx) {
66 for query in queries {
67 out.push((query, app.clone()));
68 }
69 }
70 }
71 out
72 };
73
74 if queries.is_empty() {
75 return false;
76 }
77
78 let futs = queries.into_iter().map(
79 |(
80 Query {
81 message,
82 user_completion,
83 },
84 app,
85 )| async move {
86 if let Ok(resp) = app.process_read(&message).await {
89 user_completion.complete_with(resp).ok();
90 }
91 },
92 );
93
94 for fut in futs {
95 tokio::spawn(fut);
96 }
97
98 true
99 }
100}