lolraft/process/
query_queue.rs

1use super::*;
2
3use std::collections::BTreeMap;
4
5pub struct Query {
6    pub message: Bytes,
7    pub user_completion: completion::UserCompletion,
8}
9
10pub struct Inner {
11    app: Ref<App>,
12    q: spin::Mutex<Impl>,
13}
14
15#[derive(shrinkwraprs::Shrinkwrap, Clone)]
16pub struct QueryQueue(Arc<Inner>);
17impl QueryQueue {
18    pub fn new(app: Ref<App>) -> Self {
19        let inner = Inner {
20            app,
21            q: Impl::new().into(),
22        };
23        Self(inner.into())
24    }
25}
26
27impl QueryQueue {
28    /// Register a query to be executed when the read index reaches `read_index`.
29    /// `read_index` is the index of the commit pointer of when the query is submitted.
30    pub fn register(&self, read_index: Index, query: Query) {
31        let mut q = self.q.lock();
32        q.register(read_index, query);
33    }
34
35    /// Execute awaiting queries whose `read_index` ∈ `[, index]` in parallel.
36    pub fn execute(&self, index: Index) -> bool {
37        let mut q = self.q.lock();
38        q.execute(index, &self.app)
39    }
40}
41
42pub struct Impl {
43    reserved: BTreeMap<Index, Vec<Query>>,
44}
45
46impl Impl {
47    fn new() -> Self {
48        Self {
49            reserved: BTreeMap::new(),
50        }
51    }
52
53    fn register(&mut self, read_index: Index, query: Query) {
54        self.reserved
55            .entry(read_index)
56            .or_insert(vec![])
57            .push(query);
58    }
59
60    fn execute(&mut self, index: Index, app: &App) -> bool {
61        let queries = {
62            let mut out = vec![];
63            let tgt_indexes: Vec<Index> = self.reserved.range(..=index).map(|(k, _)| *k).collect();
64            for idx in tgt_indexes {
65                if let Some(queries) = self.reserved.remove(&idx) {
66                    for query in queries {
67                        out.push((query, app.clone()));
68                    }
69                }
70            }
71            out
72        };
73
74        if queries.is_empty() {
75            return false;
76        }
77
78        let futs = queries.into_iter().map(
79            |(
80                Query {
81                    message,
82                    user_completion,
83                },
84                app,
85            )| async move {
86                // The `completion` of the failed queries are dropped
87                // which just results in failing on the client side.
88                if let Ok(resp) = app.process_read(&message).await {
89                    user_completion.complete_with(resp).ok();
90                }
91            },
92        );
93
94        for fut in futs {
95            tokio::spawn(fut);
96        }
97
98        true
99    }
100}