p2panda_store/operations/
sqlite.rs1use p2panda_core::cbor::{decode_cbor, encode_cbor};
4use p2panda_core::hash::{Hash, HashError};
5use p2panda_core::{Extensions, LogId, Operation};
6use sqlx::{FromRow, query, query_as};
7
8use crate::operations::OperationStore;
9use crate::sqlite::{SqliteError, SqliteStore};
10
/// SQL statement selecting the `hash`, `header` and `body` columns of the `operations_v1` row
/// whose `hash` equals the single bound parameter.
///
/// Used by both `get_operation` and `get_operation_tx` so the two lookups cannot drift apart.
const GET_OPERATION: &str = "
    SELECT
        hash,
        header,
        body
    FROM
        operations_v1
    WHERE
        hash = ?
";
21
/// SQL statement yielding a row (the constant `1`) for every `operations_v1` entry whose `hash`
/// equals the single bound parameter.
///
/// Used by both `has_operation` and `has_operation_tx`, which only check whether any row came
/// back.
const HAS_OPERATION: &str = "
    SELECT
        1
    FROM
        operations_v1
    WHERE
        hash = ?
";
30
31impl<E, L> OperationStore<Operation<E>, Hash, L> for SqliteStore
32where
33 E: Extensions,
34 L: LogId,
35{
36 type Error = SqliteError;
37
38 async fn insert_operation(
39 &self,
40 id: &Hash,
41 operation: &Operation<E>,
42 log_id: &L,
43 ) -> Result<bool, Self::Error> {
44 let result = self
45 .tx(async |tx| {
46 query(
47 "
48 INSERT OR IGNORE
49 INTO
50 operations_v1 (
51 hash,
52 log_id,
53 version,
54 verifying_key,
55 signature,
56 payload_size,
57 payload_hash,
58 timestamp,
59 seq_num,
60 header,
61 header_size,
62 body
63 )
64 VALUES
65 (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
66 ",
67 )
68 .bind(id.to_hex())
69 .bind(
70 encode_cbor(&log_id)
71 .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
72 )
73 .bind(operation.header.version.to_string())
74 .bind(operation.header.verifying_key.to_hex())
75 .bind(operation.header.signature.map(|sig| sig.to_hex()))
76 .bind(operation.header.payload_size.to_string())
77 .bind(operation.header.payload_hash.map(|hash| hash.to_hex()))
78 .bind(operation.header.timestamp.to_string())
79 .bind(operation.header.seq_num.to_string())
80 .bind(
81 encode_cbor(&operation.header)
82 .map_err(|err| SqliteError::Encode("header".to_string(), err))?,
83 )
84 .bind(operation.header.to_bytes().len().to_string())
85 .bind(operation.body().map(|body| body.to_bytes()))
86 .execute(&mut **tx)
87 .await
88 .map_err(SqliteError::Sqlite)
89 })
90 .await?;
91
92 Ok(result.rows_affected() > 0)
93 }
94
95 async fn get_operation(&self, id: &Hash) -> Result<Option<Operation<E>>, Self::Error> {
96 let result = self
97 .execute(async |pool| {
98 query_as::<_, OperationRow>(GET_OPERATION)
99 .bind(id.to_hex())
100 .fetch_optional(pool)
101 .await
102 .map_err(SqliteError::Sqlite)
103 })
104 .await?;
105
106 match result {
107 Some(row) => Ok(Some(row.try_into()?)),
108 None => Ok(None),
109 }
110 }
111
112 async fn get_operation_tx(&self, id: &Hash) -> Result<Option<Operation<E>>, Self::Error> {
117 let result = self
118 .tx(async |tx| {
119 query_as::<_, OperationRow>(GET_OPERATION)
120 .bind(id.to_hex())
121 .fetch_optional(&mut **tx)
122 .await
123 .map_err(SqliteError::Sqlite)
124 })
125 .await?;
126
127 match result {
128 Some(row) => Ok(Some(row.try_into()?)),
129 None => Ok(None),
130 }
131 }
132
133 async fn has_operation(&self, id: &Hash) -> Result<bool, Self::Error> {
134 let result = self
135 .execute(async |pool| {
136 query(HAS_OPERATION)
137 .bind(id.to_hex())
138 .fetch_optional(pool)
139 .await
140 .map_err(SqliteError::Sqlite)
141 })
142 .await?;
143
144 Ok(result.is_some())
145 }
146
147 async fn has_operation_tx(&self, id: &Hash) -> Result<bool, Self::Error> {
148 let result = self
149 .tx(async |tx| {
150 query(HAS_OPERATION)
151 .bind(id.to_hex())
152 .fetch_optional(&mut **tx)
153 .await
154 .map_err(SqliteError::Sqlite)
155 })
156 .await?;
157
158 Ok(result.is_some())
159 }
160
161 async fn delete_operation(&self, id: &Hash) -> Result<bool, Self::Error> {
162 let result = self
163 .tx(async |tx| {
164 query(
165 "
166 DELETE FROM
167 operations_v1
168 WHERE
169 hash = ?
170 ",
171 )
172 .bind(id.to_hex())
173 .execute(&mut **tx)
174 .await
175 .map_err(SqliteError::Sqlite)
176 })
177 .await?;
178
179 Ok(result.rows_affected() > 0)
180 }
181
182 async fn delete_operation_payload(&self, id: &Hash) -> Result<bool, Self::Error> {
183 let result = query(
184 "
185 UPDATE
186 operations_v1
187 SET
188 body = NULL
189 WHERE
190 operations_v1.hash = ?
191 ",
192 )
193 .bind(id.to_hex())
194 .execute(&self.pool)
195 .await?;
196
197 Ok(result.rows_affected() > 0)
198 }
199}
200
/// Raw row shape produced by the `GET_OPERATION` query on `operations_v1`, before it is decoded
/// into an `Operation`.
#[derive(Clone, Debug, FromRow)]
pub(crate) struct OperationRow {
    /// Operation hash as text; written by `insert_operation` as `id.to_hex()` and parsed back
    /// when converting the row.
    hash: String,
    /// CBOR-encoded header bytes.
    pub(crate) header: Vec<u8>,
    /// Body bytes; `None` when the column is `NULL` (no body was stored or it was cleared by
    /// `delete_operation_payload`).
    body: Option<Vec<u8>>,
}
208
209impl<E> TryFrom<OperationRow> for Operation<E>
210where
211 E: Extensions,
212{
213 type Error = SqliteError;
214
215 fn try_from(row: OperationRow) -> Result<Self, Self::Error> {
216 Ok(Operation {
217 hash: row
218 .hash
219 .parse()
220 .map_err(|err: HashError| SqliteError::Decode("hash".to_string(), err.into()))?,
221 header: decode_cbor(&row.header[..])
222 .map_err(|err| SqliteError::Decode("header".into(), err.into()))?,
223 body: row.body.map(|body| body.into()),
224 })
225 }
226}