Skip to main content

p2panda_store/sqlite/
store.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! SQLite persistent storage.
4use std::hash::{DefaultHasher, Hash as StdHash, Hasher};
5use std::marker::PhantomData;
6
7use sqlx::migrate;
8use sqlx::migrate::{MigrateDatabase, MigrateError};
9use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
10use sqlx::{Error as SqlxError, Sqlite, query, query_as};
11use thiserror::Error;
12
13use p2panda_core::cbor::{DecodeError, EncodeError, encode_cbor};
14use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
15
16use crate::operations::{LogId, LogStore, OperationStore};
17use crate::sqlite::models::{ByteCount, LogHeightRow, OperationRow, RawOperationRow, SeqAndHash};
18
/// Errors returned by the SQLite store and its helper functions.
///
/// Each variant wraps an underlying error and can be created from it with `?` thanks to the
/// `#[from]` conversions.
#[derive(Debug, Error)]
pub enum SqliteStoreError {
    /// CBOR encoding failed (wraps `p2panda_core::cbor::EncodeError`).
    #[error("failed to encode operation extensions: {0}")]
    EncodingFailed(#[from] EncodeError),

    /// CBOR decoding failed (wraps `p2panda_core::cbor::DecodeError`).
    #[error("failed to decode operation extensions: {0}")]
    DecodingFailed(#[from] DecodeError),

    /// Any error reported by `sqlx` while talking to the SQLite database.
    #[error("an error occurred with the sqlite database: {0}")]
    Database(#[from] SqlxError),
}
30
31impl From<MigrateError> for SqliteStoreError {
32    fn from(error: MigrateError) -> Self {
33        Self::Database(SqlxError::Migrate(Box::new(error)))
34    }
35}
36
/// Alias for sqlx's [`SqlitePool`], the connection pool type used by [`SqliteStore`] and the
/// helper functions in this module.
pub type Pool = SqlitePool;
39
/// SQLite-based persistent store.
///
/// Generic over the log identifier type `L` and the header extensions type `E`. Neither is
/// stored in the struct itself; they only parameterise the store trait implementations.
#[derive(Clone, Debug)]
pub struct SqliteStore<L, E> {
    // Connection pool every query of this store is executed against.
    pub(crate) pool: Pool,
    // Zero-sized marker tying the `L` and `E` type parameters to the struct.
    _marker: PhantomData<(L, E)>,
}
46
47impl<L, E> SqliteStore<L, E>
48where
49    L: LogId,
50    E: Extensions,
51{
52    /// Create a new `SqliteStore` using the provided db `Pool`.
53    pub fn new(pool: Pool) -> Self {
54        Self {
55            pool,
56            _marker: PhantomData {},
57        }
58    }
59}
60
61/// Create the database if it doesn't already exist.
62pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> {
63    if !Sqlite::database_exists(url).await? {
64        Sqlite::create_database(url).await?
65    }
66
67    Ok(())
68}
69
70/// Drop the database if it exists.
71pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> {
72    if Sqlite::database_exists(url).await? {
73        Sqlite::drop_database(url).await?
74    }
75
76    Ok(())
77}
78
79/// Create a connection pool.
80pub async fn connection_pool(url: &str, max_connections: u32) -> Result<Pool, SqliteStoreError> {
81    let pool: Pool = SqlitePoolOptions::new()
82        .max_connections(max_connections)
83        .connect(url)
84        .await?;
85
86    Ok(pool)
87}
88
89/// Get migrations without running them
90pub fn migrations() -> migrate::Migrator {
91    migrate!()
92}
93
94/// Run any pending database migrations from inside the application.
95pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> {
96    migrations().run(pool).await?;
97
98    Ok(())
99}
100
/// Hash any `StdHash` value into a `u64` using the standard library's `DefaultHasher`.
///
/// The store uses the decimal string of this value as the `log_id` column content.
///
// NOTE(review): the standard library does not guarantee that `DefaultHasher` produces the same
// output across Rust releases; since these values are persisted, confirm this is acceptable.
fn calculate_hash<T: StdHash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    StdHash::hash(t, &mut hasher);
    Hasher::finish(&hasher)
}
106
107impl<L, E> OperationStore<L, E> for SqliteStore<L, E>
108where
109    L: LogId + Send + Sync,
110    E: Extensions + Send + Sync,
111{
112    type Error = SqliteStoreError;
113
114    async fn insert_operation(
115        &mut self,
116        hash: Hash,
117        header: &Header<E>,
118        body: Option<&Body>,
119        header_bytes: &[u8],
120        log_id: &L,
121    ) -> Result<bool, Self::Error> {
122        query(
123            "
124            INSERT INTO
125                operations_v1 (
126                    hash,
127                    log_id,
128                    version,
129                    public_key,
130                    signature,
131                    payload_size,
132                    payload_hash,
133                    timestamp,
134                    seq_num,
135                    backlink,
136                    previous,
137                    extensions,
138                    body,
139                    header_bytes,
140                    header_size
141                )
142            VALUES
143                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
144            ",
145        )
146        .bind(hash.to_string())
147        .bind(calculate_hash(log_id).to_string())
148        .bind(header.version.to_string())
149        .bind(header.public_key.to_hex())
150        .bind(header.signature.map(|sig| sig.to_hex()))
151        .bind(header.payload_size.to_string())
152        .bind(header.payload_hash.map(|hash| hash.to_hex()))
153        .bind(header.timestamp.to_string())
154        .bind(header.seq_num.to_string())
155        .bind(header.backlink.map(|backlink| backlink.to_hex()))
156        .bind(
157            header
158                .previous
159                .iter()
160                .map(|previous| previous.to_hex())
161                .collect::<Vec<String>>()
162                .concat(),
163        )
164        .bind(encode_cbor(&header.extensions).expect("extenions are serializable"))
165        .bind(body.map(|body| body.to_bytes()))
166        .bind(header_bytes)
167        .bind(header_bytes.len().to_string())
168        .execute(&self.pool)
169        .await?;
170
171        Ok(true)
172    }
173
174    async fn get_operation(
175        &self,
176        hash: Hash,
177    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
178        if let Some(operation) = query_as::<_, OperationRow>(
179            "
180            SELECT
181                hash,
182                log_id,
183                version,
184                public_key,
185                signature,
186                payload_size,
187                payload_hash,
188                timestamp,
189                seq_num,
190                backlink,
191                previous,
192                extensions,
193                body,
194                header_bytes
195            FROM
196                operations_v1
197            WHERE
198                hash = ?
199            ",
200        )
201        .bind(hash.to_string())
202        .fetch_optional(&self.pool)
203        .await?
204        {
205            let body = operation.body.clone().map(|body| body.into());
206            let header: Header<E> = operation.into();
207
208            Ok(Some((header, body)))
209        } else {
210            Ok(None)
211        }
212    }
213
214    async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
215        if let Some(operation) = query_as::<_, RawOperationRow>(
216            "
217            SELECT
218                hash,
219                body,
220                header_bytes
221            FROM
222                operations_v1
223            WHERE
224                hash = ?
225            ",
226        )
227        .bind(hash.to_string())
228        .fetch_optional(&self.pool)
229        .await?
230        {
231            let raw_operation = operation.into();
232
233            Ok(Some(raw_operation))
234        } else {
235            Ok(None)
236        }
237    }
238
239    async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
240        let exists = query(
241            "
242            SELECT
243                1
244            FROM
245                operations_v1
246            WHERE
247                hash = ?
248            ",
249        )
250        .bind(hash.to_string())
251        .fetch_optional(&self.pool)
252        .await?;
253
254        Ok(exists.is_some())
255    }
256
257    async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
258        let result = query(
259            "
260            DELETE
261            FROM
262                operations_v1
263            WHERE
264                hash = ?
265            ",
266        )
267        .bind(hash.to_string())
268        .execute(&self.pool)
269        .await?;
270
271        Ok(result.rows_affected() > 0)
272    }
273
274    async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
275        let result = query(
276            "
277            UPDATE
278                operations_v1
279            SET
280                body = NULL
281            WHERE
282                operations_v1.hash = ?
283            ",
284        )
285        .bind(hash.to_string())
286        .execute(&self.pool)
287        .await?;
288
289        Ok(result.rows_affected() > 0)
290    }
291}
292
293impl<L, E> LogStore<L, E> for SqliteStore<L, E>
294where
295    L: LogId + Send + Sync,
296    E: Extensions + Send + Sync,
297{
298    type Error = SqliteStoreError;
299
300    async fn get_log(
301        &self,
302        public_key: &PublicKey,
303        log_id: &L,
304        from: Option<u64>,
305    ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
306        let operations = query_as::<_, OperationRow>(
307            "
308            SELECT
309                hash,
310                log_id,
311                version,
312                public_key,
313                signature,
314                payload_size,
315                payload_hash,
316                timestamp,
317                seq_num,
318                backlink,
319                previous,
320                extensions,
321                body,
322                header_bytes
323            FROM
324                operations_v1
325            WHERE
326                public_key = ?
327                AND log_id = ?
328                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
329            ORDER BY
330                CAST(seq_num AS NUMERIC)
331            ",
332        )
333        .bind(public_key.to_string())
334        .bind(calculate_hash(log_id).to_string())
335        .bind(from.unwrap_or(0).to_string())
336        .fetch_all(&self.pool)
337        .await?;
338
339        let log: Vec<(Header<E>, Option<Body>)> = operations
340            .into_iter()
341            .map(|operation| {
342                (
343                    operation.clone().into(),
344                    operation.body.map(|body| body.into()),
345                )
346            })
347            .collect();
348
349        if log.is_empty() {
350            Ok(None)
351        } else {
352            Ok(Some(log))
353        }
354    }
355
356    async fn get_raw_log(
357        &self,
358        public_key: &PublicKey,
359        log_id: &L,
360        from: Option<u64>,
361    ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
362        let operations = query_as::<_, RawOperationRow>(
363            "
364            SELECT
365                hash,
366                body,
367                header_bytes
368            FROM
369                operations_v1
370            WHERE
371                public_key = ?
372                AND log_id = ?
373                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
374            ORDER BY
375                CAST(seq_num AS NUMERIC)
376            ",
377        )
378        .bind(public_key.to_string())
379        .bind(calculate_hash(log_id).to_string())
380        .bind(from.unwrap_or(0).to_string())
381        .fetch_all(&self.pool)
382        .await?;
383
384        let log: Vec<RawOperation> = operations
385            .into_iter()
386            .map(|operation| operation.into())
387            .collect();
388
389        if log.is_empty() {
390            Ok(None)
391        } else {
392            Ok(Some(log))
393        }
394    }
395
396    async fn get_log_size(
397        &self,
398        public_key: &PublicKey,
399        log_id: &L,
400        from: Option<u64>,
401    ) -> Result<Option<u64>, Self::Error> {
402        let bytes_count = query_as::<_, ByteCount>(
403            "
404            SELECT
405                CAST(SUM(CAST(header_size AS NUMERIC)) AS TEXT) AS total_header_size,
406                CAST(SUM(CAST(payload_size AS NUMERIC)) AS TEXT) AS total_payload_size
407            FROM
408                operations_v1
409            WHERE
410                public_key = ?
411                AND log_id = ?
412                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
413            ",
414        )
415        .bind(public_key.to_string())
416        .bind(calculate_hash(log_id).to_string())
417        .bind(from.unwrap_or(0).to_string())
418        .fetch_one(&self.pool)
419        .await?;
420
421        let total_bytes = bytes_count.total_header_size.parse::<u64>().unwrap()
422            + bytes_count.total_payload_size.parse::<u64>().unwrap();
423
424        if total_bytes == 0 {
425            Ok(None)
426        } else {
427            Ok(Some(total_bytes))
428        }
429    }
430
431    async fn get_log_hashes(
432        &self,
433        public_key: &PublicKey,
434        log_id: &L,
435        from: Option<u64>,
436    ) -> Result<Option<Vec<(u64, Hash)>>, Self::Error> {
437        let hashes = query_as::<_, SeqAndHash>(
438            "
439            SELECT
440                seq_num,
441                hash
442            FROM
443                operations_v1
444            WHERE
445                public_key = ?
446                AND log_id = ?
447                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
448            ORDER BY
449                CAST(seq_num AS NUMERIC)
450            ",
451        )
452        .bind(public_key.to_string())
453        .bind(calculate_hash(log_id).to_string())
454        .bind(from.unwrap_or(0).to_string())
455        .fetch_all(&self.pool)
456        .await?;
457
458        let hashes: Vec<(u64, Hash)> = hashes.into_iter().map(Into::<(u64, Hash)>::into).collect();
459
460        if hashes.is_empty() {
461            Ok(None)
462        } else {
463            Ok(Some(hashes))
464        }
465    }
466
467    async fn latest_operation(
468        &self,
469        public_key: &PublicKey,
470        log_id: &L,
471    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
472        if let Some(operation) = query_as::<_, OperationRow>(
473            "
474            SELECT
475                hash,
476                log_id,
477                version,
478                public_key,
479                signature,
480                payload_size,
481                payload_hash,
482                timestamp,
483                seq_num,
484                backlink,
485                previous,
486                extensions,
487                body,
488                header_bytes
489            FROM
490                operations_v1
491            WHERE
492                public_key = ?
493                AND log_id = ?
494            ORDER BY
495                CAST(seq_num AS NUMERIC) DESC LIMIT 1
496            ",
497        )
498        .bind(public_key.to_string())
499        .bind(calculate_hash(log_id).to_string())
500        .fetch_optional(&self.pool)
501        .await?
502        {
503            let body = operation.body.clone().map(|body| body.into());
504            let header: Header<E> = operation.into();
505
506            Ok(Some((header, body)))
507        } else {
508            Ok(None)
509        }
510    }
511
512    async fn delete_operations(
513        &mut self,
514        public_key: &PublicKey,
515        log_id: &L,
516        before: u64,
517    ) -> Result<bool, Self::Error> {
518        let result = query(
519            "
520            DELETE
521            FROM
522                operations_v1
523            WHERE
524                public_key = ?
525                AND log_id = ?
526                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
527            ",
528        )
529        .bind(public_key.to_string())
530        .bind(calculate_hash(log_id).to_string())
531        .bind(before.to_string())
532        .execute(&self.pool)
533        .await?;
534
535        Ok(result.rows_affected() > 0)
536    }
537
538    async fn delete_payloads(
539        &mut self,
540        public_key: &PublicKey,
541        log_id: &L,
542        from: u64,
543        to: u64,
544    ) -> Result<bool, Self::Error> {
545        let result = query(
546            "
547            UPDATE
548                operations_v1
549            SET
550                body = NULL
551            WHERE
552                public_key = ?
553                AND log_id = ?
554                AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
555                AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
556            ",
557        )
558        .bind(public_key.to_string())
559        .bind(calculate_hash(log_id).to_string())
560        .bind(from.to_string())
561        .bind(to.to_string())
562        .execute(&self.pool)
563        .await?;
564
565        Ok(result.rows_affected() > 0)
566    }
567
568    async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, u64)>, Self::Error> {
569        let operations = query_as::<_, LogHeightRow>(
570            "
571            SELECT
572                public_key,
573                CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
574            FROM
575                operations_v1
576            WHERE
577                log_id = ?
578            GROUP BY
579                public_key
580            ",
581        )
582        .bind(calculate_hash(log_id).to_string())
583        .fetch_all(&self.pool)
584        .await?;
585
586        let log_heights: Vec<(PublicKey, u64)> = operations
587            .into_iter()
588            .map(|operation| operation.into())
589            .collect();
590
591        Ok(log_heights)
592    }
593}
594
595#[cfg(test)]
596mod tests {
597    use p2panda_core::{Body, Hash, Header, PrivateKey};
598    use serde::{Deserialize, Serialize};
599
600    use crate::sqlite::test_utils::initialize_sqlite_db;
601    use crate::{LogStore, OperationStore};
602
603    use super::SqliteStore;
604
605    fn create_operation(
606        private_key: &PrivateKey,
607        body: &Body,
608        seq_num: u64,
609        timestamp: u64,
610        backlink: Option<Hash>,
611    ) -> (Hash, Header<()>, Vec<u8>) {
612        let mut header = Header {
613            version: 1,
614            public_key: private_key.public_key(),
615            signature: None,
616            payload_size: body.size(),
617            payload_hash: Some(body.hash()),
618            timestamp,
619            seq_num,
620            backlink,
621            previous: vec![],
622            extensions: (),
623        };
624        header.sign(private_key);
625        let header_bytes = header.to_bytes();
626        (header.hash(), header, header_bytes)
627    }
628
629    #[tokio::test]
630    async fn default_sqlite_store() {
631        let db_pool = initialize_sqlite_db().await;
632
633        let mut store = SqliteStore::new(db_pool);
634        let private_key = PrivateKey::new();
635
636        let body = Body::new("hello!".as_bytes());
637        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
638
639        let inserted = store
640            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
641            .await
642            .expect("no errors");
643
644        assert!(inserted);
645    }
646
647    #[tokio::test]
648    async fn generic_extensions_mem_store() {
649        // Define our own custom extension type.
650        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
651        struct MyExtension {}
652
653        // Instantiate a database pool backed by an in-memory db.
654        let db_pool = initialize_sqlite_db().await;
655
656        // Construct a new store.
657        let mut store = SqliteStore::new(db_pool);
658
659        // Construct an operation using the custom extension.
660        let private_key = PrivateKey::new();
661        let body = Body::new("hello!".as_bytes());
662        let mut header = Header {
663            version: 1,
664            public_key: private_key.public_key(),
665            signature: None,
666            payload_size: body.size(),
667            payload_hash: Some(body.hash()),
668            timestamp: 0,
669            seq_num: 0,
670            backlink: None,
671            previous: vec![],
672            extensions: Some(MyExtension {}),
673        };
674        header.sign(&private_key);
675
676        // Insert the operation into the store, the extension type is inferred.
677        let inserted = store
678            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
679            .await
680            .expect("no errors");
681        assert!(inserted);
682    }
683
684    #[tokio::test]
685    async fn insert_operation_with_unsigned_header() {
686        let db_pool = initialize_sqlite_db().await;
687        let mut store = SqliteStore::new(db_pool);
688        let private_key = PrivateKey::new();
689
690        // Create the first operation.
691        let body = Body::new("hello!".as_bytes());
692        let (hash, mut header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
693
694        // Set signature to `None` for the sake of the test.
695        header.signature = None;
696
697        // Only insert the first operation into the store.
698        let inserted = store
699            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
700            .await;
701
702        // Ensure that the lack of a header signature returns an error.
703        assert!(inserted.is_err());
704        assert_eq!(
705            format!("{}", inserted.unwrap_err()),
706            "an error occurred with the sqlite database: error returned from database: (code: 1299) NOT NULL constraint failed: operations_v1.signature"
707        );
708    }
709
710    #[tokio::test]
711    async fn insert_get_operation() {
712        let db_pool = initialize_sqlite_db().await;
713        let mut store = SqliteStore::new(db_pool);
714        let private_key = PrivateKey::new();
715
716        // Create the first operation.
717        let body = Body::new("hello!".as_bytes());
718        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
719
720        // Create the second operation.
721        let body_2 = Body::new("buenas!".as_bytes());
722        let (hash_2, _header_2, _header_bytes_2) =
723            create_operation(&private_key, &body_2, 0, 0, None);
724
725        // Only insert the first operation into the store.
726        let inserted = store
727            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
728            .await
729            .expect("no errors");
730        assert!(inserted);
731        // Ensure the store contains the first operation but not the second.
732        assert!(store.has_operation(hash).await.expect("no error"));
733        assert!(!store.has_operation(hash_2).await.expect("no error"));
734
735        let (header_again, body_again) = store
736            .get_operation(hash)
737            .await
738            .expect("no error")
739            .expect("operation exist");
740
741        // Ensure the hash of the created operation header matches that of the retrieved
742        // header hash.
743        assert_eq!(header.hash(), header_again.hash());
744        // Ensure the body of the created operation matches that of the retrieved body.
745        assert_eq!(Some(body.clone()), body_again);
746
747        let (header_bytes_again, body_bytes_again) = store
748            .get_raw_operation(hash)
749            .await
750            .expect("no error")
751            .expect("operation exist");
752
753        assert_eq!(header_bytes_again, header_bytes);
754        assert_eq!(body_bytes_again, Some(body.to_bytes()));
755    }
756
757    #[tokio::test]
758    async fn delete_operation() {
759        let db_pool = initialize_sqlite_db().await;
760        let mut store = SqliteStore::new(db_pool);
761        let private_key = PrivateKey::new();
762
763        let body = Body::new("hello!".as_bytes());
764        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
765
766        // Insert one operation.
767        let inserted = store
768            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
769            .await
770            .expect("no errors");
771        assert!(inserted);
772
773        // Ensure the store contains the operation.
774        assert!(store.has_operation(hash).await.expect("no error"));
775
776        // Delete the operation.
777        assert!(store.delete_operation(hash).await.expect("no error"));
778
779        let deleted_operation = store.get_operation(hash).await.expect("no error");
780        assert!(deleted_operation.is_none());
781        assert!(!store.has_operation(hash).await.expect("no error"));
782
783        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
784        assert!(deleted_raw_operation.is_none());
785    }
786
787    #[tokio::test]
788    async fn delete_payload() {
789        let db_pool = initialize_sqlite_db().await;
790        let mut store = SqliteStore::new(db_pool);
791        let private_key = PrivateKey::new();
792
793        let body = Body::new("hello!".as_bytes());
794        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
795
796        let inserted = store
797            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
798            .await
799            .expect("no errors");
800        assert!(inserted);
801
802        assert!(store.delete_payload(hash).await.expect("no error"));
803
804        let (_, no_body) = store
805            .get_operation(hash)
806            .await
807            .expect("no error")
808            .expect("operation exist");
809        assert!(no_body.is_none());
810        assert!(store.has_operation(hash).await.expect("no error"));
811
812        let (_, no_body) = store
813            .get_raw_operation(hash)
814            .await
815            .expect("no error")
816            .expect("operation exist");
817        assert!(no_body.is_none());
818    }
819
820    #[tokio::test]
821    async fn get_log() {
822        let db_pool = initialize_sqlite_db().await;
823        let mut store = SqliteStore::new(db_pool);
824        let private_key = PrivateKey::new();
825        let log_id = 0;
826
827        let body_0 = Body::new("hello!".as_bytes());
828        let body_1 = Body::new("hello again!".as_bytes());
829        let body_2 = Body::new("hello for a third time!".as_bytes());
830
831        let (hash_0, header_0, header_bytes_0) =
832            create_operation(&private_key, &body_0, 0, 0, None);
833        let (hash_1, header_1, header_bytes_1) =
834            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
835        let (hash_2, header_2, header_bytes_2) =
836            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
837
838        store
839            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
840            .await
841            .expect("no errors");
842        store
843            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
844            .await
845            .expect("no errors");
846        store
847            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
848            .await
849            .expect("no errors");
850
851        // Get all log operations.
852        let log = store
853            .get_log(&private_key.public_key(), &log_id, None)
854            .await
855            .expect("no errors")
856            .expect("log should exist");
857
858        assert_eq!(log.len(), 3);
859        assert_eq!(log[0].0.hash(), hash_0);
860        assert_eq!(log[1].0.hash(), hash_1);
861        assert_eq!(log[2].0.hash(), hash_2);
862        assert_eq!(log[0].1, Some(body_0.clone()));
863        assert_eq!(log[1].1, Some(body_1.clone()));
864        assert_eq!(log[2].1, Some(body_2.clone()));
865
866        // Get all log operations starting from sequence number 1.
867        let log = store
868            .get_log(&private_key.public_key(), &log_id, Some(1))
869            .await
870            .expect("no errors")
871            .expect("log should exist");
872
873        assert_eq!(log.len(), 2);
874        assert_eq!(log[0].0.hash(), hash_1);
875        assert_eq!(log[1].0.hash(), hash_2);
876        assert_eq!(log[0].1, Some(body_1.clone()));
877        assert_eq!(log[1].1, Some(body_2.clone()));
878
879        // Get all raw log operations.
880        let log = store
881            .get_raw_log(&private_key.public_key(), &log_id, None)
882            .await
883            .expect("no errors")
884            .expect("log should exist");
885
886        assert_eq!(log.len(), 3);
887        assert_eq!(log[0].0, header_bytes_0);
888        assert_eq!(log[1].0, header_bytes_1);
889        assert_eq!(log[2].0, header_bytes_2);
890        assert_eq!(log[0].1, Some(body_0.to_bytes()));
891        assert_eq!(log[1].1, Some(body_1.to_bytes()));
892        assert_eq!(log[2].1, Some(body_2.to_bytes()));
893
894        // Get all raw log operations starting from sequence number 1.
895        let log = store
896            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
897            .await
898            .expect("no errors")
899            .expect("log should exist");
900
901        assert_eq!(log.len(), 2);
902        assert_eq!(log[0].0, header_bytes_1);
903        assert_eq!(log[1].0, header_bytes_2);
904        assert_eq!(log[0].1, Some(body_1.to_bytes()));
905        assert_eq!(log[1].1, Some(body_2.to_bytes()));
906    }
907
908    #[tokio::test]
909    async fn get_log_hashes_and_size() {
910        let db_pool = initialize_sqlite_db().await;
911        let mut store = SqliteStore::new(db_pool);
912        let private_key = PrivateKey::new();
913        let log_id = 0;
914
915        let body_0 = Body::new("hello!".as_bytes());
916        let body_1 = Body::new("hello again!".as_bytes());
917        let body_2 = Body::new("hello for a third time!".as_bytes());
918
919        let (hash_0, header_0, header_bytes_0) =
920            create_operation(&private_key, &body_0, 0, 0, None);
921        let (hash_1, header_1, header_bytes_1) =
922            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
923        let (hash_2, header_2, header_bytes_2) =
924            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
925
926        store
927            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
928            .await
929            .expect("no errors");
930        store
931            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
932            .await
933            .expect("no errors");
934        store
935            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
936            .await
937            .expect("no errors");
938
939        // Get all log hashes.
940        let hashes = store
941            .get_log_hashes(&private_key.public_key(), &log_id, None)
942            .await
943            .expect("no errors")
944            .expect("log should exist");
945
946        assert_eq!(hashes.len(), 3);
947        assert_eq!(hashes[0], (0, hash_0));
948        assert_eq!(hashes[1], (1, hash_1));
949        assert_eq!(hashes[2], (2, hash_2));
950
951        // Get sum of log byte lengths.
952        let size = store
953            .get_log_size(&private_key.public_key(), &log_id, None)
954            .await
955            .expect("no errors")
956            .expect("log should exist");
957
958        let expected_size = header_bytes_0.len() as u64
959            + header_0.payload_size
960            + header_bytes_1.len() as u64
961            + header_1.payload_size
962            + header_bytes_2.len() as u64
963            + header_2.payload_size;
964        assert_eq!(size, expected_size);
965
966        // Get all log hashes starting from sequence number 1.
967        let hashes = store
968            .get_log_hashes(&private_key.public_key(), &log_id, Some(1))
969            .await
970            .expect("no errors")
971            .expect("log should exist");
972
973        assert_eq!(hashes.len(), 2);
974        assert_eq!(hashes[0], (1, hash_1));
975        assert_eq!(hashes[1], (2, hash_2));
976
977        // Get sum of log byte lengths from sequence number 1.
978        let size = store
979            .get_log_size(&private_key.public_key(), &log_id, Some(1))
980            .await
981            .expect("no errors")
982            .expect("log should exist");
983
984        let expected_size = header_bytes_1.len() as u64
985            + header_1.payload_size
986            + header_bytes_2.len() as u64
987            + header_2.payload_size;
988        assert_eq!(size, expected_size);
989    }
990
991    #[tokio::test]
992    async fn get_latest_operation() {
993        let db_pool = initialize_sqlite_db().await;
994        let mut store = SqliteStore::new(db_pool);
995        let private_key = PrivateKey::new();
996        let log_id = 0;
997
998        let body_0 = Body::new("hello!".as_bytes());
999        let body_1 = Body::new("hello again!".as_bytes());
1000        let body_2 = Body::new("hello?!?!".as_bytes());
1001
1002        let (hash_0, header_0, header_bytes_0) =
1003            create_operation(&private_key, &body_0, 0, 0, None);
1004        let (hash_1, header_1, header_bytes_1) =
1005            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
1006        let (hash_2, mut header_2, header_bytes_2) =
1007            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
1008        header_2.previous = vec![hash_0];
1009
1010        store
1011            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
1012            .await
1013            .expect("no errors");
1014        store
1015            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
1016            .await
1017            .expect("no errors");
1018        store
1019            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
1020            .await
1021            .expect("no errors");
1022
1023        let (latest_header, latest_body) = store
1024            .latest_operation(&private_key.public_key(), &log_id)
1025            .await
1026            .expect("no errors")
1027            .expect("there's an operation");
1028
1029        assert_eq!(latest_header.hash(), header_2.hash());
1030        assert_eq!(latest_body, Some(body_2));
1031    }
1032
1033    #[tokio::test]
1034    async fn delete_operations() {
1035        let db_pool = initialize_sqlite_db().await;
1036        let mut store = SqliteStore::new(db_pool);
1037        let private_key = PrivateKey::new();
1038        let log_id = 0;
1039
1040        let body_0 = Body::new("hello!".as_bytes());
1041        let body_1 = Body::new("hello again!".as_bytes());
1042        let body_2 = Body::new("final hello!".as_bytes());
1043
1044        let (hash_0, header_0, header_bytes_0) =
1045            create_operation(&private_key, &body_0, 0, 0, None);
1046        let (hash_1, header_1, header_bytes_1) =
1047            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
1048        let (hash_2, header_2, header_bytes_2) =
1049            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
1050
1051        store
1052            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
1053            .await
1054            .expect("no errors");
1055        store
1056            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
1057            .await
1058            .expect("no errors");
1059        store
1060            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
1061            .await
1062            .expect("no errors");
1063
1064        // Get all log operations.
1065        let log = store
1066            .get_log(&private_key.public_key(), &log_id, None)
1067            .await
1068            .expect("no errors")
1069            .expect("log should exist");
1070
1071        // We expect the log to have 3 operations.
1072        assert_eq!(log.len(), 3);
1073
1074        // Delete all operations _before_ seq_num 2.
1075        let deleted = store
1076            .delete_operations(&private_key.public_key(), &log_id, 2)
1077            .await
1078            .expect("no errors");
1079        assert!(deleted);
1080
1081        let log = store
1082            .get_log(&private_key.public_key(), &log_id, None)
1083            .await
1084            .expect("no errors")
1085            .expect("log should exist");
1086
1087        // There is now only one operation in the log.
1088        assert_eq!(log.len(), 1);
1089
1090        // The remaining operation should be the latest (seq_num == 2).
1091        assert_eq!(log[0].0.hash(), header_2.hash());
1092
1093        // Deleting the same range again should return `false`, meaning no deletion occurred.
1094        let deleted = store
1095            .delete_operations(&private_key.public_key(), &log_id, 2)
1096            .await
1097            .expect("no errors");
1098        assert!(!deleted);
1099    }
1100
1101    #[tokio::test]
1102    async fn delete_payloads() {
1103        let db_pool = initialize_sqlite_db().await;
1104        let mut store = SqliteStore::new(db_pool);
1105        let private_key = PrivateKey::new();
1106        let log_id = 0;
1107
1108        let body_0 = Body::new("hello!".as_bytes());
1109        let body_1 = Body::new("hello again!".as_bytes());
1110        let body_2 = Body::new("final hello!".as_bytes());
1111
1112        let (hash_0, header_0, header_bytes_0) =
1113            create_operation(&private_key, &body_0, 0, 0, None);
1114        let (hash_1, header_1, header_bytes_1) =
1115            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
1116        let (hash_2, header_2, header_bytes_2) =
1117            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
1118
1119        store
1120            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
1121            .await
1122            .expect("no errors");
1123        store
1124            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
1125            .await
1126            .expect("no errors");
1127        store
1128            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
1129            .await
1130            .expect("no errors");
1131
1132        // Get all log operations.
1133        let log = store
1134            .get_log(&private_key.public_key(), &log_id, None)
1135            .await
1136            .expect("no errors")
1137            .expect("log should exist");
1138
1139        // We expect the log to have 3 operations.
1140        assert_eq!(log.len(), 3);
1141
1142        assert_eq!(log[0].1, Some(body_0));
1143        assert_eq!(log[1].1, Some(body_1));
1144        assert_eq!(log[2].1, Some(body_2.clone()));
1145
1146        // Delete all operation payloads from sequence number 0 up to but not including 2.
1147        let deleted = store
1148            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
1149            .await
1150            .expect("no errors");
1151        assert!(deleted);
1152
1153        let log = store
1154            .get_log(&private_key.public_key(), &log_id, None)
1155            .await
1156            .expect("no errors")
1157            .expect("log should exist");
1158
1159        assert_eq!(log[0].1, None);
1160        assert_eq!(log[1].1, None);
1161        assert_eq!(log[2].1, Some(body_2));
1162    }
1163
1164    #[tokio::test]
1165    async fn get_log_heights() {
1166        let db_pool = initialize_sqlite_db().await;
1167        let mut store = SqliteStore::new(db_pool);
1168
1169        let log_id = 0;
1170
1171        let private_key_0 = PrivateKey::new();
1172        let private_key_1 = PrivateKey::new();
1173        let private_key_2 = PrivateKey::new();
1174
1175        let body_0 = Body::new("hello!".as_bytes());
1176        let body_1 = Body::new("hello again!".as_bytes());
1177        let body_2 = Body::new("hello for a third time!".as_bytes());
1178
1179        let (hash_0, header_0, header_bytes_0) =
1180            create_operation(&private_key_0, &body_0, 0, 0, None);
1181        let (hash_1, header_1, header_bytes_1) =
1182            create_operation(&private_key_0, &body_1, 1, 0, Some(hash_0));
1183        let (hash_2, header_2, header_bytes_2) =
1184            create_operation(&private_key_0, &body_2, 2, 0, Some(hash_1));
1185
1186        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
1187        assert!(log_heights.is_empty());
1188
1189        store
1190            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1191            .await
1192            .expect("no errors");
1193        store
1194            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
1195            .await
1196            .expect("no errors");
1197        store
1198            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
1199            .await
1200            .expect("no errors");
1201
1202        let (hash_0, header_0, header_bytes_0) =
1203            create_operation(&private_key_1, &body_0, 0, 0, None);
1204        let (hash_1, header_1, header_bytes_1) =
1205            create_operation(&private_key_1, &body_1, 1, 0, Some(hash_0));
1206
1207        store
1208            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1209            .await
1210            .expect("no errors");
1211        store
1212            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
1213            .await
1214            .expect("no errors");
1215
1216        let (hash_0, header_0, header_bytes_0) =
1217            create_operation(&private_key_2, &body_0, 0, 0, None);
1218
1219        store
1220            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
1221            .await
1222            .expect("no errors");
1223
1224        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
1225
1226        assert_eq!(log_heights.len(), 3);
1227
1228        // Ensure the correct sequence number for each public key.
1229        assert!(log_heights.contains(&(private_key_0.public_key(), 2)));
1230        assert!(log_heights.contains(&(private_key_1.public_key(), 1)));
1231        assert!(log_heights.contains(&(private_key_2.public_key(), 0)));
1232    }
1233}