1use std::hash::{DefaultHasher, Hash as StdHash, Hasher};
5use std::marker::PhantomData;
6
7use sqlx::migrate;
8use sqlx::migrate::{MigrateDatabase, MigrateError};
9use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
10use sqlx::{Error as SqlxError, Sqlite, query, query_as};
11use thiserror::Error;
12
13use p2panda_core::cbor::{DecodeError, EncodeError, encode_cbor};
14use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
15
16use crate::operations::{LogId, LogStore, OperationStore};
17use crate::sqlite::models::{ByteCount, LogHeightRow, OperationRow, RawOperationRow, SeqAndHash};
18
/// Errors returned by the SQLite store and its database helper functions.
#[derive(Debug, Error)]
pub enum SqliteStoreError {
    /// Wraps an [`EncodeError`] from the CBOR encoder.
    #[error("failed to encode operation extensions: {0}")]
    EncodingFailed(#[from] EncodeError),

    /// Wraps a [`DecodeError`] from the CBOR decoder.
    #[error("failed to decode operation extensions: {0}")]
    DecodingFailed(#[from] DecodeError),

    /// Wraps any error reported by `sqlx` (connection, query, migration, row decoding).
    #[error("an error occurred with the sqlite database: {0}")]
    Database(#[from] SqlxError),
}
30
31impl From<MigrateError> for SqliteStoreError {
32 fn from(error: MigrateError) -> Self {
33 Self::Database(SqlxError::Migrate(Box::new(error)))
34 }
35}
36
/// Connection pool type used by the SQLite store (alias for `sqlx`'s `SqlitePool`).
pub type Pool = SqlitePool;
39
/// Store handle backed by a SQLite connection [`Pool`].
///
/// The log id type `L` and the extensions type `E` are not stored; they are only tracked at the
/// type level through `PhantomData`.
#[derive(Clone, Debug)]
pub struct SqliteStore<L, E> {
    /// Connection pool all queries of this store are executed against.
    pub(crate) pool: Pool,
    /// Zero-sized marker binding the store to `L` and `E`.
    _marker: PhantomData<(L, E)>,
}
46
47impl<L, E> SqliteStore<L, E>
48where
49 L: LogId,
50 E: Extensions,
51{
52 pub fn new(pool: Pool) -> Self {
54 Self {
55 pool,
56 _marker: PhantomData {},
57 }
58 }
59}
60
61pub async fn create_database(url: &str) -> Result<(), SqliteStoreError> {
63 if !Sqlite::database_exists(url).await? {
64 Sqlite::create_database(url).await?
65 }
66
67 Ok(())
68}
69
70pub async fn drop_database(url: &str) -> Result<(), SqliteStoreError> {
72 if Sqlite::database_exists(url).await? {
73 Sqlite::drop_database(url).await?
74 }
75
76 Ok(())
77}
78
79pub async fn connection_pool(url: &str, max_connections: u32) -> Result<Pool, SqliteStoreError> {
81 let pool: Pool = SqlitePoolOptions::new()
82 .max_connections(max_connections)
83 .connect(url)
84 .await?;
85
86 Ok(pool)
87}
88
89pub fn migrations() -> migrate::Migrator {
91 migrate!()
92}
93
94pub async fn run_pending_migrations(pool: &Pool) -> Result<(), SqliteStoreError> {
96 migrations().run(pool).await?;
97
98 Ok(())
99}
100
/// Feeds `t` into a fresh `DefaultHasher` and returns the resulting 64-bit digest.
///
/// NOTE(review): callers in this file persist the digest as the `log_id` column. The standard
/// library does not guarantee `DefaultHasher`'s algorithm across Rust releases, so confirm
/// whether stored ids must stay stable over toolchain upgrades.
fn calculate_hash<T: StdHash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    StdHash::hash(t, &mut hasher);
    hasher.finish()
}
106
107impl<L, E> OperationStore<L, E> for SqliteStore<L, E>
108where
109 L: LogId + Send + Sync,
110 E: Extensions + Send + Sync,
111{
112 type Error = SqliteStoreError;
113
114 async fn insert_operation(
115 &mut self,
116 hash: Hash,
117 header: &Header<E>,
118 body: Option<&Body>,
119 header_bytes: &[u8],
120 log_id: &L,
121 ) -> Result<bool, Self::Error> {
122 query(
123 "
124 INSERT INTO
125 operations_v1 (
126 hash,
127 log_id,
128 version,
129 public_key,
130 signature,
131 payload_size,
132 payload_hash,
133 timestamp,
134 seq_num,
135 backlink,
136 previous,
137 extensions,
138 body,
139 header_bytes,
140 header_size
141 )
142 VALUES
143 (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
144 ",
145 )
146 .bind(hash.to_string())
147 .bind(calculate_hash(log_id).to_string())
148 .bind(header.version.to_string())
149 .bind(header.public_key.to_hex())
150 .bind(header.signature.map(|sig| sig.to_hex()))
151 .bind(header.payload_size.to_string())
152 .bind(header.payload_hash.map(|hash| hash.to_hex()))
153 .bind(header.timestamp.to_string())
154 .bind(header.seq_num.to_string())
155 .bind(header.backlink.map(|backlink| backlink.to_hex()))
156 .bind(
157 header
158 .previous
159 .iter()
160 .map(|previous| previous.to_hex())
161 .collect::<Vec<String>>()
162 .concat(),
163 )
164 .bind(encode_cbor(&header.extensions).expect("extenions are serializable"))
165 .bind(body.map(|body| body.to_bytes()))
166 .bind(header_bytes)
167 .bind(header_bytes.len().to_string())
168 .execute(&self.pool)
169 .await?;
170
171 Ok(true)
172 }
173
174 async fn get_operation(
175 &self,
176 hash: Hash,
177 ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
178 if let Some(operation) = query_as::<_, OperationRow>(
179 "
180 SELECT
181 hash,
182 log_id,
183 version,
184 public_key,
185 signature,
186 payload_size,
187 payload_hash,
188 timestamp,
189 seq_num,
190 backlink,
191 previous,
192 extensions,
193 body,
194 header_bytes
195 FROM
196 operations_v1
197 WHERE
198 hash = ?
199 ",
200 )
201 .bind(hash.to_string())
202 .fetch_optional(&self.pool)
203 .await?
204 {
205 let body = operation.body.clone().map(|body| body.into());
206 let header: Header<E> = operation.into();
207
208 Ok(Some((header, body)))
209 } else {
210 Ok(None)
211 }
212 }
213
214 async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
215 if let Some(operation) = query_as::<_, RawOperationRow>(
216 "
217 SELECT
218 hash,
219 body,
220 header_bytes
221 FROM
222 operations_v1
223 WHERE
224 hash = ?
225 ",
226 )
227 .bind(hash.to_string())
228 .fetch_optional(&self.pool)
229 .await?
230 {
231 let raw_operation = operation.into();
232
233 Ok(Some(raw_operation))
234 } else {
235 Ok(None)
236 }
237 }
238
239 async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
240 let exists = query(
241 "
242 SELECT
243 1
244 FROM
245 operations_v1
246 WHERE
247 hash = ?
248 ",
249 )
250 .bind(hash.to_string())
251 .fetch_optional(&self.pool)
252 .await?;
253
254 Ok(exists.is_some())
255 }
256
257 async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
258 let result = query(
259 "
260 DELETE
261 FROM
262 operations_v1
263 WHERE
264 hash = ?
265 ",
266 )
267 .bind(hash.to_string())
268 .execute(&self.pool)
269 .await?;
270
271 Ok(result.rows_affected() > 0)
272 }
273
274 async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
275 let result = query(
276 "
277 UPDATE
278 operations_v1
279 SET
280 body = NULL
281 WHERE
282 operations_v1.hash = ?
283 ",
284 )
285 .bind(hash.to_string())
286 .execute(&self.pool)
287 .await?;
288
289 Ok(result.rows_affected() > 0)
290 }
291}
292
293impl<L, E> LogStore<L, E> for SqliteStore<L, E>
294where
295 L: LogId + Send + Sync,
296 E: Extensions + Send + Sync,
297{
298 type Error = SqliteStoreError;
299
300 async fn get_log(
301 &self,
302 public_key: &PublicKey,
303 log_id: &L,
304 from: Option<u64>,
305 ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
306 let operations = query_as::<_, OperationRow>(
307 "
308 SELECT
309 hash,
310 log_id,
311 version,
312 public_key,
313 signature,
314 payload_size,
315 payload_hash,
316 timestamp,
317 seq_num,
318 backlink,
319 previous,
320 extensions,
321 body,
322 header_bytes
323 FROM
324 operations_v1
325 WHERE
326 public_key = ?
327 AND log_id = ?
328 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
329 ORDER BY
330 CAST(seq_num AS NUMERIC)
331 ",
332 )
333 .bind(public_key.to_string())
334 .bind(calculate_hash(log_id).to_string())
335 .bind(from.unwrap_or(0).to_string())
336 .fetch_all(&self.pool)
337 .await?;
338
339 let log: Vec<(Header<E>, Option<Body>)> = operations
340 .into_iter()
341 .map(|operation| {
342 (
343 operation.clone().into(),
344 operation.body.map(|body| body.into()),
345 )
346 })
347 .collect();
348
349 if log.is_empty() {
350 Ok(None)
351 } else {
352 Ok(Some(log))
353 }
354 }
355
356 async fn get_raw_log(
357 &self,
358 public_key: &PublicKey,
359 log_id: &L,
360 from: Option<u64>,
361 ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
362 let operations = query_as::<_, RawOperationRow>(
363 "
364 SELECT
365 hash,
366 body,
367 header_bytes
368 FROM
369 operations_v1
370 WHERE
371 public_key = ?
372 AND log_id = ?
373 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
374 ORDER BY
375 CAST(seq_num AS NUMERIC)
376 ",
377 )
378 .bind(public_key.to_string())
379 .bind(calculate_hash(log_id).to_string())
380 .bind(from.unwrap_or(0).to_string())
381 .fetch_all(&self.pool)
382 .await?;
383
384 let log: Vec<RawOperation> = operations
385 .into_iter()
386 .map(|operation| operation.into())
387 .collect();
388
389 if log.is_empty() {
390 Ok(None)
391 } else {
392 Ok(Some(log))
393 }
394 }
395
396 async fn get_log_size(
397 &self,
398 public_key: &PublicKey,
399 log_id: &L,
400 from: Option<u64>,
401 ) -> Result<Option<u64>, Self::Error> {
402 let bytes_count = query_as::<_, ByteCount>(
403 "
404 SELECT
405 CAST(SUM(CAST(header_size AS NUMERIC)) AS TEXT) AS total_header_size,
406 CAST(SUM(CAST(payload_size AS NUMERIC)) AS TEXT) AS total_payload_size
407 FROM
408 operations_v1
409 WHERE
410 public_key = ?
411 AND log_id = ?
412 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
413 ",
414 )
415 .bind(public_key.to_string())
416 .bind(calculate_hash(log_id).to_string())
417 .bind(from.unwrap_or(0).to_string())
418 .fetch_one(&self.pool)
419 .await?;
420
421 let total_bytes = bytes_count.total_header_size.parse::<u64>().unwrap()
422 + bytes_count.total_payload_size.parse::<u64>().unwrap();
423
424 if total_bytes == 0 {
425 Ok(None)
426 } else {
427 Ok(Some(total_bytes))
428 }
429 }
430
431 async fn get_log_hashes(
432 &self,
433 public_key: &PublicKey,
434 log_id: &L,
435 from: Option<u64>,
436 ) -> Result<Option<Vec<(u64, Hash)>>, Self::Error> {
437 let hashes = query_as::<_, SeqAndHash>(
438 "
439 SELECT
440 seq_num,
441 hash
442 FROM
443 operations_v1
444 WHERE
445 public_key = ?
446 AND log_id = ?
447 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
448 ORDER BY
449 CAST(seq_num AS NUMERIC)
450 ",
451 )
452 .bind(public_key.to_string())
453 .bind(calculate_hash(log_id).to_string())
454 .bind(from.unwrap_or(0).to_string())
455 .fetch_all(&self.pool)
456 .await?;
457
458 let hashes: Vec<(u64, Hash)> = hashes.into_iter().map(Into::<(u64, Hash)>::into).collect();
459
460 if hashes.is_empty() {
461 Ok(None)
462 } else {
463 Ok(Some(hashes))
464 }
465 }
466
467 async fn latest_operation(
468 &self,
469 public_key: &PublicKey,
470 log_id: &L,
471 ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
472 if let Some(operation) = query_as::<_, OperationRow>(
473 "
474 SELECT
475 hash,
476 log_id,
477 version,
478 public_key,
479 signature,
480 payload_size,
481 payload_hash,
482 timestamp,
483 seq_num,
484 backlink,
485 previous,
486 extensions,
487 body,
488 header_bytes
489 FROM
490 operations_v1
491 WHERE
492 public_key = ?
493 AND log_id = ?
494 ORDER BY
495 CAST(seq_num AS NUMERIC) DESC LIMIT 1
496 ",
497 )
498 .bind(public_key.to_string())
499 .bind(calculate_hash(log_id).to_string())
500 .fetch_optional(&self.pool)
501 .await?
502 {
503 let body = operation.body.clone().map(|body| body.into());
504 let header: Header<E> = operation.into();
505
506 Ok(Some((header, body)))
507 } else {
508 Ok(None)
509 }
510 }
511
512 async fn delete_operations(
513 &mut self,
514 public_key: &PublicKey,
515 log_id: &L,
516 before: u64,
517 ) -> Result<bool, Self::Error> {
518 let result = query(
519 "
520 DELETE
521 FROM
522 operations_v1
523 WHERE
524 public_key = ?
525 AND log_id = ?
526 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
527 ",
528 )
529 .bind(public_key.to_string())
530 .bind(calculate_hash(log_id).to_string())
531 .bind(before.to_string())
532 .execute(&self.pool)
533 .await?;
534
535 Ok(result.rows_affected() > 0)
536 }
537
538 async fn delete_payloads(
539 &mut self,
540 public_key: &PublicKey,
541 log_id: &L,
542 from: u64,
543 to: u64,
544 ) -> Result<bool, Self::Error> {
545 let result = query(
546 "
547 UPDATE
548 operations_v1
549 SET
550 body = NULL
551 WHERE
552 public_key = ?
553 AND log_id = ?
554 AND CAST(seq_num AS NUMERIC) >= CAST(? as NUMERIC)
555 AND CAST(seq_num AS NUMERIC) < CAST(? as NUMERIC)
556 ",
557 )
558 .bind(public_key.to_string())
559 .bind(calculate_hash(log_id).to_string())
560 .bind(from.to_string())
561 .bind(to.to_string())
562 .execute(&self.pool)
563 .await?;
564
565 Ok(result.rows_affected() > 0)
566 }
567
568 async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, u64)>, Self::Error> {
569 let operations = query_as::<_, LogHeightRow>(
570 "
571 SELECT
572 public_key,
573 CAST(MAX(CAST(seq_num AS NUMERIC)) AS TEXT) as seq_num
574 FROM
575 operations_v1
576 WHERE
577 log_id = ?
578 GROUP BY
579 public_key
580 ",
581 )
582 .bind(calculate_hash(log_id).to_string())
583 .fetch_all(&self.pool)
584 .await?;
585
586 let log_heights: Vec<(PublicKey, u64)> = operations
587 .into_iter()
588 .map(|operation| operation.into())
589 .collect();
590
591 Ok(log_heights)
592 }
593}
594
// Integration-style tests running the store against a database provided by
// `initialize_sqlite_db`.
#[cfg(test)]
mod tests {
    use p2panda_core::{Body, Hash, Header, PrivateKey};
    use serde::{Deserialize, Serialize};

