1use std::hash::{DefaultHasher, Hash as StdHash, Hasher};
5use std::marker::PhantomData;
6
7use sqlx::migrate;
8use sqlx::migrate::{MigrateDatabase, MigrateError};
9use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
10use sqlx::{Error as SqlxError, Sqlite, query, query_as};
11use thiserror::Error;
12
13use p2panda_core::cbor::{DecodeError, EncodeError, encode_cbor};
14use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
15
16use crate::sqlite::models::{LogHeightRow, OperationRow, RawOperationRow};
17use crate::{LogId, LogStore, OperationStore};
18
19#[derive(Debug, Error)]
20pub enum SqliteStoreError {
21 #[error("failed to encode operation extensions: {0}")]
22 EncodingFailed(#[from] EncodeError),
23
24 #[error("failed to decode operation extensions: {0}")]
25 DecodingFailed(#[from] DecodeError),
26
27 #[error("an error occurred with the sqlite database: {0}")]
28 Database(#[from] SqlxError),
29}
30
31impl From<MigrateError> for SqliteStoreError {
32 fn from(error: MigrateError) -> Self {
33 Self::Database(SqlxError::Migrate(Box::new(error)))
34 }
35}
36
/// Connection pool type used throughout the store; an alias for sqlx's [`SqlitePool`].
pub type Pool = SqlitePool;
39
40#[derive(Clone, Debug)]
42pub struct SqliteStore<L, E> {
43 pub(crate) pool: Pool,
44 _marker: PhantomData<(L, E)>,
45}
46
47impl<L, E> SqliteStore<L, E>
48where
49 L: LogId,
50 E: Extensions,
51{
52 pub fn new(pool: Pool) -> Self {
54 Self {
55 pool,
56 _marker: PhantomData {},
57 }
58 }
59}
60
61pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> {
63 if !Sqlite::database_exists(url).await? {
64 Sqlite::create_database(url).await?
65 }
66
67 Ok(())
68}
69
70pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> {
72 if Sqlite::database_exists(url).await? {
73 Sqlite::drop_database(url).await?
74 }
75
76 Ok(())
77}
78
79pub async fn connection_pool(url: &str, max_connections: u32) -> Result<Pool, SqliteStoreError> {
81 let pool: Pool = SqlitePoolOptions::new()
82 .max_connections(max_connections)
83 .connect(url)
84 .await?;
85
86 Ok(pool)
87}
88
89pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> {
91 migrate!().run(pool).await?;
92
93 Ok(())
94}
95
/// Reduces any hashable value to a `u64` using the standard library's [`DefaultHasher`].
///
/// NOTE(review): callers in this file persist the result as the `log_id` column value. The
/// standard library does not specify the algorithm behind `DefaultHasher`, so stored values
/// may not be comparable across Rust releases - confirm this is acceptable.
fn calculate_hash<T: StdHash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    StdHash::hash(t, &mut hasher);
    hasher.finish()
}
101
102impl<L, E> OperationStore<L, E> for SqliteStore<L, E>
103where
104 L: LogId + Send + Sync,
105 E: Extensions + Send + Sync,
106{
107 type Error = SqliteStoreError;
108
109 async fn insert_operation(
110 &mut self,
111 hash: Hash,
112 header: &Header<E>,
113 body: Option<&Body>,
114 header_bytes: &[u8],
115 log_id: &L,
116 ) -> Result<bool, Self::Error> {
117 query(
118 "
119 INSERT INTO
120 operations_v1 (
121 hash,
122 log_id,
123 version,
124 public_key,
125 signature,
126 payload_size,
127 payload_hash,
128 timestamp,
129 seq_num,
130 backlink,
131 previous,
132 extensions,
133 body,
134 header_bytes
135 )
136 VALUES
137 (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
138 ",
139 )
140 .bind(hash.to_string())
141 .bind(calculate_hash(log_id).to_string())
142 .bind(header.version.to_string())
143 .bind(header.public_key.to_hex())
144 .bind(header.signature.map(|sig| sig.to_hex()))
145 .bind(header.payload_size.to_string())
146 .bind(header.payload_hash.map(|hash| hash.to_hex()))
147 .bind(header.timestamp.to_string())
148 .bind(header.seq_num.to_string())
149 .bind(header.backlink.map(|backlink| backlink.to_hex()))
150 .bind(
151 header
152 .previous
153 .iter()
154 .map(|previous| previous.to_hex())
155 .collect::<Vec<String>>()
156 .concat(),
157 )
158 .bind(
159 header
160 .extensions
161 .as_ref()
162 .map(|extensions| encode_cbor(extensions).expect("extenions are serializable")),
163 )
164 .bind(body.map(|body| body.to_bytes()))
165 .bind(header_bytes)
166 .execute(&self.pool)
167 .await?;
168
169 Ok(true)
170 }
171
172 async fn get_operation(
173 &self,
174 hash: Hash,
175 ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
176 if let Some(operation) = query_as::<_, OperationRow>(
177 "
178 SELECT
179 hash,
180 log_id,
181 version,
182 public_key,
183 signature,
184 payload_size,
185 payload_hash,
186 timestamp,
187 seq_num,
188 backlink,
189 previous,
190 extensions,
191 body,
192 header_bytes
193 FROM
194 operations_v1
195 WHERE
196 hash = ?
197 ",
198 )
199 .bind(hash.to_string())
200 .fetch_optional(&self.pool)
201 .await?
202 {
203 let body = operation.body.clone().map(|body| body.into());
204 let header: Header<E> = operation.into();
205
206 Ok(Some((header, body)))
207 } else {
208 Ok(None)
209 }
210 }
211
212 async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
213 if let Some(operation) = query_as::<_, RawOperationRow>(
214 "
215 SELECT
216 hash,
217 body,
218 header_bytes
219 FROM
220 operations_v1
221 WHERE
222 hash = ?
223 ",
224 )
225 .bind(hash.to_string())
226 .fetch_optional(&self.pool)
227 .await?
228 {
229 let raw_operation = operation.into();
230
231 Ok(Some(raw_operation))
232 } else {
233 Ok(None)
234 }
235 }
236
237 async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
238 let exists = query(
239 "
240 SELECT
241 1
242 FROM
243 operations_v1
244 WHERE
245 hash = ?
246 ",
247 )
248 .bind(hash.to_string())
249 .fetch_optional(&self.pool)
250 .await?;
251
252 Ok(exists.is_some())
253 }
254
255 async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
256 let result = query(
257 "
258 DELETE
259 FROM
260 operations_v1
261 WHERE
262 hash = ?
263 ",
264 )
265 .bind(hash.to_string())
266 .execute(&self.pool)
267 .await?;
268
269 Ok(result.rows_affected() > 0)
270 }
271
272 async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
273 let result = query(
274 "
275 UPDATE
276 operations_v1
277 SET
278 body = NULL
279 WHERE
280 operations_v1.hash = ?
281 ",
282 )
283 .bind(hash.to_string())
284 .execute(&self.pool)
285 .await?;
286
287 Ok(result.rows_affected() > 0)
288 }
289}
290
291impl<L, E> LogStore<L, E> for SqliteStore<L, E>
292where
293 L: LogId + Send + Sync,
294 E: Extensions + Send + Sync,
295{
296 type Error = SqliteStoreError;
297
298 async fn get_log(
299 &self,
300 public_key: &PublicKey,
301 log_id: &L,
302 from: Option<u64>,
303 ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
304 let operations = query_as::<_, OperationRow>(
305 "
306 SELECT
307 hash,
308 log_id,
309 version,
310 public_key,
311 signature,
312 payload_size,
313 payload_hash,
314 timestamp,
315 seq_num,
316 backlink,
317 previous,
318 extensions,
319 body,
320 header_bytes
321 FROM
322 operations_v1
323 WHERE
324 public_key = ?
325 AND log_id = ?
326 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
327 ORDER BY
328 CAST(seq_num AS NUMERIC)
329 ",
330 )
331 .bind(public_key.to_string())
332 .bind(calculate_hash(log_id).to_string())
333 .bind(from.unwrap_or(0).to_string())
334 .fetch_all(&self.pool)
335 .await?;
336
337 let log: Vec<(Header<E>, Option<Body>)> = operations
338 .into_iter()
339 .map(|operation| {
340 (
341 operation.clone().into(),
342 operation.body.map(|body| body.into()),
343 )
344 })
345 .collect();
346
347 if log.is_empty() {
348 Ok(None)
349 } else {
350 Ok(Some(log))
351 }
352 }
353
354 async fn get_raw_log(
355 &self,
356 public_key: &PublicKey,
357 log_id: &L,
358 from: Option<u64>,
359 ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
360 let operations = query_as::<_, RawOperationRow>(
361 "
362 SELECT
363 hash,
364 body,
365 header_bytes
366 FROM
367 operations_v1
368 WHERE
369 public_key = ?
370 AND log_id = ?
371 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
372 ORDER BY
373 CAST(seq_num AS NUMERIC)
374 ",
375 )
376 .bind(public_key.to_string())
377 .bind(calculate_hash(log_id).to_string())
378 .bind(from.unwrap_or(0).to_string())
379 .fetch_all(&self.pool)
380 .await?;
381
382 let log: Vec<RawOperation> = operations
383 .into_iter()
384 .map(|operation| operation.into())
385 .collect();
386
387 if log.is_empty() {
388 Ok(None)
389 } else {
390 Ok(Some(log))
391 }
392 }
393
394 async fn latest_operation(
395 &self,
396 public_key: &PublicKey,
397 log_id: &L,
398 ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
399 if let Some(operation) = query_as::<_, OperationRow>(
400 "
401 SELECT
402 hash,
403 log_id,
404 version,
405 public_key,
406 signature,
407 payload_size,
408 payload_hash,
409 timestamp,
410 seq_num,
411 backlink,
412 previous,
413 extensions,
414 body,
415 header_bytes
416 FROM
417 operations_v1
418 WHERE
419 public_key = ?
420 AND log_id = ?
421 ORDER BY
422 CAST(seq_num AS NUMERIC) DESC LIMIT 1
423 ",
424 )
425 .bind(public_key.to_string())
426 .bind(calculate_hash(log_id).to_string())
427 .fetch_optional(&self.pool)
428 .await?
429 {
430 let body = operation.body.clone().map(|body| body.into());
431 let header: Header<E> = operation.into();
432
433 Ok(Some((header, body)))
434 } else {
435 Ok(None)
436 }
437 }
438
439 async fn delete_operations(
440 &mut self,
441 public_key: &PublicKey,
442 log_id: &L,
443 before: u64,
444 ) -> Result<bool, Self::Error> {
445 let result = query(
446 "
447 DELETE
448 FROM
449 operations_v1
450 WHERE
451 public_key = ?
452 AND log_id = ?
453 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
454 ",
455 )
456 .bind(public_key.to_string())
457 .bind(calculate_hash(log_id).to_string())
458 .bind(before.to_string())
459 .execute(&self.pool)
460 .await?;
461
462 Ok(result.rows_affected() > 0)
463 }
464
465 async fn delete_payloads(
466 &mut self,
467 public_key: &PublicKey,
468 log_id: &L,
469 from: u64,
470 to: u64,
471 ) -> Result<bool, Self::Error> {
472 let result = query(
473 "
474 UPDATE
475 operations_v1
476 SET
477 body = NULL
478 WHERE
479 public_key = ?
480 AND log_id = ?
481 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
482 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
483 ",
484 )
485 .bind(public_key.to_string())
486 .bind(calculate_hash(log_id).to_string())
487 .bind(from.to_string())
488 .bind(to.to_string())
489 .execute(&self.pool)
490 .await?;
491
492 Ok(result.rows_affected() > 0)
493 }
494
495 async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, u64)>, Self::Error> {
496 let operations = query_as::<_, LogHeightRow>(
497 "
498 SELECT
499 public_key,
500 CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
501 FROM
502 operations_v1
503 WHERE
504 log_id = ?
505 GROUP BY
506 public_key
507 ",
508 )
509 .bind(calculate_hash(log_id).to_string())
510 .fetch_all(&self.pool)
511 .await?;
512
513 let log_heights: Vec<(PublicKey, u64)> = operations
514 .into_iter()
515 .map(|operation| operation.into())
516 .collect();
517
518 Ok(log_heights)
519 }
520}
521
#[cfg(test)]
mod tests {
    use p2panda_core::{Body, Hash, Header, PrivateKey};
    use serde::{Deserialize, Serialize};

