1use std::hash::{DefaultHasher, Hash as StdHash, Hasher};
5use std::marker::PhantomData;
6
7use sqlx::migrate;
8use sqlx::migrate::{MigrateDatabase, MigrateError};
9use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
10use sqlx::{Error as SqlxError, Sqlite, query, query_as};
11use thiserror::Error;
12
13use p2panda_core::cbor::{DecodeError, EncodeError, encode_cbor};
14use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
15
16use crate::sqlite::models::{LogHeightRow, OperationRow, RawOperationRow};
17use crate::{LogId, LogStore, OperationStore};
18
/// Errors which can be returned by the SQLite-backed store.
#[derive(Debug, Error)]
pub enum SqliteStoreError {
    /// Operation extensions could not be encoded; wraps the CBOR [`EncodeError`].
    #[error("failed to encode operation extensions: {0}")]
    EncodingFailed(#[from] EncodeError),

    /// Operation extensions could not be decoded; wraps the CBOR [`DecodeError`].
    #[error("failed to decode operation extensions: {0}")]
    DecodingFailed(#[from] DecodeError),

    /// Any error reported by `sqlx`. Migration errors are also mapped into this variant through
    /// the `From<MigrateError>` implementation in this file.
    #[error("an error occurred with the sqlite database: {0}")]
    Database(#[from] SqlxError),
}
30
31impl From<MigrateError> for SqliteStoreError {
32 fn from(error: MigrateError) -> Self {
33 Self::Database(SqlxError::Migrate(Box::new(error)))
34 }
35}
36
/// Connection pool type used by the store; an alias for sqlx's [`SqlitePool`].
pub type Pool = SqlitePool;
39
/// Store handle backed by a SQLite connection pool.
///
/// The type parameters `L` and `E` are not stored as values; they are only recorded through a
/// [`PhantomData`] marker.
#[derive(Clone, Debug)]
pub struct SqliteStore<L, E> {
    /// Connection pool every query of this store is executed against.
    pub(crate) pool: Pool,
    /// Zero-sized marker carrying the `L` and `E` type parameters.
    _marker: PhantomData<(L, E)>,
}
46
47impl<L, E> SqliteStore<L, E>
48where
49 L: LogId,
50 E: Extensions,
51{
52 pub fn new(pool: Pool) -> Self {
54 Self {
55 pool,
56 _marker: PhantomData {},
57 }
58 }
59}
60
61pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> {
63 if !Sqlite::database_exists(url).await? {
64 Sqlite::create_database(url).await?
65 }
66
67 Ok(())
68}
69
70pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> {
72 if Sqlite::database_exists(url).await? {
73 Sqlite::drop_database(url).await?
74 }
75
76 Ok(())
77}
78
79pub async fn connection_pool(url: &str, max_connections: u32) -> Result<Pool, SqliteStoreError> {
81 let pool: Pool = SqlitePoolOptions::new()
82 .max_connections(max_connections)
83 .connect(url)
84 .await?;
85
86 Ok(pool)
87}
88
/// Return the migrator produced by sqlx's `migrate!` macro.
///
/// The macro is invoked without a path argument, so whichever migrations directory sqlx uses by
/// default is the one that gets picked up.
pub fn migrations() -> migrate::Migrator {
    migrate!()
}
93
94pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> {
96 migrations().run(pool).await?;
97
98 Ok(())
99}
100
/// Hash any `std::hash::Hash` value into a `u64` using a freshly created [`DefaultHasher`].
///
/// NOTE(review): the standard library documents the `DefaultHasher` algorithm as unspecified
/// across releases; values derived from this function are written to the database as log ids, so
/// confirm this is acceptable for persisted data.
fn calculate_hash<T: StdHash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    StdHash::hash(t, &mut hasher);
    hasher.finish()
}
106
107impl<L, E> OperationStore<L, E> for SqliteStore<L, E>
108where
109 L: LogId + Send + Sync,
110 E: Extensions + Send + Sync,
111{
112 type Error = SqliteStoreError;
113
114 async fn insert_operation(
115 &mut self,
116 hash: Hash,
117 header: &Header<E>,
118 body: Option<&Body>,
119 header_bytes: &[u8],
120 log_id: &L,
121 ) -> Result<bool, Self::Error> {
122 query(
123 "
124 INSERT INTO
125 operations_v1 (
126 hash,
127 log_id,
128 version,
129 public_key,
130 signature,
131 payload_size,
132 payload_hash,
133 timestamp,
134 seq_num,
135 backlink,
136 previous,
137 extensions,
138 body,
139 header_bytes
140 )
141 VALUES
142 (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
143 ",
144 )
145 .bind(hash.to_string())
146 .bind(calculate_hash(log_id).to_string())
147 .bind(header.version.to_string())
148 .bind(header.public_key.to_hex())
149 .bind(header.signature.map(|sig| sig.to_hex()))
150 .bind(header.payload_size.to_string())
151 .bind(header.payload_hash.map(|hash| hash.to_hex()))
152 .bind(header.timestamp.to_string())
153 .bind(header.seq_num.to_string())
154 .bind(header.backlink.map(|backlink| backlink.to_hex()))
155 .bind(
156 header
157 .previous
158 .iter()
159 .map(|previous| previous.to_hex())
160 .collect::<Vec<String>>()
161 .concat(),
162 )
163 .bind(
164 header
165 .extensions
166 .as_ref()
167 .map(|extensions| encode_cbor(extensions).expect("extenions are serializable")),
168 )
169 .bind(body.map(|body| body.to_bytes()))
170 .bind(header_bytes)
171 .execute(&self.pool)
172 .await?;
173
174 Ok(true)
175 }
176
177 async fn get_operation(
178 &self,
179 hash: Hash,
180 ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
181 if let Some(operation) = query_as::<_, OperationRow>(
182 "
183 SELECT
184 hash,
185 log_id,
186 version,
187 public_key,
188 signature,
189 payload_size,
190 payload_hash,
191 timestamp,
192 seq_num,
193 backlink,
194 previous,
195 extensions,
196 body,
197 header_bytes
198 FROM
199 operations_v1
200 WHERE
201 hash = ?
202 ",
203 )
204 .bind(hash.to_string())
205 .fetch_optional(&self.pool)
206 .await?
207 {
208 let body = operation.body.clone().map(|body| body.into());
209 let header: Header<E> = operation.into();
210
211 Ok(Some((header, body)))
212 } else {
213 Ok(None)
214 }
215 }
216
217 async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
218 if let Some(operation) = query_as::<_, RawOperationRow>(
219 "
220 SELECT
221 hash,
222 body,
223 header_bytes
224 FROM
225 operations_v1
226 WHERE
227 hash = ?
228 ",
229 )
230 .bind(hash.to_string())
231 .fetch_optional(&self.pool)
232 .await?
233 {
234 let raw_operation = operation.into();
235
236 Ok(Some(raw_operation))
237 } else {
238 Ok(None)
239 }
240 }
241
242 async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
243 let exists = query(
244 "
245 SELECT
246 1
247 FROM
248 operations_v1
249 WHERE
250 hash = ?
251 ",
252 )
253 .bind(hash.to_string())
254 .fetch_optional(&self.pool)
255 .await?;
256
257 Ok(exists.is_some())
258 }
259
260 async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
261 let result = query(
262 "
263 DELETE
264 FROM
265 operations_v1
266 WHERE
267 hash = ?
268 ",
269 )
270 .bind(hash.to_string())
271 .execute(&self.pool)
272 .await?;
273
274 Ok(result.rows_affected() > 0)
275 }
276
277 async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
278 let result = query(
279 "
280 UPDATE
281 operations_v1
282 SET
283 body = NULL
284 WHERE
285 operations_v1.hash = ?
286 ",
287 )
288 .bind(hash.to_string())
289 .execute(&self.pool)
290 .await?;
291
292 Ok(result.rows_affected() > 0)
293 }
294}
295
296impl<L, E> LogStore<L, E> for SqliteStore<L, E>
297where
298 L: LogId + Send + Sync,
299 E: Extensions + Send + Sync,
300{
301 type Error = SqliteStoreError;
302
303 async fn get_log(
304 &self,
305 public_key: &PublicKey,
306 log_id: &L,
307 from: Option<u64>,
308 ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
309 let operations = query_as::<_, OperationRow>(
310 "
311 SELECT
312 hash,
313 log_id,
314 version,
315 public_key,
316 signature,
317 payload_size,
318 payload_hash,
319 timestamp,
320 seq_num,
321 backlink,
322 previous,
323 extensions,
324 body,
325 header_bytes
326 FROM
327 operations_v1
328 WHERE
329 public_key = ?
330 AND log_id = ?
331 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
332 ORDER BY
333 CAST(seq_num AS NUMERIC)
334 ",
335 )
336 .bind(public_key.to_string())
337 .bind(calculate_hash(log_id).to_string())
338 .bind(from.unwrap_or(0).to_string())
339 .fetch_all(&self.pool)
340 .await?;
341
342 let log: Vec<(Header<E>, Option<Body>)> = operations
343 .into_iter()
344 .map(|operation| {
345 (
346 operation.clone().into(),
347 operation.body.map(|body| body.into()),
348 )
349 })
350 .collect();
351
352 if log.is_empty() {
353 Ok(None)
354 } else {
355 Ok(Some(log))
356 }
357 }
358
359 async fn get_raw_log(
360 &self,
361 public_key: &PublicKey,
362 log_id: &L,
363 from: Option<u64>,
364 ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
365 let operations = query_as::<_, RawOperationRow>(
366 "
367 SELECT
368 hash,
369 body,
370 header_bytes
371 FROM
372 operations_v1
373 WHERE
374 public_key = ?
375 AND log_id = ?
376 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
377 ORDER BY
378 CAST(seq_num AS NUMERIC)
379 ",
380 )
381 .bind(public_key.to_string())
382 .bind(calculate_hash(log_id).to_string())
383 .bind(from.unwrap_or(0).to_string())
384 .fetch_all(&self.pool)
385 .await?;
386
387 let log: Vec<RawOperation> = operations
388 .into_iter()
389 .map(|operation| operation.into())
390 .collect();
391
392 if log.is_empty() {
393 Ok(None)
394 } else {
395 Ok(Some(log))
396 }
397 }
398
399 async fn latest_operation(
400 &self,
401 public_key: &PublicKey,
402 log_id: &L,
403 ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
404 if let Some(operation) = query_as::<_, OperationRow>(
405 "
406 SELECT
407 hash,
408 log_id,
409 version,
410 public_key,
411 signature,
412 payload_size,
413 payload_hash,
414 timestamp,
415 seq_num,
416 backlink,
417 previous,
418 extensions,
419 body,
420 header_bytes
421 FROM
422 operations_v1
423 WHERE
424 public_key = ?
425 AND log_id = ?
426 ORDER BY
427 CAST(seq_num AS NUMERIC) DESC LIMIT 1
428 ",
429 )
430 .bind(public_key.to_string())
431 .bind(calculate_hash(log_id).to_string())
432 .fetch_optional(&self.pool)
433 .await?
434 {
435 let body = operation.body.clone().map(|body| body.into());
436 let header: Header<E> = operation.into();
437
438 Ok(Some((header, body)))
439 } else {
440 Ok(None)
441 }
442 }
443
444 async fn delete_operations(
445 &mut self,
446 public_key: &PublicKey,
447 log_id: &L,
448 before: u64,
449 ) -> Result<bool, Self::Error> {
450 let result = query(
451 "
452 DELETE
453 FROM
454 operations_v1
455 WHERE
456 public_key = ?
457 AND log_id = ?
458 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
459 ",
460 )
461 .bind(public_key.to_string())
462 .bind(calculate_hash(log_id).to_string())
463 .bind(before.to_string())
464 .execute(&self.pool)
465 .await?;
466
467 Ok(result.rows_affected() > 0)
468 }
469
470 async fn delete_payloads(
471 &mut self,
472 public_key: &PublicKey,
473 log_id: &L,
474 from: u64,
475 to: u64,
476 ) -> Result<bool, Self::Error> {
477 let result = query(
478 "
479 UPDATE
480 operations_v1
481 SET
482 body = NULL
483 WHERE
484 public_key = ?
485 AND log_id = ?
486 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
487 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
488 ",
489 )
490 .bind(public_key.to_string())
491 .bind(calculate_hash(log_id).to_string())
492 .bind(from.to_string())
493 .bind(to.to_string())
494 .execute(&self.pool)
495 .await?;
496
497 Ok(result.rows_affected() > 0)
498 }
499
500 async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, u64)>, Self::Error> {
501 let operations = query_as::<_, LogHeightRow>(
502 "
503 SELECT
504 public_key,
505 CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
506 FROM
507 operations_v1
508 WHERE
509 log_id = ?
510 GROUP BY
511 public_key
512 ",
513 )
514 .bind(calculate_hash(log_id).to_string())
515 .fetch_all(&self.pool)
516 .await?;
517
518 let log_heights: Vec<(PublicKey, u64)> = operations
519 .into_iter()
520 .map(|operation| operation.into())
521 .collect();
522
523 Ok(log_heights)
524 }
525}
526
#[cfg(test)]
mod tests {
    //! Tests for the SQLite store. Each test obtains its own pool from `initialize_sqlite_db`.