    use crate::sqlite::test_utils::initialize_sqlite_db;
    use crate::{LogStore, OperationStore};

    use super::SqliteStore;

    // Builds and signs a header without extensions for the given body.
    //
    // Returns the header hash, the header itself and its encoded bytes.
    fn create_operation(
        private_key: &PrivateKey,
        body: &Body,
        seq_num: u64,
        timestamp: u64,
        backlink: Option<Hash>,
    ) -> (Hash, Header<()>, Vec<u8>) {
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp,
            seq_num,
            backlink,
            previous: vec![],
            extensions: (),
        };
        header.sign(private_key);
        let header_bytes = header.to_bytes();
        (header.hash(), header, header_bytes)
    }

    // Inserting a single operation without extensions succeeds.
    #[tokio::test]
    async fn default_sqlite_store() {
        let db_pool = initialize_sqlite_db().await;

        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");

        assert!(inserted);
    }

    // Inserting an operation which carries a custom extensions type succeeds.
    #[tokio::test]
    async fn generic_extensions_mem_store() {
        // Custom extensions type defined only for this test.
        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
        struct MyExtension {}

        let db_pool = initialize_sqlite_db().await;

        let mut store = SqliteStore::new(db_pool);

        let private_key = PrivateKey::new();
        let body = Body::new("hello!".as_bytes());
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp: 0,
            seq_num: 0,
            backlink: None,
            previous: vec![],
            extensions: Some(MyExtension {}),
        };
        header.sign(&private_key);

        let inserted = store
            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
            .await
            .expect("no errors");
        assert!(inserted);
    }

    // Inserting a header whose signature was removed is rejected by the database.
    #[tokio::test]
    async fn insert_operation_with_unsigned_header() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, mut header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        // `create_operation` signs the header, so the signature is cleared again here.
        header.signature = None;

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await;

        assert!(inserted.is_err());
        // The exact message of the expected NOT NULL violation is pinned.
        assert_eq!(
            format!("{}", inserted.unwrap_err()),
            "an error occurred with the sqlite database: error returned from database: (code: 1299) NOT NULL constraint failed: operations_v1.signature"
        );
    }

    // An inserted operation can be found and read back, both decoded and raw.
    #[tokio::test]
    async fn insert_get_operation() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        // Second operation which is never inserted, used for the negative lookup below.
        let body_2 = Body::new("buenas!".as_bytes());
        let (hash_2, _header_2, _header_bytes_2) =
            create_operation(&private_key, &body_2, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);
        assert!(store.has_operation(hash).await.expect("no error"));
        assert!(!store.has_operation(hash_2).await.expect("no error"));

        let (header_again, body_again) = store
            .get_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");

        // Headers are compared through their hashes.
        assert_eq!(header.hash(), header_again.hash());
        assert_eq!(Some(body.clone()), body_again);

        let (header_bytes_again, body_bytes_again) = store
            .get_raw_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");

        assert_eq!(header_bytes_again, header_bytes);
        assert_eq!(body_bytes_again, Some(body.to_bytes()));
    }

    // A deleted operation can no longer be found through any lookup method.
    #[tokio::test]
    async fn delete_operation() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        assert!(store.has_operation(hash).await.expect("no error"));

        assert!(store.delete_operation(hash).await.expect("no error"));

        let deleted_operation = store.get_operation(hash).await.expect("no error");
        assert!(deleted_operation.is_none());
        assert!(!store.has_operation(hash).await.expect("no error"));

        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
        assert!(deleted_raw_operation.is_none());
    }

    // Deleting a payload keeps the operation but removes its body.
    #[tokio::test]
    async fn delete_payload() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();

        let body = Body::new("hello!".as_bytes());
        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);

        let inserted = store
            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
            .await
            .expect("no errors");
        assert!(inserted);

        assert!(store.delete_payload(hash).await.expect("no error"));

        let (_, no_body) = store
            .get_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");
        assert!(no_body.is_none());
        assert!(store.has_operation(hash).await.expect("no error"));

        let (_, no_body) = store
            .get_raw_operation(hash)
            .await
            .expect("no error")
            .expect("operation exist");
        assert!(no_body.is_none());
    }

    // Logs are returned in sequence order, optionally starting at a given sequence number, in
    // both decoded and raw form.
    #[tokio::test]
    async fn get_log() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        // Three operations chained through their backlinks.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
            .await
            .expect("no errors");

        // Whole log.
        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);
        assert_eq!(log[0].0.hash(), hash_0);
        assert_eq!(log[1].0.hash(), hash_1);
        assert_eq!(log[2].0.hash(), hash_2);
        assert_eq!(log[0].1, Some(body_0.clone()));
        assert_eq!(log[1].1, Some(body_1.clone()));
        assert_eq!(log[2].1, Some(body_2.clone()));

        // Log starting at sequence number 1.
        let log = store
            .get_log(&private_key.public_key(), &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 2);
        assert_eq!(log[0].0.hash(), hash_1);
        assert_eq!(log[1].0.hash(), hash_2);
        assert_eq!(log[0].1, Some(body_1.clone()));
        assert_eq!(log[1].1, Some(body_2.clone()));

        // Whole log in raw form.
        let log = store
            .get_raw_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);
        assert_eq!(log[0].0, header_bytes_0);
        assert_eq!(log[1].0, header_bytes_1);
        assert_eq!(log[2].0, header_bytes_2);
        assert_eq!(log[0].1, Some(body_0.to_bytes()));
        assert_eq!(log[1].1, Some(body_1.to_bytes()));
        assert_eq!(log[2].1, Some(body_2.to_bytes()));

        // Raw log starting at sequence number 1.
        let log = store
            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 2);
        assert_eq!(log[0].0, header_bytes_1);
        assert_eq!(log[1].0, header_bytes_2);
        assert_eq!(log[0].1, Some(body_1.to_bytes()));
        assert_eq!(log[1].1, Some(body_2.to_bytes()));
    }

    // Hashes and byte sizes of a log are reported for the whole log and for a suffix of it.
    #[tokio::test]
    async fn get_log_hashes_and_size() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
            .await
            .expect("no errors");

        // Hashes of the whole log.
        let hashes = store
            .get_log_hashes(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(hashes.len(), 3);
        assert_eq!(hashes[0], (0, hash_0));
        assert_eq!(hashes[1], (1, hash_1));
        assert_eq!(hashes[2], (2, hash_2));

        // Size of the whole log.
        let size = store
            .get_log_size(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        // Expected size is the sum of all header byte lengths and payload sizes.
        let expected_size = header_bytes_0.len() as u64
            + header_0.payload_size
            + header_bytes_1.len() as u64
            + header_1.payload_size
            + header_bytes_2.len() as u64
            + header_2.payload_size;
        assert_eq!(size, expected_size);

        // Hashes starting at sequence number 1.
        let hashes = store
            .get_log_hashes(&private_key.public_key(), &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(hashes.len(), 2);
        assert_eq!(hashes[0], (1, hash_1));
        assert_eq!(hashes[1], (2, hash_2));

        // Size starting at sequence number 1.
        let size = store
            .get_log_size(&private_key.public_key(), &log_id, Some(1))
            .await
            .expect("no errors")
            .expect("log should exist");

        let expected_size = header_bytes_1.len() as u64
            + header_1.payload_size
            + header_bytes_2.len() as u64
            + header_2.payload_size;
        assert_eq!(size, expected_size);
    }

    // The operation with the highest sequence number is returned as the latest one.
    #[tokio::test]
    async fn get_latest_operation() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello?!?!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
        let (hash_2, mut header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
        // `previous` is set after `hash_2` and `header_bytes_2` were produced, so only the
        // header passed to the store carries it.
        header_2.previous = vec![hash_0];

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        let (latest_header, latest_body) = store
            .latest_operation(&private_key.public_key(), &log_id)
            .await
            .expect("no errors")
            .expect("there's an operation");

        // Compared against the hash of the modified header, not against `hash_2`.
        assert_eq!(latest_header.hash(), header_2.hash());
        assert_eq!(latest_body, Some(body_2));
    }

    // Operations below a sequence number are removed; repeating the call removes nothing.
    #[tokio::test]
    async fn delete_operations() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("final hello!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        // All three operations are present before deleting.
        assert_eq!(log.len(), 3);

        // Delete everything with a sequence number below 2.
        let deleted = store
            .delete_operations(&private_key.public_key(), &log_id, 2)
            .await
            .expect("no errors");
        assert!(deleted);

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        // Only the operation with sequence number 2 is left.
        assert_eq!(log.len(), 1);

        assert_eq!(log[0].0.hash(), header_2.hash());

        // A second call with the same bound has nothing left to delete.
        let deleted = store
            .delete_operations(&private_key.public_key(), &log_id, 2)
            .await
            .expect("no errors");
        assert!(!deleted);
    }

    // Payloads inside a sequence number range are removed while the operations stay.
    #[tokio::test]
    async fn delete_payloads() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);
        let private_key = PrivateKey::new();
        let log_id = 0;

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("final hello!".as_bytes());

        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
            .await
            .expect("no errors");

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log.len(), 3);

        // All bodies are present before deleting.
        assert_eq!(log[0].1, Some(body_0));
        assert_eq!(log[1].1, Some(body_1));
        assert_eq!(log[2].1, Some(body_2.clone()));

        // Remove the payloads of sequence numbers 0 and 1 (the upper bound 2 is exclusive).
        let deleted = store
            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
            .await
            .expect("no errors");
        assert!(deleted);

        let log = store
            .get_log(&private_key.public_key(), &log_id, None)
            .await
            .expect("no errors")
            .expect("log should exist");

        assert_eq!(log[0].1, None);
        assert_eq!(log[1].1, None);
        assert_eq!(log[2].1, Some(body_2));
    }

    // For every author the highest sequence number under a log id is reported.
    #[tokio::test]
    async fn get_log_heights() {
        let db_pool = initialize_sqlite_db().await;
        let mut store = SqliteStore::new(db_pool);

        let log_id = 0;

        let private_key_0 = PrivateKey::new();
        let private_key_1 = PrivateKey::new();
        let private_key_2 = PrivateKey::new();

        let body_0 = Body::new("hello!".as_bytes());
        let body_1 = Body::new("hello again!".as_bytes());
        let body_2 = Body::new("hello for a third time!".as_bytes());

        // First author: three operations.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_0, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key_0, &body_1, 1, 0, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key_0, &body_2, 2, 0, Some(hash_1));

        // Nothing was inserted yet, so there are no heights.
        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");
        assert!(log_heights.is_empty());

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
            .await
            .expect("no errors");

        // Second author: two operations.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_1, &body_0, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key_1, &body_1, 1, 0, Some(hash_0));

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
            .await
            .expect("no errors");
        store
            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
            .await
            .expect("no errors");

        // Third author: one operation.
        let (hash_0, header_0, header_bytes_0) =
            create_operation(&private_key_2, &body_0, 0, 0, None);

        store
            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
            .await
            .expect("no errors");

        let log_heights = store.get_log_heights(&log_id).await.expect("no errors");

        assert_eq!(log_heights.len(), 3);

        // Entries are checked with `contains`, so their order does not matter.
        assert!(log_heights.contains(&(private_key_0.public_key(), 2)));
        assert!(log_heights.contains(&(private_key_1.public_key(), 1)));
        assert!(log_heights.contains(&(private_key_2.public_key(), 0)));
    }
}