p2panda_store/logs/sqlite/
mod.rs1mod models;
4#[cfg(test)]
5mod tests;
6
7use std::collections::BTreeMap;
8
9use p2panda_core::cbor::encode_cbor;
10use p2panda_core::{Extensions, Hash, LogId, Operation, SeqNum, VerifyingKey};
11use sqlx::{query, query_as};
12
13use crate::logs::LogStore;
14use crate::logs::sqlite::models::{LogHeightRow, LogMetaRow};
15use crate::operations::OperationRow;
16use crate::sqlite::{SqliteError, SqliteStore};
17
/// SQL statement selecting the operation with the highest sequence number of one log.
///
/// Expects two positional parameters, in this order: the verifying key of the author and the
/// encoded log id. `seq_num` is cast to `NUMERIC` so that rows are ordered by numeric value
/// (presumably the column is stored as text; confirm against the schema).
const GET_LATEST_ENTRY: &str = "
    SELECT
        hash,
        header,
        body
    FROM
        operations_v1
    WHERE
        verifying_key = ?
        AND log_id = ?
    ORDER BY
        CAST(seq_num AS NUMERIC) DESC LIMIT 1
";
31
32impl<L, E> LogStore<Operation<E>, VerifyingKey, L, SeqNum, Hash> for SqliteStore
33where
34 E: Extensions,
35 L: LogId,
36{
37 type Error = SqliteError;
38
39 async fn get_latest_entry(
41 &self,
42 author: &VerifyingKey,
43 log_id: &L,
44 ) -> Result<Option<Operation<E>>, Self::Error> {
45 if let Some(latest) = query_as::<_, OperationRow>(GET_LATEST_ENTRY)
46 .bind(author.to_string())
47 .bind(
48 encode_cbor(&log_id)
49 .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
50 )
51 .fetch_optional(&self.pool)
52 .await?
53 {
54 let operation = latest.try_into()?;
55
56 Ok(Some(operation))
57 } else {
58 Ok(None)
59 }
60 }
61
62 async fn get_latest_entry_tx(
71 &self,
72 author: &VerifyingKey,
73 log_id: &L,
74 ) -> Result<Option<Operation<E>>, Self::Error> {
75 let result = self
76 .tx(async |tx| {
77 query_as::<_, OperationRow>(GET_LATEST_ENTRY)
78 .bind(author.to_string())
79 .bind(
80 encode_cbor(&log_id)
81 .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
82 )
83 .fetch_optional(&mut **tx)
84 .await
85 .map_err(SqliteError::Sqlite)
86 })
87 .await?;
88
89 if let Some(latest) = result {
90 let hash_seq_num = latest.try_into()?;
91
92 Ok(Some(hash_seq_num))
93 } else {
94 Ok(None)
95 }
96 }
97
98 async fn get_log_heights(
100 &self,
101 author: &VerifyingKey,
102 logs: &[L],
103 ) -> Result<Option<BTreeMap<L, SeqNum>>, Self::Error> {
104 let mut encoded_log_ids = Vec::new();
105 for log in logs {
106 let encoded_log_id =
107 encode_cbor(&log).map_err(|err| SqliteError::Encode("log id".to_string(), err))?;
108 encoded_log_ids.push(encoded_log_id);
109 }
110
111 let params = format!("?{}", ", ?".repeat(encoded_log_ids.len() - 1));
114 let query_str = format!(
115 "
116 SELECT
117 log_id,
118 CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
119 FROM
120 operations_v1
121 WHERE
122 verifying_key = ?
123 AND log_id IN ( {} )
124 GROUP BY
125 log_id
126 ",
127 params
128 );
129
130 let mut query = query_as::<_, LogHeightRow>(&query_str).bind(author.to_string());
131
132 for log_id in encoded_log_ids {
133 query = query.bind(log_id)
134 }
135
136 let log_heights_query = query.fetch_all(&self.pool).await?;
137
138 let log_heights = if log_heights_query.is_empty() {
139 None
140 } else {
141 let mut log_heights = BTreeMap::new();
142
143 for row in log_heights_query {
144 let (log_id, seq_num) = row.try_into()?;
145 log_heights.insert(log_id, seq_num);
146 }
147
148 Some(log_heights)
149 };
150
151 Ok(log_heights)
152 }
153
154 async fn get_log_size(
156 &self,
157 author: &VerifyingKey,
158 log_id: &L,
159 after: Option<SeqNum>,
160 until: Option<SeqNum>,
161 ) -> Result<Option<(u64, u64)>, Self::Error> {
162 let after_operator = if after.is_none() { ">=" } else { ">" };
165 let query_str = format!(
166 "
167 SELECT
168 CAST(SUM(CAST(header_size AS NUMERIC)) AS TEXT) AS total_header_bytes,
169 CAST(SUM(CAST(payload_size AS NUMERIC)) AS TEXT) AS total_payload_bytes,
170 CAST(COUNT(*) AS TEXT) AS total_operation_count
171 FROM
172 operations_v1
173 WHERE
174 verifying_key = ?
175 AND log_id = ?
176 AND CAST(seq_num AS NUMERIC) {} CAST(? as NUMERIC)
177 AND CAST(seq_num AS NUMERIC) <= CAST(? as NUMERIC)
178 ",
179 after_operator
180 );
181
182 let log_meta: Option<LogMetaRow> = query_as::<_, LogMetaRow>(&query_str)
183 .bind(author.to_string())
184 .bind(
185 encode_cbor(&log_id)
186 .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
187 )
188 .bind(after.unwrap_or(0).to_string())
189 .bind(until.unwrap_or(u64::MAX).to_string())
190 .fetch_optional(&self.pool)
191 .await?;
192
193 if let Some(row) = log_meta {
194 let (total_header_bytes, total_payload_bytes, total_operation_count) =
195 row.try_into()?;
196
197 return Ok(Some((
198 total_operation_count,
199 total_header_bytes + total_payload_bytes,
200 )));
201 }
202
203 Ok(None)
204 }
205
206 async fn get_log_entries(
208 &self,
209 author: &VerifyingKey,
210 log_id: &L,
211 after: Option<SeqNum>,
212 until: Option<SeqNum>,
213 ) -> Result<Option<Vec<(Operation<E>, Vec<u8>)>>, Self::Error> {
214 let after_operator = if after.is_none() { ">=" } else { ">" };
217
218 let query_str = format!(
219 "
220 SELECT
221 hash,
222 header,
223 body
224 FROM
225 operations_v1
226 WHERE
227 verifying_key = ?
228 AND log_id = ?
229 AND CAST(seq_num AS NUMERIC) {} CAST(? as NUMERIC)
230 AND CAST(seq_num AS NUMERIC) <= CAST(? as NUMERIC)
231 ORDER BY
232 CAST(seq_num AS NUMERIC)
233 ",
234 after_operator
235 );
236
237 let operations = query_as::<_, OperationRow>(&query_str)
238 .bind(author.to_string())
239 .bind(
240 encode_cbor(&log_id)
241 .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
242 )
243 .bind(after.unwrap_or(0).to_string())
244 .bind(until.unwrap_or(u64::MAX).to_string())
245 .fetch_all(&self.pool)
246 .await?;
247
248 let mut entries = Vec::new();
249 for operation in operations {
250 let header = operation.header.clone();
251 entries.push((operation.try_into()?, header))
252 }
253
254 if entries.is_empty() {
255 Ok(None)
256 } else {
257 Ok(Some(entries))
258 }
259 }
260
261 async fn prune_entries(
265 &self,
266 author: &VerifyingKey,
267 log_id: &L,
268 until: &SeqNum,
269 ) -> Result<u64, Self::Error> {
270 let result = query(
271 "
272 DELETE
273 FROM
274 operations_v1
275 WHERE
276 verifying_key = ?
277 AND log_id = ?
278 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
279 ",
280 )
281 .bind(author.to_string())
282 .bind(encode_cbor(&log_id).map_err(|err| SqliteError::Encode("log id".to_string(), err))?)
283 .bind(until.to_string())
284 .execute(&self.pool)
285 .await?;
286
287 let pruned_entries_num = result.rows_affected();
288
289 Ok(pruned_entries_num)
290 }
291}