p2panda_store/sqlite/
store.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! SQLite persistent storage.
4use std::hash::{DefaultHasher, Hash as StdHash, Hasher};
5use std::marker::PhantomData;
6
7use sqlx::migrate;
8use sqlx::migrate::{MigrateDatabase, MigrateError};
9use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
10use sqlx::{Error as SqlxError, Sqlite, query, query_as};
11use thiserror::Error;
12
13use p2panda_core::cbor::{DecodeError, EncodeError, encode_cbor};
14use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
15
16use crate::sqlite::models::{LogHeightRow, OperationRow, RawOperationRow};
17use crate::{LogId, LogStore, OperationStore};
18
19#[derive(Debug, Error)]
20pub enum SqliteStoreError {
21    #[error("failed to encode operation extensions: {0}")]
22    EncodingFailed(#[from] EncodeError),
23
24    #[error("failed to decode operation extensions: {0}")]
25    DecodingFailed(#[from] DecodeError),
26
27    #[error("an error occurred with the sqlite database: {0}")]
28    Database(#[from] SqlxError),
29}
30
31impl From<MigrateError> for SqliteStoreError {
32    fn from(error: MigrateError) -> Self {
33        Self::Database(SqlxError::Migrate(Box::new(error)))
34    }
35}
36
37/// Re-export of SQLite connection pool type.
38pub type Pool = SqlitePool;
39
40/// SQLite-based persistent store.
41#[derive(Clone, Debug)]
42pub struct SqliteStore<L, E> {
43    pub(crate) pool: Pool,
44    _marker: PhantomData<(L, E)>,
45}
46
47impl<L, E> SqliteStore<L, E>
48where
49    L: LogId,
50    E: Extensions,
51{
52    /// Create a new `SqliteStore` using the provided db `Pool`.
53    pub fn new(pool: Pool) -> Self {
54        Self {
55            pool,
56            _marker: PhantomData {},
57        }
58    }
59}
60
61/// Create the database if it doesn't already exist.
62pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> {
63    if !Sqlite::database_exists(url).await? {
64        Sqlite::create_database(url).await?
65    }
66
67    Ok(())
68}
69
70/// Drop the database if it exists.
71pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> {
72    if Sqlite::database_exists(url).await? {
73        Sqlite::drop_database(url).await?
74    }
75
76    Ok(())
77}
78
79/// Create a connection pool.
80pub async fn connection_pool(url: &str, max_connections: u32) -> Result<Pool, SqliteStoreError> {
81    let pool: Pool = SqlitePoolOptions::new()
82        .max_connections(max_connections)
83        .connect(url)
84        .await?;
85
86    Ok(pool)
87}
88
89/// Get migrations without running them
90pub fn migrations() -> migrate::Migrator {
91    migrate!()
92}
93
94/// Run any pending database migrations from inside the application.
95pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> {
96    migrations().run(pool).await?;
97
98    Ok(())
99}
100
101fn calculate_hash<T: StdHash>(t: &T) -> u64 {
102    let mut s = DefaultHasher::new();
103    t.hash(&mut s);
104    s.finish()
105}
106
107impl<L, E> OperationStore<L, E> for SqliteStore<L, E>
108where
109    L: LogId + Send + Sync,
110    E: Extensions + Send + Sync,
111{
112    type Error = SqliteStoreError;
113
114    async fn insert_operation(
115        &mut self,
116        hash: Hash,
117        header: &Header<E>,
118        body: Option<&Body>,
119        header_bytes: &[u8],
120        log_id: &L,
121    ) -> Result<bool, Self::Error> {
122        query(
123            "
124            INSERT INTO
125                operations_v1 (
126                    hash,
127                    log_id,
128                    version,
129                    public_key,
130                    signature,
131                    payload_size,
132                    payload_hash,
133                    timestamp,
134                    seq_num,
135                    backlink,
136                    previous,
137                    extensions,
138                    body,
139                    header_bytes
140                )
141            VALUES
142                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
143            ",
144        )
145        .bind(hash.to_string())
146        .bind(calculate_hash(log_id).to_string())
147        .bind(header.version.to_string())
148        .bind(header.public_key.to_hex())
149        .bind(header.signature.map(|sig| sig.to_hex()))
150        .bind(header.payload_size.to_string())
151        .bind(header.payload_hash.map(|hash| hash.to_hex()))
152        .bind(header.timestamp.to_string())
153        .bind(header.seq_num.to_string())
154        .bind(header.backlink.map(|backlink| backlink.to_hex()))
155        .bind(
156            header
157                .previous
158                .iter()
159                .map(|previous| previous.to_hex())
160                .collect::<Vec<String>>()
161                .concat(),
162        )
163        .bind(
164            header
165                .extensions
166                .as_ref()
167                .map(|extensions| encode_cbor(extensions).expect("extenions are serializable")),
168        )
169        .bind(body.map(|body| body.to_bytes()))
170        .bind(header_bytes)
171        .execute(&self.pool)
172        .await?;
173
174        Ok(true)
175    }
176
177    async fn get_operation(
178        &self,
179        hash: Hash,
180    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
181        if let Some(operation) = query_as::<_, OperationRow>(
182            "
183            SELECT
184                hash,
185                log_id,
186                version,
187                public_key,
188                signature,
189                payload_size,
190                payload_hash,
191                timestamp,
192                seq_num,
193                backlink,
194                previous,
195                extensions,
196                body,
197                header_bytes
198            FROM
199                operations_v1
200            WHERE
201                hash = ?
202            ",
203        )
204        .bind(hash.to_string())
205        .fetch_optional(&self.pool)
206        .await?
207        {
208            let body = operation.body.clone().map(|body| body.into());
209            let header: Header<E> = operation.into();
210
211            Ok(Some((header, body)))
212        } else {
213            Ok(None)
214        }
215    }
216
217    async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
218        if let Some(operation) = query_as::<_, RawOperationRow>(
219            "
220            SELECT
221                hash,
222                body,
223                header_bytes
224            FROM
225                operations_v1
226            WHERE
227                hash = ?
228            ",
229        )
230        .bind(hash.to_string())
231        .fetch_optional(&self.pool)
232        .await?
233        {
234            let raw_operation = operation.into();
235
236            Ok(Some(raw_operation))
237        } else {
238            Ok(None)
239        }
240    }
241
242    async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
243        let exists = query(
244            "
245            SELECT
246                1
247            FROM
248                operations_v1
249            WHERE
250                hash = ?
251            ",
252        )
253        .bind(hash.to_string())
254        .fetch_optional(&self.pool)
255        .await?;
256
257        Ok(exists.is_some())
258    }
259
260    async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
261        let result = query(
262            "
263            DELETE
264            FROM
265                operations_v1
266            WHERE
267                hash = ?
268            ",
269        )
270        .bind(hash.to_string())
271        .execute(&self.pool)
272        .await?;
273
274        Ok(result.rows_affected() > 0)
275    }
276
277    async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
278        let result = query(
279            "
280            UPDATE
281                operations_v1
282            SET
283                body = NULL
284            WHERE
285                operations_v1.hash = ?
286            ",
287        )
288        .bind(hash.to_string())
289        .execute(&self.pool)
290        .await?;
291
292        Ok(result.rows_affected() > 0)
293    }
294}
295
296impl<L, E> LogStore<L, E> for SqliteStore<L, E>
297where
298    L: LogId + Send + Sync,
299    E: Extensions + Send + Sync,
300{
301    type Error = SqliteStoreError;
302
303    async fn get_log(
304        &self,
305        public_key: &PublicKey,
306        log_id: &L,
307        from: Option<u64>,
308    ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
309        let operations = query_as::<_, OperationRow>(
310            "
311            SELECT
312                hash,
313                log_id,
314                version,
315                public_key,
316                signature,
317                payload_size,
318                payload_hash,
319                timestamp,
320                seq_num,
321                backlink,
322                previous,
323                extensions,
324                body,
325                header_bytes
326            FROM
327                operations_v1
328            WHERE
329                public_key = ?
330                AND log_id = ?
331                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
332            ORDER BY
333                CAST(seq_num AS NUMERIC)
334            ",
335        )
336        .bind(public_key.to_string())
337        .bind(calculate_hash(log_id).to_string())
338        .bind(from.unwrap_or(0).to_string())
339        .fetch_all(&self.pool)
340        .await?;
341
342        let log: Vec<(Header<E>, Option<Body>)> = operations
343            .into_iter()
344            .map(|operation| {
345                (
346                    operation.clone().into(),
347                    operation.body.map(|body| body.into()),
348                )
349            })
350            .collect();
351
352        if log.is_empty() {
353            Ok(None)
354        } else {
355            Ok(Some(log))
356        }
357    }
358
359    async fn get_raw_log(
360        &self,
361        public_key: &PublicKey,
362        log_id: &L,
363        from: Option<u64>,
364    ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
365        let operations = query_as::<_, RawOperationRow>(
366            "
367            SELECT
368                hash,
369                body,
370                header_bytes
371            FROM
372                operations_v1
373            WHERE
374                public_key = ?
375                AND log_id = ?
376                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
377            ORDER BY
378                CAST(seq_num AS NUMERIC)
379            ",
380        )
381        .bind(public_key.to_string())
382        .bind(calculate_hash(log_id).to_string())
383        .bind(from.unwrap_or(0).to_string())
384        .fetch_all(&self.pool)
385        .await?;
386
387        let log: Vec<RawOperation> = operations
388            .into_iter()
389            .map(|operation| operation.into())
390            .collect();
391
392        if log.is_empty() {
393            Ok(None)
394        } else {
395            Ok(Some(log))
396        }
397    }
398
399    async fn latest_operation(
400        &self,
401        public_key: &PublicKey,
402        log_id: &L,
403    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
404        if let Some(operation) = query_as::<_, OperationRow>(
405            "
406            SELECT
407                hash,
408                log_id,
409                version,
410                public_key,
411                signature,
412                payload_size,
413                payload_hash,
414                timestamp,
415                seq_num,
416                backlink,
417                previous,
418                extensions,
419                body,
420                header_bytes
421            FROM
422                operations_v1
423            WHERE
424                public_key = ?
425                AND log_id = ?
426            ORDER BY
427                CAST(seq_num AS NUMERIC) DESC LIMIT 1
428            ",
429        )
430        .bind(public_key.to_string())
431        .bind(calculate_hash(log_id).to_string())
432        .fetch_optional(&self.pool)
433        .await?
434        {
435            let body = operation.body.clone().map(|body| body.into());
436            let header: Header<E> = operation.into();
437
438            Ok(Some((header, body)))
439        } else {
440            Ok(None)
441        }
442    }
443
444    async fn delete_operations(
445        &mut self,
446        public_key: &PublicKey,
447        log_id: &L,
448        before: u64,
449    ) -> Result<bool, Self::Error> {
450        let result = query(
451            "
452            DELETE
453            FROM
454                operations_v1
455            WHERE
456                public_key = ?
457                AND log_id = ?
458                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
459            ",
460        )
461        .bind(public_key.to_string())
462        .bind(calculate_hash(log_id).to_string())
463        .bind(before.to_string())
464        .execute(&self.pool)
465        .await?;
466
467        Ok(result.rows_affected() > 0)
468    }
469
470    async fn delete_payloads(
471        &mut self,
472        public_key: &PublicKey,
473        log_id: &L,
474        from: u64,
475        to: u64,
476    ) -> Result<bool, Self::Error> {
477        let result = query(
478            "
479            UPDATE
480                operations_v1
481            SET
482                body = NULL
483            WHERE
484                public_key = ?
485                AND log_id = ?
486                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
487                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
488            ",
489        )
490        .bind(public_key.to_string())
491        .bind(calculate_hash(log_id).to_string())
492        .bind(from.to_string())
493        .bind(to.to_string())
494        .execute(&self.pool)
495        .await?;
496
497        Ok(result.rows_affected() > 0)
498    }
499
500    async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, u64)>, Self::Error> {
501        let operations = query_as::<_, LogHeightRow>(
502            "
503            SELECT
504                public_key,
505                CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
506            FROM
507                operations_v1
508            WHERE
509                log_id = ?
510            GROUP BY
511                public_key
512            ",
513        )
514        .bind(calculate_hash(log_id).to_string())
515        .fetch_all(&self.pool)
516        .await?;
517
518        let log_heights: Vec<(PublicKey, u64)> = operations
519            .into_iter()
520            .map(|operation| operation.into())
521            .collect();
522
523        Ok(log_heights)
524    }
525}
526
527#[cfg(test)]
528mod tests {
529    use p2panda_core::{Body, Hash, Header, PrivateKey};
530    use serde::{Deserialize, Serialize};
531
532    use crate::sqlite::test_utils::initialize_sqlite_db;
533    use crate::{LogStore, OperationStore};
534
535    use super::SqliteStore;
536
537    fn create_operation(
538        private_key: &PrivateKey,
539        body: &Body,
540        seq_num: u64,
541        timestamp: u64,
542        backlink: Option<Hash>,
543    ) -> (Hash, Header<()>, Vec<u8>) {
544        let mut header = Header {
545            version: 1,
546            public_key: private_key.public_key(),
547            signature: None,
548            payload_size: body.size(),
549            payload_hash: Some(body.hash()),
550            timestamp,
551            seq_num,
552            backlink,
553            previous: vec![],
554            extensions: None,
555        };
556        header.sign(private_key);
557        let header_bytes = header.to_bytes();
558        (header.hash(), header, header_bytes)
559    }
560
561    #[tokio::test]
562    async fn default_sqlite_store() {
563        let db_pool = initialize_sqlite_db().await;
564
565        let mut store = SqliteStore::new(db_pool);
566        let private_key = PrivateKey::new();
567
568        let body = Body::new("hello!".as_bytes());
569        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
570
571        let inserted = store
572            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
573            .await
574            .expect("no errors");
575
576        assert!(inserted);
577    }
578
579    #[tokio::test]
580    async fn generic_extensions_mem_store() {
581        // Define our own custom extension type.
582        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
583        struct MyExtension {}
584
585        // Instantiate a database pool backed by an in-memory db.
586        let db_pool = initialize_sqlite_db().await;
587
588        // Construct a new store.
589        let mut store = SqliteStore::new(db_pool);
590
591        // Construct an operation using the custom extension.
592        let private_key = PrivateKey::new();
593        let body = Body::new("hello!".as_bytes());
594        let mut header = Header {
595            version: 1,
596            public_key: private_key.public_key(),
597            signature: None,
598            payload_size: body.size(),
599            payload_hash: Some(body.hash()),
600            timestamp: 0,
601            seq_num: 0,
602            backlink: None,
603            previous: vec![],
604            extensions: Some(MyExtension {}),
605        };
606        header.sign(&private_key);
607
608        // Insert the operation into the store, the extension type is inferred.
609        let inserted = store
610            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
611            .await
612            .expect("no errors");
613        assert!(inserted);
614    }
615
616    #[tokio::test]
617    async fn insert_operation_with_unsigned_header() {
618        let db_pool = initialize_sqlite_db().await;
619        let mut store = SqliteStore::new(db_pool);
620        let private_key = PrivateKey::new();
621
622        // Create the first operation.
623        let body = Body::new("hello!".as_bytes());
624        let (hash, mut header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
625
626        // Set signature to `None` for the sake of the test.
627        header.signature = None;
628
629        // Only insert the first operation into the store.
630        let inserted = store
631            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
632            .await;
633
634        // Ensure that the lack of a header signature returns an error.
635        assert!(inserted.is_err());
636        assert_eq!(
637            format!("{}", inserted.unwrap_err()),
638            "an error occurred with the sqlite database: error returned from database: (code: 1299) NOT NULL constraint failed: operations_v1.signature"
639        );
640    }
641
642    #[tokio::test]
643    async fn insert_get_operation() {
644        let db_pool = initialize_sqlite_db().await;
645        let mut store = SqliteStore::new(db_pool);
646        let private_key = PrivateKey::new();
647
648        // Create the first operation.
649        let body = Body::new("hello!".as_bytes());
650        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
651
652        // Create the second operation.
653        let body_2 = Body::new("buenas!".as_bytes());
654        let (hash_2, _header_2, _header_bytes_2) =
655            create_operation(&private_key, &body_2, 0, 0, None);
656
657        // Only insert the first operation into the store.
658        let inserted = store
659            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
660            .await
661            .expect("no errors");
662        assert!(inserted);
663        // Ensure the store contains the first operation but not the second.
664        assert!(store.has_operation(hash).await.expect("no error"));
665        assert!(!store.has_operation(hash_2).await.expect("no error"));
666
667        let (header_again, body_again) = store
668            .get_operation(hash)
669            .await
670            .expect("no error")
671            .expect("operation exist");
672
673        // Ensure the hash of the created operation header matches that of the retrieved
674        // header hash.
675        assert_eq!(header.hash(), header_again.hash());
676        // Ensure the body of the created operation matches that of the retrieved body.
677        assert_eq!(Some(body.clone()), body_again);
678
679        let (header_bytes_again, body_bytes_again) = store
680            .get_raw_operation(hash)
681            .await
682            .expect("no error")
683            .expect("operation exist");
684
685        assert_eq!(header_bytes_again, header_bytes);
686        assert_eq!(body_bytes_again, Some(body.to_bytes()));
687    }
688
689    #[tokio::test]
690    async fn delete_operation() {
691        let db_pool = initialize_sqlite_db().await;
692        let mut store = SqliteStore::new(db_pool);
693        let private_key = PrivateKey::new();
694
695        let body = Body::new("hello!".as_bytes());
696        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
697
698        // Insert one operation.
699        let inserted = store
700            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
701            .await
702            .expect("no errors");
703        assert!(inserted);
704
705        // Ensure the store contains the operation.
706        assert!(store.has_operation(hash).await.expect("no error"));
707
708        // Delete the operation.
709        assert!(store.delete_operation(hash).await.expect("no error"));
710
711        let deleted_operation = store.get_operation(hash).await.expect("no error");
712        assert!(deleted_operation.is_none());
713        assert!(!store.has_operation(hash).await.expect("no error"));
714
715        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
716        assert!(deleted_raw_operation.is_none());
717    }
718
719    #[tokio::test]
720    async fn delete_payload() {
721        let db_pool = initialize_sqlite_db().await;
722        let mut store = SqliteStore::new(db_pool);
723        let private_key = PrivateKey::new();
724
725        let body = Body::new("hello!".as_bytes());
726        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
727
728        let inserted = store
729            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
730            .await
731            .expect("no errors");
732        assert!(inserted);
733
734        assert!(store.delete_payload(hash).await.expect("no error"));
735
736        let (_, no_body) = store
737            .get_operation(hash)
738            .await
739            .expect("no error")
740            .expect("operation exist");
741        assert!(no_body.is_none());
742        assert!(store.has_operation(hash).await.expect("no error"));
743
744        let (_, no_body) = store
745            .get_raw_operation(hash)
746            .await
747            .expect("no error")
748            .expect("operation exist");
749        assert!(no_body.is_none());
750    }
751
752    #[tokio::test]
753    async fn get_log() {
754        let db_pool = initialize_sqlite_db().await;
755        let mut store = SqliteStore::new(db_pool);
756        let private_key = PrivateKey::new();
757        let log_id = 0;
758
759        let body_0 = Body::new("hello!".as_bytes());
760        let body_1 = Body::new("hello again!".as_bytes());
761        let body_2 = Body::new("hello for a third time!".as_bytes());
762
763        let (hash_0, header_0, header_bytes_0) =
764            create_operation(&private_key, &body_0, 0, 0, None);
765        let (hash_1, header_1, header_bytes_1) =
766            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
767        let (hash_2, header_2, header_bytes_2) =
768            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
769
770        store
771            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
772            .await
773            .expect("no errors");
774        store
775            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
776            .await
777            .expect("no errors");
778        store
779            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
780            .await
781            .expect("no errors");
782
783        // Get all log operations.
784        let log = store
785            .get_log(&private_key.public_key(), &log_id, None)
786            .await
787            .expect("no errors")
788            .expect("log should exist");
789
790        assert_eq!(log.len(), 3);
791        assert_eq!(log[0].0.hash(), hash_0);
792        assert_eq!(log[1].0.hash(), hash_1);
793        assert_eq!(log[2].0.hash(), hash_2);
794        assert_eq!(log[0].1, Some(body_0.clone()));
795        assert_eq!(log[1].1, Some(body_1.clone()));
796        assert_eq!(log[2].1, Some(body_2.clone()));
797
798        // Get all log operations starting from sequence number 1.
799        let log = store
800            .get_log(&private_key.public_key(), &log_id, Some(1))
801            .await
802            .expect("no errors")
803            .expect("log should exist");
804
805        assert_eq!(log.len(), 2);
806        assert_eq!(log[0].0.hash(), hash_1);
807        assert_eq!(log[1].0.hash(), hash_2);
808        assert_eq!(log[0].1, Some(body_1.clone()));
809        assert_eq!(log[1].1, Some(body_2.clone()));
810
811        // Get all raw log operations.
812        let log = store
813            .get_raw_log(&private_key.public_key(), &log_id, None)
814            .await
815            .expect("no errors")
816            .expect("log should exist");
817
818        assert_eq!(log.len(), 3);
819        assert_eq!(log[0].0, header_bytes_0);
820        assert_eq!(log[1].0, header_bytes_1);
821        assert_eq!(log[2].0, header_bytes_2);
822        assert_eq!(log[0].1, Some(body_0.to_bytes()));
823        assert_eq!(log[1].1, Some(body_1.to_bytes()));
824        assert_eq!(log[2].1, Some(body_2.to_bytes()));
825
826        // Get all raw log operations starting from sequence number 1.
827        let log = store
828            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
829            .await
830            .expect("no errors")
831            .expect("log should exist");
832
833        assert_eq!(log.len(), 2);
834        assert_eq!(log[0].0, header_bytes_1);
835        assert_eq!(log[1].0, header_bytes_2);
836        assert_eq!(log[0].1, Some(body_1.to_bytes()));
837        assert_eq!(log[1].1, Some(body_2.to_bytes()));
838    }
839
840    #[tokio::test]
841    async fn get_latest_operation() {
842        let db_pool = initialize_sqlite_db().await;
843        let mut store = SqliteStore::new(db_pool);
844        let private_key = PrivateKey::new();
845        let log_id = 0;
846
847        let body_0 = Body::new("hello!".as_bytes());
848        let body_1 = Body::new("hello again!".as_bytes());
849
850        let (hash_0, header_0, header_bytes_0) =
851            create_operation(&private_key, &body_0, 0, 0, None);
852        let (hash_1, header_1, header_bytes_1) =
853            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
854
855        store
856            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
857            .await
858            .expect("no errors");
859        store
860            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
861            .await
862            .expect("no errors");
863
864        let (latest_header, latest_body) = store
865            .latest_operation(&private_key.public_key(), &log_id)
866            .await
867            .expect("no errors")
868            .expect("there's an operation");
869
870        assert_eq!(latest_header.hash(), header_1.hash());
871        assert_eq!(latest_body, Some(body_1));
872    }
873
874    #[tokio::test]
875    async fn delete_operations() {
876        let db_pool = initialize_sqlite_db().await;
877        let mut store = SqliteStore::new(db_pool);
878        let private_key = PrivateKey::new();
879        let log_id = 0;
880
881        let body_0 = Body::new("hello!".as_bytes());
882        let body_1 = Body::new("hello again!".as_bytes());
883        let body_2 = Body::new("final hello!".as_bytes());
884
885        let (hash_0, header_0, header_bytes_0) =
886            create_operation(&private_key, &body_0, 0, 0, None);
887        let (hash_1, header_1, header_bytes_1) =
888            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
889        let (hash_2, header_2, header_bytes_2) =
890            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
891
892        store
893            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
894            .await
895            .expect("no errors");
896        store
897            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
898            .await
899            .expect("no errors");
900        store
901            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
902            .await
903            .expect("no errors");
904
905        // Get all log operations.
906        let log = store
907            .get_log(&private_key.public_key(), &log_id, None)
908            .await
909            .expect("no errors")
910            .expect("log should exist");
911
912        // We expect the log to have 3 operations.
913        assert_eq!(log.len(), 3);
914
915        // Delete all operations _before_ seq_num 2.
916        let deleted = store
917            .delete_operations(&private_key.public_key(), &log_id, 2)
918            .await
919            .expect("no errors");
920        assert!(deleted);
921
922        let log = store
923            .get_log(&private_key.public_key(), &log_id, None)
924            .await
925            .expect("no errors")
926            .expect("log should exist");
927
928        // There is now only one operation in the log.
929        assert_eq!(log.len(), 1);
930
931        // The remaining operation should be the latest (seq_num == 2).
932        assert_eq!(log[0].0.hash(), header_2.hash());
933
934        // Deleting the same range again should return `false`, meaning no deletion occurred.
935        let deleted = store
936            .delete_operations(&private_key.public_key(), &log_id, 2)
937            .await
938            .expect("no errors");
939        assert!(!deleted);
940    }
941
942    #[tokio::test]
943    async fn delete_payloads() {
944        let db_pool = initialize_sqlite_db().await;
945        let mut store = SqliteStore::new(db_pool);
946        let private_key = PrivateKey::new();
947        let log_id = 0;
948
949        let body_0 = Body::new("hello!".as_bytes());
950        let body_1 = Body::new("hello again!".as_bytes());
951        let body_2 = Body::new("final hello!".as_bytes());
952
953        let (hash_0, header_0, header_bytes_0) =
954            create_operation(&private_key, &body_0, 0, 0, None);
955        let (hash_1, header_1, header_bytes_1) =
956            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
957        let (hash_2, header_2, header_bytes_2) =
958            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
959
960        store
961            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
962            .await
963            .expect("no errors");
964        store
965            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
966            .await
967            .expect("no errors");
968        store
969            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
970            .await
971            .expect("no errors");
972
973        // Get all log operations.
974        let log = store
975            .get_log(&private_key.public_key(), &log_id, None)
976            .await
977            .expect("no errors")
978            .expect("log should exist");
979
980        // We expect the log to have 3 operations.
981        assert_eq!(log.len(), 3);
982
983        assert_eq!(log[0].1, Some(body_0));
984        assert_eq!(log[1].1, Some(body_1));
985        assert_eq!(log[2].1, Some(body_2.clone()));
986
987        // Delete all operation payloads from sequence number 0 up to but not including 2.
988        let deleted = store
989            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
990            .await
991            .expect("no errors");
992        assert!(deleted);
993
994        let log = store
995            .get_log(&private_key.public_key(), &log_id, None)
996            .await
997            .expect("no errors")
998            .expect("log should exist");
999
1000        assert_eq!(log[0].1, None);
1001        assert_eq!(log[1].1, None);
1002        assert_eq!(log[2].1, Some(body_2));
1003    }
1004
1005    #[tokio::test]
1006    async fn get_log_heights() {
1007        let db_pool = initialize_sqlite_db().await;
1008        let mut store = SqliteStore::new(db_pool);
1009
1010        let log_id = 0;
1011
1012        let private_key_0 = PrivateKey::new();
1013        let private_key_1 = PrivateKey::new();
1014        let private_key_2 = PrivateKey::new();
1015
1016        let body_0 = Body::new("hello!".as_bytes());
1017        let body_1 = Body::new("hello again!".as_bytes());
1018        let body_2 = Body::new("hello for a third time!".as_bytes());
1019
1020        let (hash_0, header_0, header_bytes_0) =
1021            create_operation(&private_key_0, &body_0, 0, 0, None);
1022        let (hash_1, header_1, header_bytes_1) =
1023            create_operation(&private_key_0, &body_1, 1, 0, Some(hash_0));
1024        let (hash_2, header_2, header_bytes_2) =
1025            create_operation(&private_key_0, &body_2, 2, 0, Some(hash_1));
1026
1027        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
1028        assert!(log_heights.is_empty());
1029
1030        store
1031            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1032            .await
1033            .expect("no errors");
1034        store
1035            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
1036            .await
1037            .expect("no errors");
1038        store
1039            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
1040            .await
1041            .expect("no errors");
1042
1043        let (hash_0, header_0, header_bytes_0) =
1044            create_operation(&private_key_1, &body_0, 0, 0, None);
1045        let (hash_1, header_1, header_bytes_1) =
1046            create_operation(&private_key_1, &body_1, 1, 0, Some(hash_0));
1047
1048        store
1049            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1050            .await
1051            .expect("no errors");
1052        store
1053            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
1054            .await
1055            .expect("no errors");
1056
1057        let (hash_0, header_0, header_bytes_0) =
1058            create_operation(&private_key_2, &body_0, 0, 0, None);
1059
1060        store
1061            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1062            .await
1063            .expect("no errors");
1064
1065        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
1066
1067        assert_eq!(log_heights.len(), 3);
1068
1069        // Ensure the correct sequence number for each public key.
1070        assert!(log_heights.contains(&(private_key_0.public_key(), 2)));
1071        assert!(log_heights.contains(&(private_key_1.public_key(), 1)));
1072        assert!(log_heights.contains(&(private_key_2.public_key(), 0)));
1073    }
1074}