p2panda_store/cursors/
sqlite.rs1use p2panda_core::cbor::{decode_cbor, encode_cbor};
4use p2panda_core::identity::Author;
5use p2panda_core::{Cursor, LogId};
6use sqlx::{query, query_scalar};
7
8use crate::cursors::CursorStore;
9use crate::{SqliteError, SqliteStore};
10
11impl<A, L> CursorStore<A, L> for SqliteStore
12where
13 A: Author,
14 L: LogId,
15{
16 type Error = SqliteError;
17
18 async fn get_cursor(&self, name: impl AsRef<str>) -> Result<Option<Cursor<A, L>>, Self::Error> {
20 let state_bytes: Option<Vec<u8>> = self
21 .execute(async |pool| {
22 query_scalar(
23 "
24 SELECT
25 cursor
26 FROM
27 cursors_v1
28 WHERE
29 name = ?
30 ",
31 )
32 .bind(name.as_ref())
33 .fetch_optional(pool)
34 .await
35 .map_err(SqliteError::Sqlite)
36 })
37 .await?;
38
39 if let Some(bytes) = state_bytes {
40 let cursor: Cursor<A, L> = decode_cbor(&bytes[..])
41 .map_err(|err| SqliteError::Decode("cursor".into(), err.into()))?;
42 Ok(Some(cursor))
43 } else {
44 Ok(None)
45 }
46 }
47
48 async fn set_cursor(&self, cursor: &Cursor<A, L>) -> Result<(), Self::Error> {
50 self.tx(async |tx| {
51 query(
52 "
53 INSERT
54 INTO
55 cursors_v1(name, cursor)
56 VALUES
57 (?, ?)
58 ON CONFLICT(name)
59 DO UPDATE
60 SET
61 cursor = EXCLUDED.cursor
62 ",
63 )
64 .bind(cursor.name())
65 .bind(
66 encode_cbor(&cursor)
67 .map_err(|err| SqliteError::Encode("cursor".to_string(), err))?,
68 )
69 .execute(&mut **tx)
70 .await
71 .map_err(SqliteError::Sqlite)
72 })
73 .await?;
74
75 Ok(())
76 }
77
78 async fn delete_cursor(&self, name: impl AsRef<str>) -> Result<(), Self::Error> {
80 self.tx(async |tx| {
81 query(
82 "
83 DELETE FROM
84 cursors_v1
85 WHERE
86 name = ?
87 ",
88 )
89 .bind(name.as_ref())
90 .execute(&mut **tx)
91 .await
92 .map_err(SqliteError::Sqlite)
93 })
94 .await?;
95
96 Ok(())
97 }
98}