Skip to main content

p2panda_store/operations/
sqlite.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use p2panda_core::cbor::{decode_cbor, encode_cbor};
4use p2panda_core::hash::{Hash, HashError};
5use p2panda_core::{Extensions, LogId, Operation};
6use sqlx::{FromRow, query, query_as};
7
8use crate::operations::OperationStore;
9use crate::sqlite::{SqliteError, SqliteStore};
10
11const GET_OPERATION: &str = "
12    SELECT
13        hash,
14        header,
15        body
16    FROM
17        operations_v1
18    WHERE
19        hash = ?
20";
21
22const HAS_OPERATION: &str = "
23    SELECT
24        1
25    FROM
26        operations_v1
27    WHERE
28        hash = ?
29";
30
31impl<E, L> OperationStore<Operation<E>, Hash, L> for SqliteStore
32where
33    E: Extensions,
34    L: LogId,
35{
36    type Error = SqliteError;
37
38    async fn insert_operation(
39        &self,
40        id: &Hash,
41        operation: &Operation<E>,
42        log_id: &L,
43    ) -> Result<bool, Self::Error> {
44        let result = self
45            .tx(async |tx| {
46                query(
47                    "
48                    INSERT OR IGNORE
49                    INTO
50                        operations_v1 (
51                            hash,
52                            log_id,
53                            version,
54                            verifying_key,
55                            signature,
56                            payload_size,
57                            payload_hash,
58                            timestamp,
59                            seq_num,
60                            header,
61                            header_size,
62                            body
63                        )
64                    VALUES
65                        (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
66                    ",
67                )
68                .bind(id.to_hex())
69                .bind(
70                    encode_cbor(&log_id)
71                        .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
72                )
73                .bind(operation.header.version.to_string())
74                .bind(operation.header.verifying_key.to_hex())
75                .bind(operation.header.signature.map(|sig| sig.to_hex()))
76                .bind(operation.header.payload_size.to_string())
77                .bind(operation.header.payload_hash.map(|hash| hash.to_hex()))
78                .bind(operation.header.timestamp.to_string())
79                .bind(operation.header.seq_num.to_string())
80                .bind(
81                    encode_cbor(&operation.header)
82                        .map_err(|err| SqliteError::Encode("header".to_string(), err))?,
83                )
84                .bind(operation.header.to_bytes().len().to_string())
85                .bind(operation.body().map(|body| body.to_bytes()))
86                .execute(&mut **tx)
87                .await
88                .map_err(SqliteError::Sqlite)
89            })
90            .await?;
91
92        Ok(result.rows_affected() > 0)
93    }
94
95    async fn get_operation(&self, id: &Hash) -> Result<Option<Operation<E>>, Self::Error> {
96        let result = self
97            .execute(async |pool| {
98                query_as::<_, OperationRow>(GET_OPERATION)
99                    .bind(id.to_hex())
100                    .fetch_optional(pool)
101                    .await
102                    .map_err(SqliteError::Sqlite)
103            })
104            .await?;
105
106        match result {
107            Some(row) => Ok(Some(row.try_into()?)),
108            None => Ok(None),
109        }
110    }
111
112    // TODO: In the future we may be able to remove this `_tx` variant of the query by instead
113    // requiring that API users exlicitly handle transactions themselves.
114    //
115    // See: https://github.com/p2panda/p2panda/issues/1065
116    async fn get_operation_tx(&self, id: &Hash) -> Result<Option<Operation<E>>, Self::Error> {
117        let result = self
118            .tx(async |tx| {
119                query_as::<_, OperationRow>(GET_OPERATION)
120                    .bind(id.to_hex())
121                    .fetch_optional(&mut **tx)
122                    .await
123                    .map_err(SqliteError::Sqlite)
124            })
125            .await?;
126
127        match result {
128            Some(row) => Ok(Some(row.try_into()?)),
129            None => Ok(None),
130        }
131    }
132
133    async fn has_operation(&self, id: &Hash) -> Result<bool, Self::Error> {
134        let result = self
135            .execute(async |pool| {
136                query(HAS_OPERATION)
137                    .bind(id.to_hex())
138                    .fetch_optional(pool)
139                    .await
140                    .map_err(SqliteError::Sqlite)
141            })
142            .await?;
143
144        Ok(result.is_some())
145    }
146
147    async fn has_operation_tx(&self, id: &Hash) -> Result<bool, Self::Error> {
148        let result = self
149            .tx(async |tx| {
150                query(HAS_OPERATION)
151                    .bind(id.to_hex())
152                    .fetch_optional(&mut **tx)
153                    .await
154                    .map_err(SqliteError::Sqlite)
155            })
156            .await?;
157
158        Ok(result.is_some())
159    }
160
161    async fn delete_operation(&self, id: &Hash) -> Result<bool, Self::Error> {
162        let result = self
163            .tx(async |tx| {
164                query(
165                    "
166                    DELETE FROM
167                        operations_v1
168                    WHERE
169                        hash = ?
170                    ",
171                )
172                .bind(id.to_hex())
173                .execute(&mut **tx)
174                .await
175                .map_err(SqliteError::Sqlite)
176            })
177            .await?;
178
179        Ok(result.rows_affected() > 0)
180    }
181
182    async fn delete_operation_payload(&self, id: &Hash) -> Result<bool, Self::Error> {
183        let result = query(
184            "
185            UPDATE
186                operations_v1
187            SET
188                body = NULL
189            WHERE
190                operations_v1.hash = ?
191            ",
192        )
193        .bind(id.to_hex())
194        .execute(&self.pool)
195        .await?;
196
197        Ok(result.rows_affected() > 0)
198    }
199}
200
201/// Single operation row as it is inserted in the SQLite database.
202#[derive(Clone, Debug, FromRow)]
203pub(crate) struct OperationRow {
204    hash: String,
205    pub(crate) header: Vec<u8>,
206    body: Option<Vec<u8>>,
207}
208
209impl<E> TryFrom<OperationRow> for Operation<E>
210where
211    E: Extensions,
212{
213    type Error = SqliteError;
214
215    fn try_from(row: OperationRow) -> Result<Self, Self::Error> {
216        Ok(Operation {
217            hash: row
218                .hash
219                .parse()
220                .map_err(|err: HashError| SqliteError::Decode("hash".to_string(), err.into()))?,
221            header: decode_cbor(&row.header[..])
222                .map_err(|err| SqliteError::Decode("header".into(), err.into()))?,
223            body: row.body.map(|body| body.into()),
224        })
225    }
226}