p2panda_store/sqlite/
store.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! SQLite persistent storage.
4use std::hash::{DefaultHasher, Hash as StdHash, Hasher};
5use std::marker::PhantomData;
6
7use sqlx::migrate;
8use sqlx::migrate::{MigrateDatabase, MigrateError};
9use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
10use sqlx::{Error as SqlxError, Sqlite, query, query_as};
11use thiserror::Error;
12
13use p2panda_core::cbor::{DecodeError, EncodeError, encode_cbor};
14use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
15
16use crate::sqlite::models::{LogHeightRow, OperationRow, RawOperationRow};
17use crate::{LogId, LogStore, OperationStore};
18
/// Errors returned by the SQLite store and its database helper functions.
#[derive(Debug, Error)]
pub enum SqliteStoreError {
    /// CBOR-encoding of operation extensions failed.
    #[error("failed to encode operation extensions: {0}")]
    EncodingFailed(#[from] EncodeError),

    /// CBOR-decoding of operation extensions failed.
    #[error("failed to decode operation extensions: {0}")]
    DecodingFailed(#[from] DecodeError),

    /// Any error reported by `sqlx`; migration errors are also mapped into this variant.
    #[error("an error occurred with the sqlite database: {0}")]
    Database(#[from] SqlxError),
}
30
31impl From<MigrateError> for SqliteStoreError {
32    fn from(error: MigrateError) -> Self {
33        Self::Database(SqlxError::Migrate(Box::new(error)))
34    }
35}
36
/// Re-export of SQLite connection pool type.
///
/// This is an alias for `sqlx::sqlite::SqlitePool`, so callers can name the pool type without
/// importing `sqlx` themselves.
pub type Pool = SqlitePool;
39
/// SQLite-based persistent store.
///
/// The type parameters `L` (log id) and `E` (header extensions) are not stored; they only pin
/// down which `LogStore` / `OperationStore` implementation a store value provides.
#[derive(Clone, Debug)]
pub struct SqliteStore<L, E> {
    /// Connection pool all queries are executed against.
    pub(crate) pool: Pool,
    /// Zero-sized marker tying the otherwise unused `L` and `E` parameters to the struct.
    _marker: PhantomData<(L, E)>,
}
46
47impl<L, E> SqliteStore<L, E>
48where
49    L: LogId,
50    E: Extensions,
51{
52    /// Create a new `SqliteStore` using the provided db `Pool`.
53    pub fn new(pool: Pool) -> Self {
54        Self {
55            pool,
56            _marker: PhantomData {},
57        }
58    }
59}
60
61/// Create the database if it doesn't already exist.
62pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> {
63    if !Sqlite::database_exists(url).await? {
64        Sqlite::create_database(url).await?
65    }
66
67    Ok(())
68}
69
70/// Drop the database if it exists.
71pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> {
72    if Sqlite::database_exists(url).await? {
73        Sqlite::drop_database(url).await?
74    }
75
76    Ok(())
77}
78
79/// Create a connection pool.
80pub async fn connection_pool(url: &str, max_connections: u32) -> Result<Pool, SqliteStoreError> {
81    let pool: Pool = SqlitePoolOptions::new()
82        .max_connections(max_connections)
83        .connect(url)
84        .await?;
85
86    Ok(pool)
87}
88
89/// Run any pending database migrations from inside the application.
90pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> {
91    migrate!().run(pool).await?;
92
93    Ok(())
94}
95
/// Hash any `Hash`-able value into a `u64` using the standard library's `DefaultHasher`.
///
/// The store persists the decimal string of this value in the `log_id` column.
///
/// NOTE(review): the standard library does not specify the algorithm behind `DefaultHasher`
/// and does not guarantee its output across Rust releases; since the result is written to
/// disk, confirm that this is acceptable for long-lived databases.
fn calculate_hash<T: StdHash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    StdHash::hash(t, &mut hasher);
    Hasher::finish(&hasher)
}
101
102impl<L, E> OperationStore<L, E> for SqliteStore<L, E>
103where
104    L: LogId + Send + Sync,
105    E: Extensions + Send + Sync,
106{
107    type Error = SqliteStoreError;
108
109    async fn insert_operation(
110        &mut self,
111        hash: Hash,
112        header: &Header<E>,
113        body: Option<&Body>,
114        header_bytes: &[u8],
115        log_id: &L,
116    ) -> Result<bool, Self::Error> {
117        query(
118            "
119            INSERT INTO
120                operations_v1 (
121                    hash,
122                    log_id,
123                    version,
124                    public_key,
125                    signature,
126                    payload_size,
127                    payload_hash,
128                    timestamp,
129                    seq_num,
130                    backlink,
131                    previous,
132                    extensions,
133                    body,
134                    header_bytes
135                )
136            VALUES
137                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
138            ",
139        )
140        .bind(hash.to_string())
141        .bind(calculate_hash(log_id).to_string())
142        .bind(header.version.to_string())
143        .bind(header.public_key.to_hex())
144        .bind(header.signature.map(|sig| sig.to_hex()))
145        .bind(header.payload_size.to_string())
146        .bind(header.payload_hash.map(|hash| hash.to_hex()))
147        .bind(header.timestamp.to_string())
148        .bind(header.seq_num.to_string())
149        .bind(header.backlink.map(|backlink| backlink.to_hex()))
150        .bind(
151            header
152                .previous
153                .iter()
154                .map(|previous| previous.to_hex())
155                .collect::<Vec<String>>()
156                .concat(),
157        )
158        .bind(
159            header
160                .extensions
161                .as_ref()
162                .map(|extensions| encode_cbor(extensions).expect("extenions are serializable")),
163        )
164        .bind(body.map(|body| body.to_bytes()))
165        .bind(header_bytes)
166        .execute(&self.pool)
167        .await?;
168
169        Ok(true)
170    }
171
172    async fn get_operation(
173        &self,
174        hash: Hash,
175    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
176        if let Some(operation) = query_as::<_, OperationRow>(
177            "
178            SELECT
179                hash,
180                log_id,
181                version,
182                public_key,
183                signature,
184                payload_size,
185                payload_hash,
186                timestamp,
187                seq_num,
188                backlink,
189                previous,
190                extensions,
191                body,
192                header_bytes
193            FROM
194                operations_v1
195            WHERE
196                hash = ?
197            ",
198        )
199        .bind(hash.to_string())
200        .fetch_optional(&self.pool)
201        .await?
202        {
203            let body = operation.body.clone().map(|body| body.into());
204            let header: Header<E> = operation.into();
205
206            Ok(Some((header, body)))
207        } else {
208            Ok(None)
209        }
210    }
211
212    async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
213        if let Some(operation) = query_as::<_, RawOperationRow>(
214            "
215            SELECT
216                hash,
217                body,
218                header_bytes
219            FROM
220                operations_v1
221            WHERE
222                hash = ?
223            ",
224        )
225        .bind(hash.to_string())
226        .fetch_optional(&self.pool)
227        .await?
228        {
229            let raw_operation = operation.into();
230
231            Ok(Some(raw_operation))
232        } else {
233            Ok(None)
234        }
235    }
236
237    async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
238        let exists = query(
239            "
240            SELECT
241                1
242            FROM
243                operations_v1
244            WHERE
245                hash = ?
246            ",
247        )
248        .bind(hash.to_string())
249        .fetch_optional(&self.pool)
250        .await?;
251
252        Ok(exists.is_some())
253    }
254
255    async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
256        let result = query(
257            "
258            DELETE
259            FROM
260                operations_v1
261            WHERE
262                hash = ?
263            ",
264        )
265        .bind(hash.to_string())
266        .execute(&self.pool)
267        .await?;
268
269        Ok(result.rows_affected() > 0)
270    }
271
272    async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
273        let result = query(
274            "
275            UPDATE
276                operations_v1
277            SET
278                body = NULL
279            WHERE
280                operations_v1.hash = ?
281            ",
282        )
283        .bind(hash.to_string())
284        .execute(&self.pool)
285        .await?;
286
287        Ok(result.rows_affected() > 0)
288    }
289}
290
291impl<L, E> LogStore<L, E> for SqliteStore<L, E>
292where
293    L: LogId + Send + Sync,
294    E: Extensions + Send + Sync,
295{
296    type Error = SqliteStoreError;
297
298    async fn get_log(
299        &self,
300        public_key: &PublicKey,
301        log_id: &L,
302        from: Option<u64>,
303    ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
304        let operations = query_as::<_, OperationRow>(
305            "
306            SELECT
307                hash,
308                log_id,
309                version,
310                public_key,
311                signature,
312                payload_size,
313                payload_hash,
314                timestamp,
315                seq_num,
316                backlink,
317                previous,
318                extensions,
319                body,
320                header_bytes
321            FROM
322                operations_v1
323            WHERE
324                public_key = ?
325                AND log_id = ?
326                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
327            ORDER BY
328                CAST(seq_num AS NUMERIC)
329            ",
330        )
331        .bind(public_key.to_string())
332        .bind(calculate_hash(log_id).to_string())
333        .bind(from.unwrap_or(0).to_string())
334        .fetch_all(&self.pool)
335        .await?;
336
337        let log: Vec<(Header<E>, Option<Body>)> = operations
338            .into_iter()
339            .map(|operation| {
340                (
341                    operation.clone().into(),
342                    operation.body.map(|body| body.into()),
343                )
344            })
345            .collect();
346
347        if log.is_empty() {
348            Ok(None)
349        } else {
350            Ok(Some(log))
351        }
352    }
353
354    async fn get_raw_log(
355        &self,
356        public_key: &PublicKey,
357        log_id: &L,
358        from: Option<u64>,
359    ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
360        let operations = query_as::<_, RawOperationRow>(
361            "
362            SELECT
363                hash,
364                body,
365                header_bytes
366            FROM
367                operations_v1
368            WHERE
369                public_key = ?
370                AND log_id = ?
371                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
372            ORDER BY
373                CAST(seq_num AS NUMERIC)
374            ",
375        )
376        .bind(public_key.to_string())
377        .bind(calculate_hash(log_id).to_string())
378        .bind(from.unwrap_or(0).to_string())
379        .fetch_all(&self.pool)
380        .await?;
381
382        let log: Vec<RawOperation> = operations
383            .into_iter()
384            .map(|operation| operation.into())
385            .collect();
386
387        if log.is_empty() {
388            Ok(None)
389        } else {
390            Ok(Some(log))
391        }
392    }
393
394    async fn latest_operation(
395        &self,
396        public_key: &PublicKey,
397        log_id: &L,
398    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
399        if let Some(operation) = query_as::<_, OperationRow>(
400            "
401            SELECT
402                hash,
403                log_id,
404                version,
405                public_key,
406                signature,
407                payload_size,
408                payload_hash,
409                timestamp,
410                seq_num,
411                backlink,
412                previous,
413                extensions,
414                body,
415                header_bytes
416            FROM
417                operations_v1
418            WHERE
419                public_key = ?
420                AND log_id = ?
421            ORDER BY
422                CAST(seq_num AS NUMERIC) DESC LIMIT 1
423            ",
424        )
425        .bind(public_key.to_string())
426        .bind(calculate_hash(log_id).to_string())
427        .fetch_optional(&self.pool)
428        .await?
429        {
430            let body = operation.body.clone().map(|body| body.into());
431            let header: Header<E> = operation.into();
432
433            Ok(Some((header, body)))
434        } else {
435            Ok(None)
436        }
437    }
438
439    async fn delete_operations(
440        &mut self,
441        public_key: &PublicKey,
442        log_id: &L,
443        before: u64,
444    ) -> Result<bool, Self::Error> {
445        let result = query(
446            "
447            DELETE
448            FROM
449                operations_v1
450            WHERE
451                public_key = ?
452                AND log_id = ?
453                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
454            ",
455        )
456        .bind(public_key.to_string())
457        .bind(calculate_hash(log_id).to_string())
458        .bind(before.to_string())
459        .execute(&self.pool)
460        .await?;
461
462        Ok(result.rows_affected() > 0)
463    }
464
465    async fn delete_payloads(
466        &mut self,
467        public_key: &PublicKey,
468        log_id: &L,
469        from: u64,
470        to: u64,
471    ) -> Result<bool, Self::Error> {
472        let result = query(
473            "
474            UPDATE
475                operations_v1
476            SET
477                body = NULL
478            WHERE
479                public_key = ?
480                AND log_id = ?
481                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
482                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
483            ",
484        )
485        .bind(public_key.to_string())
486        .bind(calculate_hash(log_id).to_string())
487        .bind(from.to_string())
488        .bind(to.to_string())
489        .execute(&self.pool)
490        .await?;
491
492        Ok(result.rows_affected() > 0)
493    }
494
495    async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, u64)>, Self::Error> {
496        let operations = query_as::<_, LogHeightRow>(
497            "
498            SELECT
499                public_key,
500                CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
501            FROM
502                operations_v1
503            WHERE
504                log_id = ?
505            GROUP BY
506                public_key
507            ",
508        )
509        .bind(calculate_hash(log_id).to_string())
510        .fetch_all(&self.pool)
511        .await?;
512
513        let log_heights: Vec<(PublicKey, u64)> = operations
514            .into_iter()
515            .map(|operation| operation.into())
516            .collect();
517
518        Ok(log_heights)
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use p2panda_core::{Body, Hash, Header, PrivateKey};
525    use serde::{Deserialize, Serialize};
526
527    use crate::sqlite::test_utils::initialize_sqlite_db;
528    use crate::{LogStore, OperationStore};
529
530    use super::SqliteStore;
531
532    fn create_operation(
533        private_key: &PrivateKey,
534        body: &Body,
535        seq_num: u64,
536        timestamp: u64,
537        backlink: Option<Hash>,
538    ) -> (Hash, Header<()>, Vec<u8>) {
539        let mut header = Header {
540            version: 1,
541            public_key: private_key.public_key(),
542            signature: None,
543            payload_size: body.size(),
544            payload_hash: Some(body.hash()),
545            timestamp,
546            seq_num,
547            backlink,
548            previous: vec![],
549            extensions: None,
550        };
551        header.sign(private_key);
552        let header_bytes = header.to_bytes();
553        (header.hash(), header, header_bytes)
554    }
555
556    #[tokio::test]
557    async fn default_sqlite_store() {
558        let db_pool = initialize_sqlite_db().await;
559
560        let mut store = SqliteStore::new(db_pool);
561        let private_key = PrivateKey::new();
562
563        let body = Body::new("hello!".as_bytes());
564        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
565
566        let inserted = store
567            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
568            .await
569            .expect("no errors");
570
571        assert!(inserted);
572    }
573
574    #[tokio::test]
575    async fn generic_extensions_mem_store() {
576        // Define our own custom extension type.
577        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
578        struct MyExtension {}
579
580        // Instantiate a database pool backed by an in-memory db.
581        let db_pool = initialize_sqlite_db().await;
582
583        // Construct a new store.
584        let mut store = SqliteStore::new(db_pool);
585
586        // Construct an operation using the custom extension.
587        let private_key = PrivateKey::new();
588        let body = Body::new("hello!".as_bytes());
589        let mut header = Header {
590            version: 1,
591            public_key: private_key.public_key(),
592            signature: None,
593            payload_size: body.size(),
594            payload_hash: Some(body.hash()),
595            timestamp: 0,
596            seq_num: 0,
597            backlink: None,
598            previous: vec![],
599            extensions: Some(MyExtension {}),
600        };
601        header.sign(&private_key);
602
603        // Insert the operation into the store, the extension type is inferred.
604        let inserted = store
605            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
606            .await
607            .expect("no errors");
608        assert!(inserted);
609    }
610
611    #[tokio::test]
612    async fn insert_operation_with_unsigned_header() {
613        let db_pool = initialize_sqlite_db().await;
614        let mut store = SqliteStore::new(db_pool);
615        let private_key = PrivateKey::new();
616
617        // Create the first operation.
618        let body = Body::new("hello!".as_bytes());
619        let (hash, mut header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
620
621        // Set signature to `None` for the sake of the test.
622        header.signature = None;
623
624        // Only insert the first operation into the store.
625        let inserted = store
626            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
627            .await;
628
629        // Ensure that the lack of a header signature returns an error.
630        assert!(inserted.is_err());
631        assert_eq!(
632            format!("{}", inserted.unwrap_err()),
633            "an error occurred with the sqlite database: error returned from database: (code: 1299) NOT NULL constraint failed: operations_v1.signature"
634        );
635    }
636
637    #[tokio::test]
638    async fn insert_get_operation() {
639        let db_pool = initialize_sqlite_db().await;
640        let mut store = SqliteStore::new(db_pool);
641        let private_key = PrivateKey::new();
642
643        // Create the first operation.
644        let body = Body::new("hello!".as_bytes());
645        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
646
647        // Create the second operation.
648        let body_2 = Body::new("buenas!".as_bytes());
649        let (hash_2, _header_2, _header_bytes_2) =
650            create_operation(&private_key, &body_2, 0, 0, None);
651
652        // Only insert the first operation into the store.
653        let inserted = store
654            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
655            .await
656            .expect("no errors");
657        assert!(inserted);
658        // Ensure the store contains the first operation but not the second.
659        assert!(store.has_operation(hash).await.expect("no error"));
660        assert!(!store.has_operation(hash_2).await.expect("no error"));
661
662        let (header_again, body_again) = store
663            .get_operation(hash)
664            .await
665            .expect("no error")
666            .expect("operation exist");
667
668        // Ensure the hash of the created operation header matches that of the retrieved
669        // header hash.
670        assert_eq!(header.hash(), header_again.hash());
671        // Ensure the body of the created operation matches that of the retrieved body.
672        assert_eq!(Some(body.clone()), body_again);
673
674        let (header_bytes_again, body_bytes_again) = store
675            .get_raw_operation(hash)
676            .await
677            .expect("no error")
678            .expect("operation exist");
679
680        assert_eq!(header_bytes_again, header_bytes);
681        assert_eq!(body_bytes_again, Some(body.to_bytes()));
682    }
683
684    #[tokio::test]
685    async fn delete_operation() {
686        let db_pool = initialize_sqlite_db().await;
687        let mut store = SqliteStore::new(db_pool);
688        let private_key = PrivateKey::new();
689
690        let body = Body::new("hello!".as_bytes());
691        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
692
693        // Insert one operation.
694        let inserted = store
695            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
696            .await
697            .expect("no errors");
698        assert!(inserted);
699
700        // Ensure the store contains the operation.
701        assert!(store.has_operation(hash).await.expect("no error"));
702
703        // Delete the operation.
704        assert!(store.delete_operation(hash).await.expect("no error"));
705
706        let deleted_operation = store.get_operation(hash).await.expect("no error");
707        assert!(deleted_operation.is_none());
708        assert!(!store.has_operation(hash).await.expect("no error"));
709
710        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
711        assert!(deleted_raw_operation.is_none());
712    }
713
714    #[tokio::test]
715    async fn delete_payload() {
716        let db_pool = initialize_sqlite_db().await;
717        let mut store = SqliteStore::new(db_pool);
718        let private_key = PrivateKey::new();
719
720        let body = Body::new("hello!".as_bytes());
721        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
722
723        let inserted = store
724            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
725            .await
726            .expect("no errors");
727        assert!(inserted);
728
729        assert!(store.delete_payload(hash).await.expect("no error"));
730
731        let (_, no_body) = store
732            .get_operation(hash)
733            .await
734            .expect("no error")
735            .expect("operation exist");
736        assert!(no_body.is_none());
737        assert!(store.has_operation(hash).await.expect("no error"));
738
739        let (_, no_body) = store
740            .get_raw_operation(hash)
741            .await
742            .expect("no error")
743            .expect("operation exist");
744        assert!(no_body.is_none());
745    }
746
747    #[tokio::test]
748    async fn get_log() {
749        let db_pool = initialize_sqlite_db().await;
750        let mut store = SqliteStore::new(db_pool);
751        let private_key = PrivateKey::new();
752        let log_id = 0;
753
754        let body_0 = Body::new("hello!".as_bytes());
755        let body_1 = Body::new("hello again!".as_bytes());
756        let body_2 = Body::new("hello for a third time!".as_bytes());
757
758        let (hash_0, header_0, header_bytes_0) =
759            create_operation(&private_key, &body_0, 0, 0, None);
760        let (hash_1, header_1, header_bytes_1) =
761            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
762        let (hash_2, header_2, header_bytes_2) =
763            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
764
765        store
766            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
767            .await
768            .expect("no errors");
769        store
770            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
771            .await
772            .expect("no errors");
773        store
774            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
775            .await
776            .expect("no errors");
777
778        // Get all log operations.
779        let log = store
780            .get_log(&private_key.public_key(), &log_id, None)
781            .await
782            .expect("no errors")
783            .expect("log should exist");
784
785        assert_eq!(log.len(), 3);
786        assert_eq!(log[0].0.hash(), hash_0);
787        assert_eq!(log[1].0.hash(), hash_1);
788        assert_eq!(log[2].0.hash(), hash_2);
789        assert_eq!(log[0].1, Some(body_0.clone()));
790        assert_eq!(log[1].1, Some(body_1.clone()));
791        assert_eq!(log[2].1, Some(body_2.clone()));
792
793        // Get all log operations starting from sequence number 1.
794        let log = store
795            .get_log(&private_key.public_key(), &log_id, Some(1))
796            .await
797            .expect("no errors")
798            .expect("log should exist");
799
800        assert_eq!(log.len(), 2);
801        assert_eq!(log[0].0.hash(), hash_1);
802        assert_eq!(log[1].0.hash(), hash_2);
803        assert_eq!(log[0].1, Some(body_1.clone()));
804        assert_eq!(log[1].1, Some(body_2.clone()));
805
806        // Get all raw log operations.
807        let log = store
808            .get_raw_log(&private_key.public_key(), &log_id, None)
809            .await
810            .expect("no errors")
811            .expect("log should exist");
812
813        assert_eq!(log.len(), 3);
814        assert_eq!(log[0].0, header_bytes_0);
815        assert_eq!(log[1].0, header_bytes_1);
816        assert_eq!(log[2].0, header_bytes_2);
817        assert_eq!(log[0].1, Some(body_0.to_bytes()));
818        assert_eq!(log[1].1, Some(body_1.to_bytes()));
819        assert_eq!(log[2].1, Some(body_2.to_bytes()));
820
821        // Get all raw log operations starting from sequence number 1.
822        let log = store
823            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
824            .await
825            .expect("no errors")
826            .expect("log should exist");
827
828        assert_eq!(log.len(), 2);
829        assert_eq!(log[0].0, header_bytes_1);
830        assert_eq!(log[1].0, header_bytes_2);
831        assert_eq!(log[0].1, Some(body_1.to_bytes()));
832        assert_eq!(log[1].1, Some(body_2.to_bytes()));
833    }
834
835    #[tokio::test]
836    async fn get_latest_operation() {
837        let db_pool = initialize_sqlite_db().await;
838        let mut store = SqliteStore::new(db_pool);
839        let private_key = PrivateKey::new();
840        let log_id = 0;
841
842        let body_0 = Body::new("hello!".as_bytes());
843        let body_1 = Body::new("hello again!".as_bytes());
844
845        let (hash_0, header_0, header_bytes_0) =
846            create_operation(&private_key, &body_0, 0, 0, None);
847        let (hash_1, header_1, header_bytes_1) =
848            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
849
850        store
851            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
852            .await
853            .expect("no errors");
854        store
855            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
856            .await
857            .expect("no errors");
858
859        let (latest_header, latest_body) = store
860            .latest_operation(&private_key.public_key(), &log_id)
861            .await
862            .expect("no errors")
863            .expect("there's an operation");
864
865        assert_eq!(latest_header.hash(), header_1.hash());
866        assert_eq!(latest_body, Some(body_1));
867    }
868
869    #[tokio::test]
870    async fn delete_operations() {
871        let db_pool = initialize_sqlite_db().await;
872        let mut store = SqliteStore::new(db_pool);
873        let private_key = PrivateKey::new();
874        let log_id = 0;
875
876        let body_0 = Body::new("hello!".as_bytes());
877        let body_1 = Body::new("hello again!".as_bytes());
878        let body_2 = Body::new("final hello!".as_bytes());
879
880        let (hash_0, header_0, header_bytes_0) =
881            create_operation(&private_key, &body_0, 0, 0, None);
882        let (hash_1, header_1, header_bytes_1) =
883            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
884        let (hash_2, header_2, header_bytes_2) =
885            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
886
887        store
888            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
889            .await
890            .expect("no errors");
891        store
892            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
893            .await
894            .expect("no errors");
895        store
896            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
897            .await
898            .expect("no errors");
899
900        // Get all log operations.
901        let log = store
902            .get_log(&private_key.public_key(), &log_id, None)
903            .await
904            .expect("no errors")
905            .expect("log should exist");
906
907        // We expect the log to have 3 operations.
908        assert_eq!(log.len(), 3);
909
910        // Delete all operations _before_ seq_num 2.
911        let deleted = store
912            .delete_operations(&private_key.public_key(), &log_id, 2)
913            .await
914            .expect("no errors");
915        assert!(deleted);
916
917        let log = store
918            .get_log(&private_key.public_key(), &log_id, None)
919            .await
920            .expect("no errors")
921            .expect("log should exist");
922
923        // There is now only one operation in the log.
924        assert_eq!(log.len(), 1);
925
926        // The remaining operation should be the latest (seq_num == 2).
927        assert_eq!(log[0].0.hash(), header_2.hash());
928
929        // Deleting the same range again should return `false`, meaning no deletion occurred.
930        let deleted = store
931            .delete_operations(&private_key.public_key(), &log_id, 2)
932            .await
933            .expect("no errors");
934        assert!(!deleted);
935    }
936
937    #[tokio::test]
938    async fn delete_payloads() {
939        let db_pool = initialize_sqlite_db().await;
940        let mut store = SqliteStore::new(db_pool);
941        let private_key = PrivateKey::new();
942        let log_id = 0;
943
944        let body_0 = Body::new("hello!".as_bytes());
945        let body_1 = Body::new("hello again!".as_bytes());
946        let body_2 = Body::new("final hello!".as_bytes());
947
948        let (hash_0, header_0, header_bytes_0) =
949            create_operation(&private_key, &body_0, 0, 0, None);
950        let (hash_1, header_1, header_bytes_1) =
951            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
952        let (hash_2, header_2, header_bytes_2) =
953            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
954
955        store
956            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
957            .await
958            .expect("no errors");
959        store
960            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
961            .await
962            .expect("no errors");
963        store
964            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
965            .await
966            .expect("no errors");
967
968        // Get all log operations.
969        let log = store
970            .get_log(&private_key.public_key(), &log_id, None)
971            .await
972            .expect("no errors")
973            .expect("log should exist");
974
975        // We expect the log to have 3 operations.
976        assert_eq!(log.len(), 3);
977
978        assert_eq!(log[0].1, Some(body_0));
979        assert_eq!(log[1].1, Some(body_1));
980        assert_eq!(log[2].1, Some(body_2.clone()));
981
982        // Delete all operation payloads from sequence number 0 up to but not including 2.
983        let deleted = store
984            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
985            .await
986            .expect("no errors");
987        assert!(deleted);
988
989        let log = store
990            .get_log(&private_key.public_key(), &log_id, None)
991            .await
992            .expect("no errors")
993            .expect("log should exist");
994
995        assert_eq!(log[0].1, None);
996        assert_eq!(log[1].1, None);
997        assert_eq!(log[2].1, Some(body_2));
998    }
999
1000    #[tokio::test]
1001    async fn get_log_heights() {
1002        let db_pool = initialize_sqlite_db().await;
1003        let mut store = SqliteStore::new(db_pool);
1004
1005        let log_id = 0;
1006
1007        let private_key_0 = PrivateKey::new();
1008        let private_key_1 = PrivateKey::new();
1009        let private_key_2 = PrivateKey::new();
1010
1011        let body_0 = Body::new("hello!".as_bytes());
1012        let body_1 = Body::new("hello again!".as_bytes());
1013        let body_2 = Body::new("hello for a third time!".as_bytes());
1014
1015        let (hash_0, header_0, header_bytes_0) =
1016            create_operation(&private_key_0, &body_0, 0, 0, None);
1017        let (hash_1, header_1, header_bytes_1) =
1018            create_operation(&private_key_0, &body_1, 1, 0, Some(hash_0));
1019        let (hash_2, header_2, header_bytes_2) =
1020            create_operation(&private_key_0, &body_2, 2, 0, Some(hash_1));
1021
1022        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
1023        assert!(log_heights.is_empty());
1024
1025        store
1026            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1027            .await
1028            .expect("no errors");
1029        store
1030            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
1031            .await
1032            .expect("no errors");
1033        store
1034            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
1035            .await
1036            .expect("no errors");
1037
1038        let (hash_0, header_0, header_bytes_0) =
1039            create_operation(&private_key_1, &body_0, 0, 0, None);
1040        let (hash_1, header_1, header_bytes_1) =
1041            create_operation(&private_key_1, &body_1, 1, 0, Some(hash_0));
1042
1043        store
1044            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1045            .await
1046            .expect("no errors");
1047        store
1048            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
1049            .await
1050            .expect("no errors");
1051
1052        let (hash_0, header_0, header_bytes_0) =
1053            create_operation(&private_key_2, &body_0, 0, 0, None);
1054
1055        store
1056            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1057            .await
1058            .expect("no errors");
1059
1060        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
1061
1062        assert_eq!(log_heights.len(), 3);
1063
1064        // Ensure the correct sequence number for each public key.
1065        assert!(log_heights.contains(&(private_key_0.public_key(), 2)));
1066        assert!(log_heights.contains(&(private_key_1.public_key(), 1)));
1067        assert!(log_heights.contains(&(private_key_2.public_key(), 0)));
1068    }
1069}