    use crate::sqlite::test_utils::initialize_sqlite_db;
    use crate::{LogStore, OperationStore};

    use super::SqliteStore;

    /// Builds a signed header without extensions for the given body and returns its hash,
    /// the header itself and its encoded bytes.
    fn create_operation(
        private_key: &PrivateKey,
        body: &Body,
        seq_num: u64,
        timestamp: u64,
        backlink: Option<Hash>,
    ) -> (Hash, Header<()>, Vec<u8>) {
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp,
            seq_num,
            backlink,
            previous: Vec::new(),
            extensions: None,
        };
        header.sign(private_key);

        let encoded = header.to_bytes();
        let hash = header.hash();

        (hash, header, encoded)
    }

    #[tokio::test]
    async fn default_sqlite_store() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);

        let private_key = PrivateKey::new();
        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");

        assert!(inserted);
    }

    #[tokio::test]
    async fn generic_extensions_mem_store() {
        // A custom extension type, to check that the store is generic over extensions.
        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
        struct MyExtension {}

        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);

        let private_key = PrivateKey::new();
        let body = Body::new("hello!".as_bytes());

        // Build and sign a header which carries the custom extension.
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp: 0,
            seq_num: 0,
            backlink: None,
            previous: Vec::new(),
            extensions: Some(MyExtension {}),
        };
        header.sign(&private_key);

        let inserted = store
            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
            .await
            .expect("no errors");

        assert!(inserted);
    }

    #[tokio::test]
    async fn insert_operation_with_unsigned_header() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);

        let private_key = PrivateKey::new();
        let body = Body::new("hello!".as_bytes());
        let (hash, mut header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        // Strip the signature again so that the header is unsigned when it reaches the store.
        header.signature = None;

        let result = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await;

        // The insert has to be rejected by the NOT NULL constraint on the signature column.
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert_eq!(
            message,
            "an error occurred with the sqlite database: error returned from database: (code: 1299) NOT NULL constraint failed: operations_v1.signature"
        );
    }

    #[tokio::test]
    async fn insert_get_operation() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();

        // The operation which gets inserted.
        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        // A second operation which is never inserted.
        let other_body = Body::new("buenas!".as_bytes());
        let (other_hash, _other_header, _other_header_bytes) =
            create_operation(&private_key, &other_body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        // Only the inserted operation is known to the store.
        assert!(store.has_operation(hash).await.expect("no error"));
        assert!(!store.has_operation(other_hash).await.expect("no error"));

        let (stored_header, stored_body) = store
            .get_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");

        // Header and body round-trip through the database unchanged.
        assert_eq!(header.hash(), stored_header.hash());
        assert_eq!(Some(body.clone()), stored_body);

        let (raw_header_bytes, raw_body_bytes) = store
            .get_raw_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");

        assert_eq!(raw_header_bytes, header_bytes);
        assert_eq!(raw_body_bytes, Some(body.to_bytes()));
    }

    #[tokio::test]
    async fn delete_operation() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        // The operation exists before the deletion ...
        assert!(store.has_operation(hash).await.expect("no error"));

        // ... and deleting it reports that something was removed.
        assert!(store.delete_operation(hash).await.expect("no error"));

        // Afterwards no query can find it anymore.
        let after_delete = store.get_operation(hash).await.expect("no error");
        assert!(after_delete.is_none());
        assert!(!store.has_operation(hash).await.expect("no error"));

        let raw_after_delete = store.get_raw_operation(hash).await.expect("no error");
        assert!(raw_after_delete.is_none());
    }

    #[tokio::test]
    async fn delete_payload() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        assert!(store.delete_payload(hash).await.expect("no error"));

        // The header is still there, only the body is gone.
        let (_, body_after) = store
            .get_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");
        assert!(body_after.is_none());
        assert!(store.has_operation(hash).await.expect("no error"));

        let (_, raw_body_after) = store
            .get_raw_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");
        assert!(raw_body_after.is_none());
    }

    #[tokio::test]
    async fn get_log() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();
        let public_key = private_key.public_key();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        // Three operations chained by their backlinks.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        // The full log comes back in sequence number order.
        let full_log = store
            .get_log(&public_key, &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(full_log.len(), 3);
        assert_eq!(full_log[0].0.hash(), hash_0);
        assert_eq!(full_log[1].0.hash(), hash_1);
        assert_eq!(full_log[2].0.hash(), hash_2);
        assert_eq!(full_log[0].1, Some(body_0.clone()));
        assert_eq!(full_log[1].1, Some(body_1.clone()));
        assert_eq!(full_log[2].1, Some(body_2.clone()));

        // Starting from sequence number 1 skips the first operation.
        let partial_log = store
            .get_log(&public_key, &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(partial_log.len(), 2);
        assert_eq!(partial_log[0].0.hash(), hash_1);
        assert_eq!(partial_log[1].0.hash(), hash_2);
        assert_eq!(partial_log[0].1, Some(body_1.clone()));
        assert_eq!(partial_log[1].1, Some(body_2.clone()));

        // The same two queries again, this time for the raw representation.
        let full_raw_log = store
            .get_raw_log(&public_key, &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(full_raw_log.len(), 3);
        assert_eq!(full_raw_log[0].0, header_bytes_0);
        assert_eq!(full_raw_log[1].0, header_bytes_1);
        assert_eq!(full_raw_log[2].0, header_bytes_2);
        assert_eq!(full_raw_log[0].1, Some(body_0.to_bytes()));
        assert_eq!(full_raw_log[1].1, Some(body_1.to_bytes()));
        assert_eq!(full_raw_log[2].1, Some(body_2.to_bytes()));

        let partial_raw_log = store
            .get_raw_log(&public_key, &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(partial_raw_log.len(), 2);
        assert_eq!(partial_raw_log[0].0, header_bytes_1);
        assert_eq!(partial_raw_log[1].0, header_bytes_2);
        assert_eq!(partial_raw_log[0].1, Some(body_1.to_bytes()));
        assert_eq!(partial_raw_log[1].1, Some(body_2.to_bytes()));
    }

    #[tokio::test]
    async fn get_latest_operation() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");

        // The operation with the highest sequence number is the latest one.
        let (latest_header, latest_body) = store
            .latest_operation(&private_key.public_key(), &log_id)
            .await
            .expect("no errors")
            .expect("there's an operation");

        assert_eq!(latest_header.hash(), header_1.hash());
        assert_eq!(latest_body, Some(body_1));
    }

    #[tokio::test]
    async fn delete_operations() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();
        let public_key = private_key.public_key();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("final hello!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        // All three operations are present to begin with.
        let log_before = store
            .get_log(&public_key, &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");
        assert_eq!(log_before.len(), 3);

        // Remove everything below sequence number 2.
        let deleted = store
            .delete_operations(&public_key, &log_id, 2)
            .await
            .expect("no errors");
        assert!(deleted);

        let log_after = store
            .get_log(&public_key, &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        // Only the operation with sequence number 2 is left.
        assert_eq!(log_after.len(), 1);
        assert_eq!(log_after[0].0.hash(), header_2.hash());

        // Repeating the deletion finds nothing more to remove.
        let deleted_again = store
            .delete_operations(&public_key, &log_id, 2)
            .await
            .expect("no errors");
        assert!(!deleted_again);
    }

    #[tokio::test]
    async fn delete_payloads() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);
        let private_key = PrivateKey::new();
        let public_key = private_key.public_key();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("final hello!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        let log_before = store
            .get_log(&public_key, &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        // Every operation still carries its body.
        assert_eq!(log_before.len(), 3);
        assert_eq!(log_before[0].1, Some(body_0));
        assert_eq!(log_before[1].1, Some(body_1));
        assert_eq!(log_before[2].1, Some(body_2.clone()));

        // Drop the payloads of sequence numbers 0 and 1; the upper bound is exclusive.
        let deleted = store
            .delete_payloads(&public_key, &log_id, 0, 2)
            .await
            .expect("no errors");
        assert!(deleted);

        let log_after = store
            .get_log(&public_key, &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log_after[0].1, None);
        assert_eq!(log_after[1].1, None);
        assert_eq!(log_after[2].1, Some(body_2));
    }

    #[tokio::test]
    async fn get_log_heights() {
        let pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(pool);

        let log_id = 0;

        let private_key_0 = PrivateKey::new();
        let private_key_1 = PrivateKey::new();
        let private_key_2 = PrivateKey::new();

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        // First author: three operations, so the expected height is 2.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_0, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key_0, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key_0, &body_2, 2, 0, Some(hash_1));

        // Nothing has been inserted yet, so there are no heights to report.
        let heights_before = store.get_log_heights(&log_id).await.expect("no errors");
        assert!(heights_before.is_empty());

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        // Second author: two operations, so the expected height is 1.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_1, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key_1, &body_1, 1, 0, Some(hash_0));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");

        // Third author: a single operation, so the expected height is 0.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_2, &body_0, 0, 0, None);

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");

        let heights = store.get_log_heights(&log_id).await.expect("no errors");

        assert_eq!(heights.len(), 3);

        // The order of the entries is not asserted, only that each author is present.
        assert!(heights.contains(&(private_key_0.public_key(), 2)));
        assert!(heights.contains(&(private_key_1.public_key(), 1)));
        assert!(heights.contains(&(private_key_2.public_key(), 0)));
    }
}