    use p2panda_core::{Body, Hash, Header, PrivateKey};
    use serde::{Deserialize, Serialize};

    use crate::sqlite::test_utils::initialize_sqlite_db;
    use crate::{LogStore, OperationStore};

    use super::SqliteStore;

    /// Build and sign a header (without extensions) for the given body.
    ///
    /// Returns the header hash, the signed header and its encoded bytes.
    fn create_operation(
        private_key: &PrivateKey,
        body: &Body,
        seq_num: u64,
        timestamp: u64,
        backlink: Option<Hash>,
    ) -> (Hash, Header<()>, Vec<u8>) {
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp,
            seq_num,
            backlink,
            previous: vec![],
            extensions: None,
        };
        header.sign(private_key);
        let header_bytes = header.to_bytes();
        (header.hash(), header, header_bytes)
    }

    #[tokio::test]
    async fn default_sqlite_store() {
        let db_pool = initialize_sqlite_db().await;

        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");

        assert!(inserted);
    }

    #[tokio::test]
    async fn generic_extensions_sqlite_store() {
        // Custom extension type to check the store works with extensions other than `()`.
        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
        struct MyExtension {}

        let db_pool = initialize_sqlite_db().await;

        let mut store = SqliteStore::new(db_pool);

        let private_key = PrivateKey::new();
        let body = Body::new("hello!".as_bytes());
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp: 0,
            seq_num: 0,
            backlink: None,
            previous: vec![],
            extensions: Some(MyExtension {}),
        };
        header.sign(&private_key);

        let inserted = store
            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
            .await
            .expect("no errors");
        assert!(inserted);
    }

    #[tokio::test]
    async fn insert_operation_with_unsigned_header() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, mut header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        // Strip the signature again so that the header handed to the store is unsigned.
        header.signature = None;

        let error = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect_err("inserting an unsigned header should fail");

        // The database rejects the row because the signature column must not be NULL.
        assert_eq!(
            error.to_string(),
            "an error occurred with the sqlite database: error returned from database: (code: 1299) NOT NULL constraint failed: operations_v1.signature"
        );
    }

    #[tokio::test]
    async fn insert_get_operation() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        // A second operation which is never inserted.
        let body_2 = Body::new("buenas!".as_bytes());
        let (hash_2, _header_2, _header_bytes_2) =
            create_operation(&private_key, &body_2, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        // Only the inserted operation is known to the store.
        assert!(store.has_operation(hash).await.expect("no error"));
        assert!(!store.has_operation(hash_2).await.expect("no error"));

        let (header_again, body_again) = store
            .get_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");

        // The retrieved header and body match what was inserted.
        assert_eq!(header.hash(), header_again.hash());
        assert_eq!(Some(body.clone()), body_again);

        let (header_bytes_again, body_bytes_again) = store
            .get_raw_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");

        assert_eq!(header_bytes_again, header_bytes);
        assert_eq!(body_bytes_again, Some(body.to_bytes()));
    }

    #[tokio::test]
    async fn delete_operation() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        assert!(store.has_operation(hash).await.expect("no error"));

        // Deleting an existing operation reports `true`.
        assert!(store.delete_operation(hash).await.expect("no error"));

        // Afterwards the operation can no longer be found in any form.
        let deleted_operation = store.get_operation(hash).await.expect("no error");
        assert!(deleted_operation.is_none());
        assert!(!store.has_operation(hash).await.expect("no error"));

        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
        assert!(deleted_raw_operation.is_none());
    }

    #[tokio::test]
    async fn delete_payload() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        assert!(store.delete_payload(hash).await.expect("no error"));

        // The operation is still present, but its body is gone.
        let (_, no_body) = store
            .get_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");
        assert!(no_body.is_none());
        assert!(store.has_operation(hash).await.expect("no error"));

        let (_, no_body) = store
            .get_raw_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");
        assert!(no_body.is_none());
    }

    #[tokio::test]
    async fn get_log() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        // Three operations chained through their backlinks.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        // Whole log.
        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);
        assert_eq!(log[0].0.hash(), hash_0);
        assert_eq!(log[1].0.hash(), hash_1);
        assert_eq!(log[2].0.hash(), hash_2);
        assert_eq!(log[0].1, Some(body_0.clone()));
        assert_eq!(log[1].1, Some(body_1.clone()));
        assert_eq!(log[2].1, Some(body_2.clone()));

        // Log starting at sequence number 1.
        let log = store
            .get_log(&private_key.public_key(), &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 2);
        assert_eq!(log[0].0.hash(), hash_1);
        assert_eq!(log[1].0.hash(), hash_2);
        assert_eq!(log[0].1, Some(body_1.clone()));
        assert_eq!(log[1].1, Some(body_2.clone()));

        // Whole log in raw form.
        let log = store
            .get_raw_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);
        assert_eq!(log[0].0, header_bytes_0);
        assert_eq!(log[1].0, header_bytes_1);
        assert_eq!(log[2].0, header_bytes_2);
        assert_eq!(log[0].1, Some(body_0.to_bytes()));
        assert_eq!(log[1].1, Some(body_1.to_bytes()));
        assert_eq!(log[2].1, Some(body_2.to_bytes()));

        // Raw log starting at sequence number 1.
        let log = store
            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 2);
        assert_eq!(log[0].0, header_bytes_1);
        assert_eq!(log[1].0, header_bytes_2);
        assert_eq!(log[0].1, Some(body_1.to_bytes()));
        assert_eq!(log[1].1, Some(body_2.to_bytes()));
    }

    #[tokio::test]
    async fn get_latest_operation() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");

        let (latest_header, latest_body) = store
            .latest_operation(&private_key.public_key(), &log_id)
            .await
            .expect("no errors")
            .expect("there's an operation");

        // The operation with the highest sequence number is returned.
        assert_eq!(latest_header.hash(), header_1.hash());
        assert_eq!(latest_body, Some(body_1));
    }

    #[tokio::test]
    async fn delete_operations() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("final hello!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);

        // Delete everything before sequence number 2.
        let deleted = store
            .delete_operations(&private_key.public_key(), &log_id, 2)
            .await
            .expect("no errors");
        assert!(deleted);

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        // Only the operation with sequence number 2 is left.
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].0.hash(), header_2.hash());

        // Repeating the deletion removes nothing and therefore reports `false`.
        let deleted = store
            .delete_operations(&private_key.public_key(), &log_id, 2)
            .await
            .expect("no errors");
        assert!(!deleted);
    }

    #[tokio::test]
    async fn delete_payloads() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("final hello!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);

        // All bodies are present before the deletion.
        assert_eq!(log[0].1, Some(body_0));
        assert_eq!(log[1].1, Some(body_1));
        assert_eq!(log[2].1, Some(body_2.clone()));

        // Remove the payloads for sequence numbers 0 and 1 (the upper bound 2 is exclusive).
        let deleted = store
            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
            .await
            .expect("no errors");
        assert!(deleted);

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log[0].1, None);
        assert_eq!(log[1].1, None);
        assert_eq!(log[2].1, Some(body_2));
    }

    #[tokio::test]
    async fn get_log_heights() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);

        let log_id = 0;

        let private_key_0 = PrivateKey::new();
        let private_key_1 = PrivateKey::new();
        let private_key_2 = PrivateKey::new();

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_0, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key_0, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key_0, &body_2, 2, 0, Some(hash_1));

        // Nothing has been inserted yet, so there are no log heights.
        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
        assert!(log_heights.is_empty());

        // First author: three operations (sequence numbers 0 to 2).
        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        // Second author: two operations (sequence numbers 0 and 1).
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_1, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key_1, &body_1, 1, 0, Some(hash_0));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");

        // Third author: a single operation (sequence number 0).
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_2, &body_0, 0, 0, None);

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");

        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");

        assert_eq!(log_heights.len(), 3);

        // `contains` is used because the result order is not asserted.
        assert!(log_heights.contains(&(private_key_0.public_key(), 2)));
        assert!(log_heights.contains(&(private_key_1.public_key(), 1)));
        assert!(log_heights.contains(&(private_key_2.public_key(), 0)));
    }
}