p2panda_store/orderer/
sqlite.rs1use std::collections::HashSet;
4use std::fmt::Display;
5use std::hash::Hash as StdHash;
6use std::str::FromStr;
7
8use p2panda_core::Hash;
9use sqlx::{query, query_as};
10
11use crate::orderer::OrdererStore;
12#[cfg(any(test, feature = "test_utils"))]
13use crate::orderer::OrdererTestExt;
14use crate::sqlite::{DecodeError, SqliteError, SqliteStore};
15
16impl<ID> OrdererStore<ID> for SqliteStore
17where
18 ID: Eq + Ord + StdHash + Display + FromStr,
19{
20 type Error = SqliteError;
21
22 async fn mark_ready(&self, id: ID) -> Result<bool, Self::Error> {
23 self.tx(async |tx| {
24 let queue_index = {
25 let last_index: (i64,) = query_as(
26 "
27 SELECT
28 MAX(queue_index)
29 FROM
30 orderer_ready_v1
31 ",
32 )
33 .fetch_one(&mut **tx)
34 .await?;
35
36 last_index.0 + 1
38 };
39
40 let in_queue = true;
41
42 let result = query(
44 "
45 INSERT OR IGNORE
46 INTO
47 orderer_ready_v1 (
48 id,
49 queue_index,
50 in_queue
51 )
52 VALUES
53 (?, ?, ?)
54 ",
55 )
56 .bind(id.to_string())
57 .bind(queue_index)
58 .bind(in_queue)
59 .execute(&mut **tx)
60 .await?;
61
62 if result.rows_affected() == 0 {
72 let was_in_queue: (bool,) = query_as(
73 "
74 SELECT
75 in_queue
76 FROM
77 orderer_ready_v1
78 WHERE
79 id = ?
80 ",
81 )
82 .bind(id.to_string())
83 .fetch_one(&mut **tx)
84 .await?;
85
86 if was_in_queue.0 {
88 return Ok(false);
89 }
90
91 query(
93 "
94 UPDATE
95 orderer_ready_v1
96 SET
97 queue_index = ?,
98 in_queue = ?
99 WHERE
100 id = ?
101 ",
102 )
103 .bind(queue_index)
104 .bind(in_queue)
105 .bind(id.to_string())
106 .execute(&mut **tx)
107 .await?;
108
109 Ok(true)
110 } else {
111 Ok(result.rows_affected() > 0)
112 }
113 })
114 .await
115 }
116
117 async fn mark_pending(
118 &self,
119 child_id: ID,
120 mut parent_ids: Vec<ID>,
121 ) -> Result<bool, Self::Error> {
122 self.tx(async |tx| {
123 let child_id = child_id.to_string();
124
125 parent_ids.sort();
127
128 let set_digest = Hash::digest({
130 let mut buf: Vec<u8> = Vec::new();
131 buf.extend_from_slice(child_id.as_bytes());
132 for id in &parent_ids {
133 buf.extend_from_slice(id.to_string().as_bytes());
134 }
135 buf
136 })
137 .to_string();
138
139 let mut insertion_occured = false;
140
141 for id in &parent_ids {
142 let is_ready = query(
144 "
145 SELECT
146 1
147 FROM
148 orderer_ready_v1
149 WHERE
150 id = ?
151 ",
152 )
153 .bind(id.to_string())
154 .fetch_optional(&mut **tx)
155 .await?
156 .is_some();
157
158 if is_ready {
159 continue;
160 }
161
162 let id = id.to_string();
163
164 for parent_id in &parent_ids {
166 let parent_id = parent_id.to_string();
167
168 let result = query(
169 "
170 INSERT OR IGNORE
171 INTO
172 orderer_pending_v1 (
173 id,
174 child_id,
175 parent_id,
176 set_digest
177 )
178 VALUES
179 (?, ?, ?, ?)
180 ",
181 )
182 .bind(&id)
183 .bind(&child_id)
184 .bind(&parent_id)
185 .bind(&set_digest)
186 .execute(&mut **tx)
187 .await?;
188
189 if result.rows_affected() > 0 {
190 insertion_occured = true;
191 }
192 }
193 }
194
195 Ok(insertion_occured)
196 })
197 .await
198 }
199
200 async fn get_next_pending(
201 &self,
202 id: ID,
203 ) -> Result<Option<HashSet<(ID, Vec<ID>)>>, Self::Error> {
204 self.tx(async |tx| {
205 let sets: Vec<(String, String)> = query_as(
207 "
208 SELECT
209 DISTINCT child_id,
210 set_digest
211 FROM
212 orderer_pending_v1
213 WHERE
214 id = ?
215 ",
216 )
217 .bind(id.to_string())
218 .fetch_all(&mut **tx)
219 .await?;
220
221 if sets.is_empty() {
222 return Ok(None);
223 }
224
225 let mut result = HashSet::new();
226
227 for (child_id, set_digest) in sets {
229 let parent_ids: Vec<(String,)> = query_as(
230 "
231 SELECT
232 parent_id
233 FROM
234 orderer_pending_v1
235 WHERE
236 child_id = ?
237 AND set_digest = ?
238 ORDER BY
239 parent_id
240 ",
241 )
242 .bind(&child_id)
243 .bind(&set_digest)
244 .fetch_all(&mut **tx)
245 .await?;
246
247 let child_id = ID::from_str(&child_id)
248 .map_err(|_| SqliteError::Decode("child_id".into(), DecodeError::FromStr))?;
249
250 let mut dependencies = Vec::new();
251 for (parent_id,) in parent_ids {
252 let parent_id = ID::from_str(&parent_id).map_err(|_| {
253 SqliteError::Decode("parent_id".into(), DecodeError::FromStr)
254 })?;
255 dependencies.push(parent_id);
256 }
257
258 result.insert((child_id, dependencies));
259 }
260
261 Ok(Some(result))
262 })
263 .await
264 }
265
266 async fn take_next_ready(&self) -> Result<Option<ID>, Self::Error> {
267 self.tx(async |tx| {
268 let row: Option<(String,)> = query_as(
269 "
270 SELECT
271 id
272 FROM
273 orderer_ready_v1
274 WHERE
275 in_queue = TRUE
276 ORDER BY
277 queue_index ASC
278 LIMIT
279 1
280 ",
281 )
282 .fetch_optional(&mut **tx)
283 .await?;
284
285 let Some((id_str,)) = row else {
286 return Ok(None);
287 };
288
289 let id = ID::from_str(&id_str)
290 .map_err(|_| SqliteError::Decode("id".into(), DecodeError::FromStr))?;
291
292 query(
293 "
294 UPDATE
295 orderer_ready_v1
296 SET
297 in_queue = FALSE
298 WHERE
299 id = ?
300 ",
301 )
302 .bind(&id_str)
303 .execute(&mut **tx)
304 .await?;
305
306 Ok(Some(id))
307 })
308 .await
309 }
310
311 async fn remove_pending(&self, id: ID) -> Result<bool, Self::Error> {
312 self.tx(async |tx| {
313 let result = query(
314 "
315 DELETE FROM
316 orderer_pending_v1
317 WHERE
318 id = ?
319 ",
320 )
321 .bind(id.to_string())
322 .execute(&mut **tx)
323 .await?;
324
325 Ok(result.rows_affected() > 0)
326 })
327 .await
328 }
329
330 async fn ready(&self, dependencies: &[ID]) -> Result<bool, Self::Error> {
331 self.tx(async |tx| {
332 let sql = format!(
333 "
334 SELECT
335 COUNT(id)
336 FROM
337 orderer_ready_v1
338 WHERE id IN ({})
339 ",
340 dependencies
341 .iter()
342 .map(|dep| format!("'{dep}'"))
343 .collect::<Vec<String>>()
344 .join(",")
345 );
346
347 let result: (i64,) = query_as(&sql).fetch_one(&mut **tx).await?;
348 Ok(result.0 as usize == dependencies.len())
349 })
350 .await
351 }
352}
353
354#[cfg(any(test, feature = "test_utils"))]
355impl OrdererTestExt for SqliteStore {
356 async fn ready_len(&self) -> usize {
357 self.tx(async |tx| {
358 let row: (i64,) = query_as(
359 "
360 SELECT
361 COUNT(id)
362 FROM
363 orderer_ready_v1
364 ",
365 )
366 .fetch_one(&mut **tx)
367 .await?;
368 Ok(row.0 as usize)
369 })
370 .await
371 .unwrap()
372 }
373
374 async fn ready_queue_len(&self) -> usize {
375 self.tx(async |tx| {
376 let row: (i64,) = query_as(
377 "
378 SELECT
379 COUNT(id)
380 FROM
381 orderer_ready_v1
382 WHERE
383 in_queue = TRUE
384 ",
385 )
386 .fetch_one(&mut **tx)
387 .await?;
388 Ok(row.0 as usize)
389 })
390 .await
391 .unwrap()
392 }
393
394 async fn pending_len(&self) -> usize {
395 self.tx(async |tx| {
396 let row: (i64,) = query_as(
397 "
398 SELECT
399 COUNT(DISTINCT id)
400 FROM
401 orderer_pending_v1
402 ",
403 )
404 .fetch_one(&mut **tx)
405 .await?;
406 Ok(row.0 as usize)
407 })
408 .await
409 .unwrap()
410 }
411}