Skip to main content

p2panda_store/cursors/
sqlite.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use p2panda_core::cbor::{decode_cbor, encode_cbor};
4use p2panda_core::identity::Author;
5use p2panda_core::{Cursor, LogId};
6use sqlx::{query, query_scalar};
7
8use crate::cursors::CursorStore;
9use crate::{SqliteError, SqliteStore};
10
11impl<A, L> CursorStore<A, L> for SqliteStore
12where
13    A: Author,
14    L: LogId,
15{
16    type Error = SqliteError;
17
18    /// Returns the cursor matching the given name from the database.
19    async fn get_cursor(&self, name: impl AsRef<str>) -> Result<Option<Cursor<A, L>>, Self::Error> {
20        let state_bytes: Option<Vec<u8>> = self
21            .execute(async |pool| {
22                query_scalar(
23                    "
24                    SELECT
25                        cursor
26                    FROM
27                        cursors_v1
28                    WHERE
29                        name = ?
30                    ",
31                )
32                .bind(name.as_ref())
33                .fetch_optional(pool)
34                .await
35                .map_err(SqliteError::Sqlite)
36            })
37            .await?;
38
39        if let Some(bytes) = state_bytes {
40            let cursor: Cursor<A, L> = decode_cbor(&bytes[..])
41                .map_err(|err| SqliteError::Decode("cursor".into(), err.into()))?;
42            Ok(Some(cursor))
43        } else {
44            Ok(None)
45        }
46    }
47
48    /// Inserts the given cursor into the database.
49    async fn set_cursor(&self, cursor: &Cursor<A, L>) -> Result<(), Self::Error> {
50        self.tx(async |tx| {
51            query(
52                "
53                INSERT
54                    INTO
55                        cursors_v1(name, cursor)
56                    VALUES
57                        (?, ?)
58                ON CONFLICT(name)
59                DO UPDATE
60                    SET
61                        cursor = EXCLUDED.cursor
62                ",
63            )
64            .bind(cursor.name())
65            .bind(
66                encode_cbor(&cursor)
67                    .map_err(|err| SqliteError::Encode("cursor".to_string(), err))?,
68            )
69            .execute(&mut **tx)
70            .await
71            .map_err(SqliteError::Sqlite)
72        })
73        .await?;
74
75        Ok(())
76    }
77
78    /// Deletes the cursor matching the given name from the database.
79    async fn delete_cursor(&self, name: impl AsRef<str>) -> Result<(), Self::Error> {
80        self.tx(async |tx| {
81            query(
82                "
83                DELETE FROM
84                    cursors_v1
85                WHERE
86                    name = ?
87                ",
88            )
89            .bind(name.as_ref())
90            .execute(&mut **tx)
91            .await
92            .map_err(SqliteError::Sqlite)
93        })
94        .await?;
95
96        Ok(())
97    }
98}