Skip to main content

p2panda_store/logs/sqlite/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3mod models;
4#[cfg(test)]
5mod tests;
6
7use std::collections::BTreeMap;
8
9use p2panda_core::cbor::encode_cbor;
10use p2panda_core::{Extensions, Hash, LogId, Operation, SeqNum, VerifyingKey};
11use sqlx::{query, query_as};
12
13use crate::logs::LogStore;
14use crate::logs::sqlite::models::{LogHeightRow, LogMetaRow};
15use crate::operations::OperationRow;
16use crate::sqlite::{SqliteError, SqliteStore};
17
18const GET_LATEST_ENTRY: &str = "
19    SELECT
20        hash,
21        header,
22        body
23    FROM
24        operations_v1
25    WHERE
26        verifying_key = ?
27        AND log_id = ?
28    ORDER BY
29        CAST(seq_num AS NUMERIC) DESC LIMIT 1
30";
31
32impl<L, E> LogStore<Operation<E>, VerifyingKey, L, SeqNum, Hash> for SqliteStore
33where
34    E: Extensions,
35    L: LogId,
36{
37    type Error = SqliteError;
38
39    /// Retrieve the latest entry in an author's log.
40    async fn get_latest_entry(
41        &self,
42        author: &VerifyingKey,
43        log_id: &L,
44    ) -> Result<Option<Operation<E>>, Self::Error> {
45        if let Some(latest) = query_as::<_, OperationRow>(GET_LATEST_ENTRY)
46            .bind(author.to_string())
47            .bind(
48                encode_cbor(&log_id)
49                    .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
50            )
51            .fetch_optional(&self.pool)
52            .await?
53        {
54            let operation = latest.try_into()?;
55
56            Ok(Some(operation))
57        } else {
58            Ok(None)
59        }
60    }
61
62    /// Retrieve the latest entry in an author's log.
63    ///
64    /// This variant of the method is intended to be used in situations where atomicity of database
65    /// operations is needed. It requires a transaction context with an acquired permit.
66    // TODO: In the future we may be able to remove this `_tx` variant of the query by instead
67    // requiring that API users exlicitly handle transactions themselves.
68    //
69    // See: https://github.com/p2panda/p2panda/issues/1065
70    async fn get_latest_entry_tx(
71        &self,
72        author: &VerifyingKey,
73        log_id: &L,
74    ) -> Result<Option<Operation<E>>, Self::Error> {
75        let result = self
76            .tx(async |tx| {
77                query_as::<_, OperationRow>(GET_LATEST_ENTRY)
78                    .bind(author.to_string())
79                    .bind(
80                        encode_cbor(&log_id)
81                            .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
82                    )
83                    .fetch_optional(&mut **tx)
84                    .await
85                    .map_err(SqliteError::Sqlite)
86            })
87            .await?;
88
89        if let Some(latest) = result {
90            let hash_seq_num = latest.try_into()?;
91
92            Ok(Some(hash_seq_num))
93        } else {
94            Ok(None)
95        }
96    }
97
98    /// Retrieve the latest sequence number for a set of author's logs.
99    async fn get_log_heights(
100        &self,
101        author: &VerifyingKey,
102        logs: &[L],
103    ) -> Result<Option<BTreeMap<L, SeqNum>>, Self::Error> {
104        let mut encoded_log_ids = Vec::new();
105        for log in logs {
106            let encoded_log_id =
107                encode_cbor(&log).map_err(|err| SqliteError::Encode("log id".to_string(), err))?;
108            encoded_log_ids.push(encoded_log_id);
109        }
110
111        // This query formation approach is required since there is currently no
112        // way to directly bind arrays as comma-separated lists in sqlx.
113        let params = format!("?{}", ", ?".repeat(encoded_log_ids.len() - 1));
114        let query_str = format!(
115            "
116            SELECT
117                log_id,
118                CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
119            FROM
120                operations_v1
121            WHERE
122                verifying_key = ?
123                AND log_id IN ( {} )
124            GROUP BY
125                log_id
126            ",
127            params
128        );
129
130        let mut query = query_as::<_, LogHeightRow>(&query_str).bind(author.to_string());
131
132        for log_id in encoded_log_ids {
133            query = query.bind(log_id)
134        }
135
136        let log_heights_query = query.fetch_all(&self.pool).await?;
137
138        let log_heights = if log_heights_query.is_empty() {
139            None
140        } else {
141            let mut log_heights = BTreeMap::new();
142
143            for row in log_heights_query {
144                let (log_id, seq_num) = row.try_into()?;
145                log_heights.insert(log_id, seq_num);
146            }
147
148            Some(log_heights)
149        };
150
151        Ok(log_heights)
152    }
153
154    /// Retrieve the count and total byte size of all operations in an author's log.
155    async fn get_log_size(
156        &self,
157        author: &VerifyingKey,
158        log_id: &L,
159        after: Option<SeqNum>,
160        until: Option<SeqNum>,
161    ) -> Result<Option<(u64, u64)>, Self::Error> {
162        // We need to use an inclusive greater-than to ensure our
163        // query includes the operation with sequence number 0.
164        let after_operator = if after.is_none() { ">=" } else { ">" };
165        let query_str = format!(
166            "
167            SELECT
168                CAST(SUM(CAST(header_size AS NUMERIC)) AS TEXT) AS total_header_bytes,
169                CAST(SUM(CAST(payload_size AS NUMERIC)) AS TEXT) AS total_payload_bytes,
170                CAST(COUNT(*) AS TEXT) AS total_operation_count
171            FROM
172                operations_v1
173            WHERE
174                verifying_key = ?
175                AND log_id = ?
176                AND CAST(seq_num AS NUMERIC) {} CAST(? as NUMERIC)
177                AND CAST(seq_num AS NUMERIC) <= CAST(? as NUMERIC)
178            ",
179            after_operator
180        );
181
182        let log_meta: Option<LogMetaRow> = query_as::<_, LogMetaRow>(&query_str)
183            .bind(author.to_string())
184            .bind(
185                encode_cbor(&log_id)
186                    .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
187            )
188            .bind(after.unwrap_or(0).to_string())
189            .bind(until.unwrap_or(u64::MAX).to_string())
190            .fetch_optional(&self.pool)
191            .await?;
192
193        if let Some(row) = log_meta {
194            let (total_header_bytes, total_payload_bytes, total_operation_count) =
195                row.try_into()?;
196
197            return Ok(Some((
198                total_operation_count,
199                total_header_bytes + total_payload_bytes,
200            )));
201        }
202
203        Ok(None)
204    }
205
206    /// Retrieve log entries representing operations from an author's log.
207    async fn get_log_entries(
208        &self,
209        author: &VerifyingKey,
210        log_id: &L,
211        after: Option<SeqNum>,
212        until: Option<SeqNum>,
213    ) -> Result<Option<Vec<(Operation<E>, Vec<u8>)>>, Self::Error> {
214        // We need to use an inclusive greater-than to ensure our
215        // query includes the operation with sequence number 0.
216        let after_operator = if after.is_none() { ">=" } else { ">" };
217
218        let query_str = format!(
219            "
220            SELECT
221                hash,
222                header,
223                body
224            FROM
225                operations_v1
226            WHERE
227                verifying_key = ?
228                AND log_id = ?
229                AND CAST(seq_num AS NUMERIC) {} CAST(? as NUMERIC)
230                AND CAST(seq_num AS NUMERIC) <= CAST(? as NUMERIC)
231            ORDER BY
232                CAST(seq_num AS NUMERIC)
233            ",
234            after_operator
235        );
236
237        let operations = query_as::<_, OperationRow>(&query_str)
238            .bind(author.to_string())
239            .bind(
240                encode_cbor(&log_id)
241                    .map_err(|err| SqliteError::Encode("log id".to_string(), err))?,
242            )
243            .bind(after.unwrap_or(0).to_string())
244            .bind(until.unwrap_or(u64::MAX).to_string())
245            .fetch_all(&self.pool)
246            .await?;
247
248        let mut entries = Vec::new();
249        for operation in operations {
250            let header = operation.header.clone();
251            entries.push((operation.try_into()?, header))
252        }
253
254        if entries.is_empty() {
255            Ok(None)
256        } else {
257            Ok(Some(entries))
258        }
259    }
260
261    /// Prune entries from an author's log.
262    ///
263    /// Pruning involves deletion of the entry bodies (ie. payloads) from the database.
264    async fn prune_entries(
265        &self,
266        author: &VerifyingKey,
267        log_id: &L,
268        until: &SeqNum,
269    ) -> Result<u64, Self::Error> {
270        let result = query(
271            "
272            DELETE
273            FROM
274                operations_v1
275            WHERE
276                verifying_key = ?
277                AND log_id = ?
278                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
279            ",
280        )
281        .bind(author.to_string())
282        .bind(encode_cbor(&log_id).map_err(|err| SqliteError::Encode("log id".to_string(), err))?)
283        .bind(until.to_string())
284        .execute(&self.pool)
285        .await?;
286
287        let pruned_entries_num = result.rows_affected();
288
289        Ok(pruned_entries_num)
290    }
291}