Skip to main content

p2panda_store/orderer/
sqlite.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::HashSet;
4use std::fmt::Display;
5use std::hash::Hash as StdHash;
6use std::str::FromStr;
7
8use p2panda_core::Hash;
9use sqlx::{query, query_as};
10
11use crate::orderer::OrdererStore;
12#[cfg(any(test, feature = "test_utils"))]
13use crate::orderer::OrdererTestExt;
14use crate::sqlite::{DecodeError, SqliteError, SqliteStore};
15
16impl<ID> OrdererStore<ID> for SqliteStore
17where
18    ID: Eq + Ord + StdHash + Display + FromStr,
19{
20    type Error = SqliteError;
21
22    async fn mark_ready(&self, id: ID) -> Result<bool, Self::Error> {
23        self.tx(async |tx| {
24            let queue_index = {
25                let last_index: (i64,) = query_as(
26                    "
27                    SELECT
28                        MAX(queue_index)
29                    FROM
30                        orderer_ready_v1
31                    ",
32                )
33                .fetch_one(&mut **tx)
34                .await?;
35
36                // This returns "0" (default) if no rows are in the database.
37                last_index.0 + 1
38            };
39
40            let in_queue = true;
41
42            // Ignore insertion when hash already exists (UNIQUE constraint).
43            let result = query(
44                "
45                INSERT OR IGNORE
46                INTO
47                    orderer_ready_v1 (
48                        id,
49                        queue_index,
50                        in_queue
51                    )
52                VALUES
53                    (?, ?, ?)
54                ",
55            )
56            .bind(id.to_string())
57            .bind(queue_index)
58            .bind(in_queue)
59            .execute(&mut **tx)
60            .await?;
61
62            // If no rows have been affected by this INSERT we know the item already exists in the
63            // "ready" table.
64            //
65            // This means that we've tried to mark an already existing item as ready _again_ and
66            // can happen when an item got re-processed by the orderer.
67            //
68            // Since we want the system to always behave the same and allow idempotency (and not
69            // "swallow" items when they got re-processed), we check if this "ready" item is still
70            // in the queue. If not, we re-queue it.
71            if result.rows_affected() == 0 {
72                let was_in_queue: (bool,) = query_as(
73                    "
74                    SELECT
75                        in_queue
76                    FROM
77                        orderer_ready_v1
78                    WHERE
79                        id = ?
80                    ",
81                )
82                .bind(id.to_string())
83                .fetch_one(&mut **tx)
84                .await?;
85
86                // Do nothing when item is still in queue (waiting to be picked up).
87                if was_in_queue.0 {
88                    return Ok(false);
89                }
90
91                // Re-queue item otherwise with new queue index.
92                query(
93                    "
94                    UPDATE
95                        orderer_ready_v1
96                    SET
97                        queue_index = ?,
98                        in_queue = ?
99                    WHERE
100                        id = ?
101                    ",
102                )
103                .bind(queue_index)
104                .bind(in_queue)
105                .bind(id.to_string())
106                .execute(&mut **tx)
107                .await?;
108
109                Ok(true)
110            } else {
111                Ok(result.rows_affected() > 0)
112            }
113        })
114        .await
115    }
116
117    async fn mark_pending(
118        &self,
119        child_id: ID,
120        mut parent_ids: Vec<ID>,
121    ) -> Result<bool, Self::Error> {
122        self.tx(async |tx| {
123            let child_id = child_id.to_string();
124
125            // Make hashing digest deterministic by sorting array first.
126            parent_ids.sort();
127
128            // Derive a hash from id (child) and all it's dependencies (parents).
129            let set_digest = Hash::digest({
130                let mut buf: Vec<u8> = Vec::new();
131                buf.extend_from_slice(child_id.as_bytes());
132                for id in &parent_ids {
133                    buf.extend_from_slice(id.to_string().as_bytes());
134                }
135                buf
136            })
137            .to_string();
138
139            let mut insertion_occured = false;
140
141            for id in &parent_ids {
142                // Ignore items which are already marked as "ready".
143                let is_ready = query(
144                    "
145                    SELECT
146                        1
147                    FROM
148                        orderer_ready_v1
149                    WHERE
150                        id = ?
151                    ",
152                )
153                .bind(id.to_string())
154                .fetch_optional(&mut **tx)
155                .await?
156                .is_some();
157
158                if is_ready {
159                    continue;
160                }
161
162                let id = id.to_string();
163
164                // Insert all dependencies for this non-ready dependency key with a set digest.
165                for parent_id in &parent_ids {
166                    let parent_id = parent_id.to_string();
167
168                    let result = query(
169                        "
170                        INSERT OR IGNORE
171                        INTO
172                            orderer_pending_v1 (
173                                id,
174                                child_id,
175                                parent_id,
176                                set_digest
177                            )
178                        VALUES
179                            (?, ?, ?, ?)
180                        ",
181                    )
182                    .bind(&id)
183                    .bind(&child_id)
184                    .bind(&parent_id)
185                    .bind(&set_digest)
186                    .execute(&mut **tx)
187                    .await?;
188
189                    if result.rows_affected() > 0 {
190                        insertion_occured = true;
191                    }
192                }
193            }
194
195            Ok(insertion_occured)
196        })
197        .await
198    }
199
200    async fn get_next_pending(
201        &self,
202        id: ID,
203    ) -> Result<Option<HashSet<(ID, Vec<ID>)>>, Self::Error> {
204        self.tx(async |tx| {
205            // Find all unique (child_id, set_digest) combinations that depend on the given id.
206            let sets: Vec<(String, String)> = query_as(
207                "
208                SELECT
209                    DISTINCT child_id,
210                    set_digest
211                FROM
212                    orderer_pending_v1
213                WHERE
214                    id = ?
215                ",
216            )
217            .bind(id.to_string())
218            .fetch_all(&mut **tx)
219            .await?;
220
221            if sets.is_empty() {
222                return Ok(None);
223            }
224
225            let mut result = HashSet::new();
226
227            // For each set, get the complete original dependency list.
228            for (child_id, set_digest) in sets {
229                let parent_ids: Vec<(String,)> = query_as(
230                    "
231                    SELECT
232                        parent_id
233                    FROM
234                        orderer_pending_v1
235                    WHERE
236                        child_id = ?
237                        AND set_digest = ?
238                    ORDER BY
239                        parent_id
240                    ",
241                )
242                .bind(&child_id)
243                .bind(&set_digest)
244                .fetch_all(&mut **tx)
245                .await?;
246
247                let child_id = ID::from_str(&child_id)
248                    .map_err(|_| SqliteError::Decode("child_id".into(), DecodeError::FromStr))?;
249
250                let mut dependencies = Vec::new();
251                for (parent_id,) in parent_ids {
252                    let parent_id = ID::from_str(&parent_id).map_err(|_| {
253                        SqliteError::Decode("parent_id".into(), DecodeError::FromStr)
254                    })?;
255                    dependencies.push(parent_id);
256                }
257
258                result.insert((child_id, dependencies));
259            }
260
261            Ok(Some(result))
262        })
263        .await
264    }
265
266    async fn take_next_ready(&self) -> Result<Option<ID>, Self::Error> {
267        self.tx(async |tx| {
268            let row: Option<(String,)> = query_as(
269                "
270                SELECT
271                    id
272                FROM
273                    orderer_ready_v1
274                WHERE
275                    in_queue = TRUE
276                ORDER BY
277                    queue_index ASC
278                LIMIT
279                    1
280                ",
281            )
282            .fetch_optional(&mut **tx)
283            .await?;
284
285            let Some((id_str,)) = row else {
286                return Ok(None);
287            };
288
289            let id = ID::from_str(&id_str)
290                .map_err(|_| SqliteError::Decode("id".into(), DecodeError::FromStr))?;
291
292            query(
293                "
294                UPDATE
295                    orderer_ready_v1
296                SET
297                    in_queue = FALSE
298                WHERE
299                    id = ?
300                ",
301            )
302            .bind(&id_str)
303            .execute(&mut **tx)
304            .await?;
305
306            Ok(Some(id))
307        })
308        .await
309    }
310
311    async fn remove_pending(&self, id: ID) -> Result<bool, Self::Error> {
312        self.tx(async |tx| {
313            let result = query(
314                "
315                DELETE FROM
316                    orderer_pending_v1
317                WHERE
318                    id = ?
319                ",
320            )
321            .bind(id.to_string())
322            .execute(&mut **tx)
323            .await?;
324
325            Ok(result.rows_affected() > 0)
326        })
327        .await
328    }
329
330    async fn ready(&self, dependencies: &[ID]) -> Result<bool, Self::Error> {
331        self.tx(async |tx| {
332            let sql = format!(
333                "
334                SELECT
335                    COUNT(id)
336                FROM
337                    orderer_ready_v1
338                WHERE id IN ({})
339                ",
340                dependencies
341                    .iter()
342                    .map(|dep| format!("'{dep}'"))
343                    .collect::<Vec<String>>()
344                    .join(",")
345            );
346
347            let result: (i64,) = query_as(&sql).fetch_one(&mut **tx).await?;
348            Ok(result.0 as usize == dependencies.len())
349        })
350        .await
351    }
352}
353
#[cfg(any(test, feature = "test_utils"))]
impl OrdererTestExt for SqliteStore {
    /// Number of rows in the "ready" table, whether still queued or not.
    ///
    /// # Panics
    ///
    /// Panics if the query fails.
    async fn ready_len(&self) -> usize {
        self.tx(async |tx| {
            let (total,): (i64,) = query_as(
                "
                SELECT
                    COUNT(id)
                FROM
                    orderer_ready_v1
                ",
            )
            .fetch_one(&mut **tx)
            .await?;

            Ok(total as usize)
        })
        .await
        .unwrap()
    }

    /// Number of "ready" rows which are still flagged as queued.
    ///
    /// # Panics
    ///
    /// Panics if the query fails.
    async fn ready_queue_len(&self) -> usize {
        self.tx(async |tx| {
            let (total,): (i64,) = query_as(
                "
                SELECT
                    COUNT(id)
                FROM
                    orderer_ready_v1
                WHERE
                    in_queue = TRUE
                ",
            )
            .fetch_one(&mut **tx)
            .await?;

            Ok(total as usize)
        })
        .await
        .unwrap()
    }

    /// Number of distinct ids that have entries in the "pending" table.
    ///
    /// # Panics
    ///
    /// Panics if the query fails.
    async fn pending_len(&self) -> usize {
        self.tx(async |tx| {
            let (total,): (i64,) = query_as(
                "
                SELECT
                    COUNT(DISTINCT id)
                FROM
                    orderer_pending_v1
                ",
            )
            .fetch_one(&mut **tx)
            .await?;

            Ok(total as usize)
        })
        .await
        .unwrap()
    }
}