1use crate::{
9 error::cache::CacheError,
10 models::{
11 BlockRow, LocalKeyRow, NewBlockRow, NewLocalKeyRow, NewLocalValueRow, NewPrefixScanRow,
12 NewStorageRow,
13 },
14 schema::{blocks, local_keys, local_values, prefix_scans, storage},
15 strings::cache::{errors, lock_patterns, pragmas, urls},
16};
17use bb8::CustomizeConnection;
18use diesel::{
19 OptionalExtension, prelude::*, result::Error as DieselError, sqlite::SqliteConnection,
20};
21use diesel_async::{
22 AsyncConnection, AsyncMigrationHarness, RunQueryDsl,
23 pooled_connection::{
24 AsyncDieselConnectionManager, PoolError,
25 bb8::{Pool, PooledConnection},
26 },
27 sync_connection_wrapper::SyncConnectionWrapper,
28};
29use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
30use std::{
31 collections::{HashMap, HashSet},
32 future::Future,
33 ops::{Deref, DerefMut},
34 path::Path,
35 pin::Pin,
36 sync::Arc,
37 time::Duration,
38};
39use subxt::config::substrate::H256;
40use tokio::sync::{Mutex, MutexGuard};
41
42const MAX_POOL_CONNECTIONS: u32 = 5;
48const MAX_LOCK_RETRIES: u32 = 30;
50
51#[derive(Debug, Clone)]
62pub struct PrefixScanProgress {
63 pub last_scanned_key: Option<Vec<u8>>,
66 pub is_complete: bool,
68}
69
70#[derive(Clone, Debug)]
75pub struct StorageCache {
76 inner: StorageConn,
77}
78
79#[derive(Clone)]
81enum StorageConn {
82 Pool(Pool<SyncConnectionWrapper<SqliteConnection>>),
86 Single(Arc<Mutex<SyncConnectionWrapper<SqliteConnection>>>),
90}
91
92impl std::fmt::Debug for StorageConn {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 match self {
95 StorageConn::Pool(_) => f.debug_tuple("Pool").field(&"...").finish(),
96 StorageConn::Single(_) => f.debug_tuple("Single").field(&"...").finish(),
97 }
98 }
99}
100
101pub(crate) enum ConnectionGuard<'a> {
105 Pool(PooledConnection<'a, SyncConnectionWrapper<SqliteConnection>>),
106 Single(MutexGuard<'a, SyncConnectionWrapper<SqliteConnection>>),
107}
108
109impl<'a> Deref for ConnectionGuard<'a> {
110 type Target = SyncConnectionWrapper<SqliteConnection>;
111
112 fn deref(&self) -> &Self::Target {
113 match self {
114 ConnectionGuard::Pool(conn) => conn,
115 ConnectionGuard::Single(guard) => guard,
116 }
117 }
118}
119
120impl<'a> DerefMut for ConnectionGuard<'a> {
121 fn deref_mut(&mut self) -> &mut Self::Target {
122 match self {
123 ConnectionGuard::Pool(conn) => conn,
124 ConnectionGuard::Single(guard) => guard,
125 }
126 }
127}
128
129async fn retry_conn(attempts: &mut u32) {
134 *attempts += 1;
135 let delay_ms = 10u64.saturating_mul(*attempts as u64);
136 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
137}
138
139#[derive(Debug, Clone, Copy)]
141struct SqliteConnectionCustomizer;
142
143impl CustomizeConnection<SyncConnectionWrapper<SqliteConnection>, PoolError>
144 for SqliteConnectionCustomizer
145{
146 fn on_acquire<'a>(
147 &'a self,
148 conn: &'a mut SyncConnectionWrapper<SqliteConnection>,
149 ) -> Pin<Box<dyn Future<Output = Result<(), PoolError>> + Send + 'a>> {
150 Box::pin(async move {
151 diesel::sql_query(pragmas::BUSY_TIMEOUT)
153 .execute(conn)
154 .await
155 .map_err(PoolError::QueryError)?;
156 Ok(())
157 })
158 }
159}
160
161impl StorageCache {
162 pub async fn open(maybe_path: Option<&Path>) -> Result<Self, CacheError> {
166 if let Some(path) = maybe_path {
168 if let Some(parent) = path.parent() {
170 std::fs::create_dir_all(parent)?;
171 }
172 let url = path.display().to_string();
173
174 {
176 let mut conn = SyncConnectionWrapper::<SqliteConnection>::establish(&url).await?;
177 diesel::sql_query(pragmas::JOURNAL_MODE_WAL).execute(&mut conn).await?;
180 diesel::sql_query(pragmas::BUSY_TIMEOUT).execute(&mut conn).await?;
182 let mut harness = AsyncMigrationHarness::new(conn);
183 harness.run_pending_migrations(MIGRATIONS)?;
184 let _ = harness.into_inner();
185 }
186
187 let manager =
189 AsyncDieselConnectionManager::<SyncConnectionWrapper<SqliteConnection>>::new(url);
190 let pool = Pool::builder()
191 .max_size(MAX_POOL_CONNECTIONS)
192 .connection_customizer(Box::new(SqliteConnectionCustomizer))
193 .build(manager)
194 .await?;
195 Ok(Self { inner: StorageConn::Pool(pool) })
196 } else {
197 let mut conn =
199 SyncConnectionWrapper::<SqliteConnection>::establish(urls::IN_MEMORY).await?;
200 diesel::sql_query(pragmas::BUSY_TIMEOUT).execute(&mut conn).await?;
203 let mut harness = AsyncMigrationHarness::new(conn);
204 harness.run_pending_migrations(MIGRATIONS)?;
205 let conn = harness.into_inner();
206 Ok(Self {
207 inner: StorageConn::Single(std::sync::Arc::new(tokio::sync::Mutex::new(conn))),
208 })
209 }
210 }
211
212 pub async fn in_memory() -> Result<Self, CacheError> {
217 Self::open(None).await
218 }
219
220 pub(crate) async fn get_conn(&self) -> Result<ConnectionGuard<'_>, CacheError> {
225 match &self.inner {
226 StorageConn::Pool(pool) => {
227 let conn = pool.get().await.map_err(|e| {
228 CacheError::Connection(ConnectionError::BadConnection(e.to_string()))
229 })?;
230 Ok(ConnectionGuard::Pool(conn))
231 },
232 StorageConn::Single(m) => {
233 let conn = m.lock().await;
234 Ok(ConnectionGuard::Single(conn))
235 },
236 }
237 }
238
239 pub async fn get_storage(
246 &self,
247 block_hash: H256,
248 key: &[u8],
249 ) -> Result<Option<Option<Vec<u8>>>, CacheError> {
250 use crate::schema::storage::columns as sc;
256
257 let mut conn = self.get_conn().await?;
258
259 let row: Option<(Option<Vec<u8>>, bool)> = storage::table
260 .filter(sc::block_hash.eq(block_hash.as_bytes()))
261 .filter(sc::key.eq(key))
262 .select((sc::value, sc::is_empty))
263 .first::<(Option<Vec<u8>>, bool)>(&mut conn)
264 .await
265 .optional()?;
266
267 Ok(row.map(|(val, empty)| if empty { None } else { val }))
268 }
269
270 pub async fn set_storage(
277 &self,
278 block_hash: H256,
279 key: &[u8],
280 value: Option<&[u8]>,
281 ) -> Result<(), CacheError> {
282 use crate::schema::storage::columns as sc;
284
285 let mut attempts = 0;
289 loop {
290 let mut conn = self.get_conn().await?;
291
292 let row = NewStorageRow {
293 block_hash: block_hash.as_bytes(),
294 key,
295 value,
296 is_empty: value.is_none(),
297 };
298
299 let res = diesel::insert_into(storage::table)
300 .values(&row)
301 .on_conflict((sc::block_hash, sc::key))
302 .do_update()
303 .set((sc::value.eq(value), sc::is_empty.eq(row.is_empty)))
304 .execute(&mut conn)
305 .await;
306
307 match res {
308 Ok(_) => return Ok(()),
309 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
310 retry_conn(&mut attempts).await;
311 continue;
312 },
313 Err(e) => return Err(e.into()),
314 }
315 }
316 }
317
318 pub async fn get_storage_batch(
322 &self,
323 block_hash: H256,
324 keys: &[&[u8]],
325 ) -> Result<Vec<Option<Option<Vec<u8>>>>, CacheError> {
326 if keys.is_empty() {
327 return Ok(vec![]);
328 }
329
330 let mut seen = HashSet::with_capacity(keys.len());
331 if keys.iter().any(|key| !seen.insert(key)) {
332 return Err(CacheError::DuplicatedKeys);
333 }
334
335 use crate::schema::storage::columns as sc;
336 let mut conn = self.get_conn().await?;
337
338 let rows: Vec<(Vec<u8>, Option<Vec<u8>>, bool)> = storage::table
339 .filter(sc::block_hash.eq(block_hash.as_bytes()))
340 .filter(sc::key.eq_any(keys))
341 .select((sc::key, sc::value, sc::is_empty))
342 .load::<(Vec<u8>, Option<Vec<u8>>, bool)>(&mut conn)
343 .await?;
344
345 let mut cache_map = HashMap::new();
348 for (key, value, empty) in rows {
349 let value = if empty { None } else { value };
350 cache_map.insert(key, value);
351 }
352
353 Ok(keys.iter().map(|key| cache_map.remove(*key)).collect())
357 }
358
359 pub async fn set_storage_batch(
363 &self,
364 block_hash: H256,
365 entries: &[(&[u8], Option<&[u8]>)],
366 ) -> Result<(), CacheError> {
367 if entries.is_empty() {
368 return Ok(());
369 }
370
371 let mut seen = HashSet::with_capacity(entries.len());
372 if entries.iter().any(|(key, _)| !seen.insert(key)) {
373 return Err(CacheError::DuplicatedKeys);
374 }
375
376 use crate::schema::storage::columns as sc;
382 let entries = Arc::new(entries);
383 let block_hash = Arc::new(block_hash);
384
385 let mut attempts = 0;
389 loop {
390 let entries = Arc::clone(&entries);
391 let block_hash = Arc::clone(&block_hash);
392 let mut conn = self.get_conn().await?;
393 let res = conn
394 .transaction::<_, DieselError, _>(move |conn| {
395 Box::pin(async move {
396 let new_rows: Vec<NewStorageRow> = entries
397 .iter()
398 .map(|(key, value)| NewStorageRow {
399 block_hash: block_hash.as_bytes(),
400 key,
401 value: *value,
402 is_empty: value.is_none(),
403 })
404 .collect();
405 for row in new_rows {
406 diesel::insert_into(storage::table)
407 .values(&row)
408 .on_conflict((sc::block_hash, sc::key))
409 .do_update()
410 .set((sc::value.eq(row.value), sc::is_empty.eq(row.is_empty)))
411 .execute(conn)
412 .await?;
413 }
414 Ok(())
415 })
416 })
417 .await;
418
419 match res {
420 Ok(_) => return Ok(()),
421 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
422 retry_conn(&mut attempts).await;
423 continue;
424 },
425 Err(e) => return Err(e.into()),
426 }
427 }
428 }
429
430 pub async fn get_local_key(&self, key: &[u8]) -> Result<Option<LocalKeyRow>, CacheError> {
436 use crate::schema::local_keys::columns as lkc;
437
438 let mut conn = self.get_conn().await?;
439
440 let row = local_keys::table
441 .filter(lkc::key.eq(key))
442 .select(LocalKeyRow::as_select())
443 .first(&mut conn)
444 .await
445 .optional()?;
446
447 Ok(row)
448 }
449
450 pub async fn insert_local_key(&self, key: &[u8]) -> Result<i32, CacheError> {
454 use crate::schema::local_keys::columns as lkc;
455
456 let mut attempts = 0;
457 loop {
458 let mut conn = self.get_conn().await?;
459
460 let res = diesel::insert_into(local_keys::table)
462 .values(NewLocalKeyRow { key })
463 .on_conflict(lkc::key)
464 .do_nothing()
465 .execute(&mut conn)
466 .await;
467
468 match res {
469 Ok(_) => {
470 let key_id: i32 = local_keys::table
472 .filter(lkc::key.eq(key))
473 .select(lkc::id)
474 .first(&mut conn)
475 .await?;
476 return Ok(key_id);
477 },
478 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
479 retry_conn(&mut attempts).await;
480 continue;
481 },
482 Err(e) => return Err(e.into()),
483 }
484 }
485 }
486
487 pub async fn get_local_value_at_block(
497 &self,
498 key: &[u8],
499 block_number: u32,
500 ) -> Result<Option<Option<Vec<u8>>>, CacheError> {
501 use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
502
503 let mut conn = self.get_conn().await?;
504 let block_num = block_number as i64;
505
506 let key_id: i32 = match local_keys::table
508 .filter(lkc::key.eq(key))
509 .select(lkc::id)
510 .first(&mut conn)
511 .await
512 .optional()?
513 {
514 Some(id) => id,
515 _ => return Ok(None),
516 };
517
518 let value: Option<Option<Vec<u8>>> = local_values::table
520 .filter(lvc::key_id.eq(key_id))
521 .filter(lvc::valid_from.le(block_num))
522 .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
523 .select(lvc::value)
524 .first(&mut conn)
525 .await
526 .optional()?;
527
528 Ok(value)
529 }
530
531 pub async fn get_local_values_at_block_batch(
538 &self,
539 keys: &[&[u8]],
540 block_number: u32,
541 ) -> Result<Vec<Option<Option<Vec<u8>>>>, CacheError> {
542 if keys.is_empty() {
543 return Ok(vec![]);
544 }
545
546 let mut seen = HashSet::with_capacity(keys.len());
547 if keys.iter().any(|key| !seen.insert(key)) {
548 return Err(CacheError::DuplicatedKeys);
549 }
550
551 use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
552
553 let mut conn = self.get_conn().await?;
554 let block_num = block_number as i64;
555
556 let key_rows: Vec<LocalKeyRow> = local_keys::table
558 .filter(lkc::key.eq_any(keys))
559 .select(LocalKeyRow::as_select())
560 .load(&mut conn)
561 .await?;
562
563 let key_to_id: HashMap<Vec<u8>, i32> =
565 key_rows.iter().map(|r| (r.key.clone(), r.id)).collect();
566
567 let key_ids: Vec<i32> = key_to_id.values().copied().collect();
569
570 if key_ids.is_empty() {
571 return Ok(vec![None; keys.len()]);
572 }
573
574 let value_rows: Vec<(i32, Option<Vec<u8>>)> = local_values::table
576 .filter(lvc::key_id.eq_any(&key_ids))
577 .filter(lvc::valid_from.le(block_num))
578 .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
579 .select((lvc::key_id, lvc::value))
580 .load(&mut conn)
581 .await?;
582
583 let mut id_to_value: HashMap<i32, Option<Vec<u8>>> = HashMap::new();
585 for (key_id, value) in value_rows {
586 id_to_value.insert(key_id, value);
587 }
588
589 Ok(keys
594 .iter()
595 .map(|key| key_to_id.get(*key).and_then(|key_id| id_to_value.remove(key_id)))
596 .collect())
597 }
598
599 pub async fn get_local_keys_at_block(
609 &self,
610 prefix: &[u8],
611 block_number: u32,
612 ) -> Result<Vec<Vec<u8>>, CacheError> {
613 use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
614
615 let mut conn = self.get_conn().await?;
616 let block_num = block_number as i64;
617 let prefix_vec = prefix.to_vec();
618
619 let mut query = local_keys::table
620 .inner_join(local_values::table)
621 .filter(lkc::key.ge(&prefix_vec))
622 .filter(lvc::valid_from.le(block_num))
623 .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
624 .filter(lvc::value.is_not_null())
625 .select(lkc::key)
626 .distinct()
627 .order(lkc::key.asc())
628 .into_boxed();
629
630 if let Some(upper) = Self::prefix_upper_bound(prefix) {
631 query = query.filter(lkc::key.lt(upper));
632 }
633
634 let keys: Vec<Vec<u8>> = query.load(&mut conn).await?;
635 Ok(keys)
636 }
637
638 pub async fn get_local_deleted_keys_at_block(
644 &self,
645 prefix: &[u8],
646 block_number: u32,
647 ) -> Result<Vec<Vec<u8>>, CacheError> {
648 use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
649
650 let mut conn = self.get_conn().await?;
651 let block_num = block_number as i64;
652 let prefix_vec = prefix.to_vec();
653
654 let mut query = local_keys::table
655 .inner_join(local_values::table)
656 .filter(lkc::key.ge(&prefix_vec))
657 .filter(lvc::valid_from.le(block_num))
658 .filter(lvc::valid_until.is_null().or(lvc::valid_until.gt(block_num)))
659 .filter(lvc::value.is_null())
660 .select(lkc::key)
661 .distinct()
662 .order(lkc::key.asc())
663 .into_boxed();
664
665 if let Some(upper) = Self::prefix_upper_bound(prefix) {
666 query = query.filter(lkc::key.lt(upper));
667 }
668
669 let keys: Vec<Vec<u8>> = query.load(&mut conn).await?;
670 Ok(keys)
671 }
672
673 fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
679 let mut upper = prefix.to_vec();
680 while let Some(last) = upper.last_mut() {
682 if *last < 0xFF {
683 *last += 1;
684 return Some(upper);
685 }
686 upper.pop();
687 }
688 None
690 }
691
692 pub async fn insert_local_value(
699 &self,
700 key_id: i32,
701 value: Option<&[u8]>,
702 valid_from: u32,
703 ) -> Result<(), CacheError> {
704 let mut attempts = 0;
705 loop {
706 let mut conn = self.get_conn().await?;
707
708 let row = NewLocalValueRow {
709 key_id,
710 value: value.map(|v| v.to_vec()),
711 valid_from: valid_from as i64,
712 valid_until: None,
713 };
714
715 let res =
716 diesel::insert_into(local_values::table).values(&row).execute(&mut conn).await;
717
718 match res {
719 Ok(_) => return Ok(()),
720 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
721 retry_conn(&mut attempts).await;
722 continue;
723 },
724 Err(e) => return Err(e.into()),
725 }
726 }
727 }
728
729 pub async fn close_local_value(&self, key_id: i32, valid_until: u32) -> Result<(), CacheError> {
733 use crate::schema::local_values::columns as lvc;
734
735 let mut attempts = 0;
736 loop {
737 let mut conn = self.get_conn().await?;
738
739 let res = diesel::update(
740 local_values::table
741 .filter(lvc::key_id.eq(key_id))
742 .filter(lvc::valid_until.is_null()),
743 )
744 .set(lvc::valid_until.eq(Some(valid_until as i64)))
745 .execute(&mut conn)
746 .await;
747
748 match res {
749 Ok(_) => return Ok(()),
750 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
751 retry_conn(&mut attempts).await;
752 continue;
753 },
754 Err(e) => return Err(e.into()),
755 }
756 }
757 }
758
759 pub async fn commit_local_changes(
766 &self,
767 entries: &[(&[u8], Option<&[u8]>)],
768 block_number: u32,
769 ) -> Result<(), CacheError> {
770 use crate::schema::{local_keys::columns as lkc, local_values::columns as lvc};
771
772 if entries.is_empty() {
773 return Ok(());
774 }
775
776 let owned: Vec<(Vec<u8>, Option<Vec<u8>>)> =
778 entries.iter().map(|(k, v)| (k.to_vec(), v.map(|val| val.to_vec()))).collect();
779
780 let mut attempts = 0;
781 loop {
782 let owned = owned.clone();
783 let mut conn = self.get_conn().await?;
784
785 let res = conn
786 .transaction::<_, DieselError, _>(move |conn| {
787 Box::pin(async move {
788 for (key, value) in &owned {
789 diesel::insert_into(local_keys::table)
791 .values(NewLocalKeyRow { key })
792 .on_conflict(lkc::key)
793 .do_nothing()
794 .execute(conn)
795 .await?;
796
797 let key_id: i32 = local_keys::table
798 .filter(lkc::key.eq(key.as_slice()))
799 .select(lkc::id)
800 .first(conn)
801 .await?;
802
803 diesel::update(
805 local_values::table
806 .filter(lvc::key_id.eq(key_id))
807 .filter(lvc::valid_until.is_null()),
808 )
809 .set(lvc::valid_until.eq(Some(block_number as i64)))
810 .execute(conn)
811 .await?;
812
813 let row = NewLocalValueRow {
815 key_id,
816 value: value.clone(),
817 valid_from: block_number as i64,
818 valid_until: None,
819 };
820 diesel::insert_into(local_values::table)
821 .values(&row)
822 .execute(conn)
823 .await?;
824 }
825 Ok(())
826 })
827 })
828 .await;
829
830 match res {
831 Ok(_) => return Ok(()),
832 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
833 retry_conn(&mut attempts).await;
834 continue;
835 },
836 Err(e) => return Err(e.into()),
837 }
838 }
839 }
840
841 pub async fn clear_local_storage(&self) -> Result<(), CacheError> {
846 let mut attempts = 0;
847 loop {
848 let mut conn = self.get_conn().await?;
849
850 let res = conn
851 .transaction::<_, DieselError, _>(|conn| {
852 Box::pin(async move {
853 diesel::delete(local_values::table).execute(conn).await?;
855 diesel::delete(local_keys::table).execute(conn).await?;
857 Ok(())
858 })
859 })
860 .await;
861
862 match res {
863 Ok(_) => return Ok(()),
864 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
865 retry_conn(&mut attempts).await;
866 continue;
867 },
868 Err(e) => return Err(e.into()),
869 }
870 }
871 }
872
873 pub async fn cache_block(
875 &self,
876 hash: H256,
877 number: u32,
878 parent_hash: H256,
879 header: &[u8],
880 ) -> Result<(), CacheError> {
881 use crate::schema::blocks::columns as bc;
883
884 let mut attempts = 0;
888 let parent_hash_bytes = parent_hash.as_bytes();
889 loop {
890 let mut conn = self.get_conn().await?;
891
892 let block = NewBlockRow {
893 hash: hash.as_bytes(),
894 number: number as i64,
895 parent_hash: parent_hash_bytes,
896 header,
897 };
898
899 let res = diesel::insert_into(blocks::table)
900 .values(&block)
901 .on_conflict(bc::hash)
902 .do_update()
903 .set((
904 bc::number.eq(number as i64),
905 bc::parent_hash.eq(parent_hash_bytes),
906 bc::header.eq(header),
907 ))
908 .execute(&mut conn)
909 .await;
910
911 match res {
912 Ok(_) => return Ok(()),
913 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
914 retry_conn(&mut attempts).await;
915 continue;
916 },
917 Err(e) => return Err(e.into()),
918 }
919 }
920 }
921
922 pub async fn get_block(&self, hash: H256) -> Result<Option<BlockRow>, CacheError> {
924 use crate::schema::blocks::columns as bc;
927
928 let mut conn = self.get_conn().await?;
929
930 let row = blocks::table
931 .filter(bc::hash.eq(hash.as_bytes()))
932 .select(BlockRow::as_select())
933 .first(&mut conn)
934 .await
935 .optional()?;
936
937 match row {
938 Some(BlockRow { number, .. }) if number < 0 || number > u32::MAX.into() =>
941 Err(CacheError::DataCorruption(errors::BLOCK_NUMBER_OUT_OF_U32_RANGE.into())),
942 row @ Some(_) => Ok(row),
943 None => Ok(None),
944 }
945 }
946
947 pub async fn get_block_by_number(
949 &self,
950 block_number: u32,
951 ) -> Result<Option<BlockRow>, CacheError> {
952 use crate::schema::blocks::columns as bc;
955
956 let mut conn = self.get_conn().await?;
957
958 let row = blocks::table
959 .filter(bc::number.eq(block_number as i64))
960 .select(BlockRow::as_select())
961 .first(&mut conn)
962 .await
963 .optional()?;
964
965 match row {
966 Some(BlockRow { number, .. }) if number < 0 || number > u32::MAX.into() =>
968 Err(CacheError::DataCorruption(errors::BLOCK_NUMBER_OUT_OF_U32_RANGE.into())),
969 row @ Some(_) => Ok(row),
970 None => Ok(None),
971 }
972 }
973
974 pub async fn clear_block(&self, hash: H256) -> Result<(), CacheError> {
976 use crate::schema::{
980 blocks::columns as bc, prefix_scans::columns as psc, storage::columns as sc,
981 };
982 let block_hash = Arc::new(hash.as_bytes());
983
984 let mut attempts = 0;
988 loop {
989 let block_hash = Arc::clone(&block_hash);
990 let mut conn = self.get_conn().await?;
991
992 let res = conn
993 .transaction::<_, DieselError, _>(move |conn| {
994 Box::pin(async move {
995 diesel::delete(storage::table.filter(sc::block_hash.eq(*block_hash)))
996 .execute(conn)
997 .await?;
998 diesel::delete(blocks::table.filter(bc::hash.eq(*block_hash)))
999 .execute(conn)
1000 .await?;
1001 diesel::delete(prefix_scans::table.filter(psc::block_hash.eq(*block_hash)))
1002 .execute(conn)
1003 .await?;
1004 Ok(())
1005 })
1006 })
1007 .await;
1008
1009 match res {
1010 Ok(_) => return Ok(()),
1011 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
1012 retry_conn(&mut attempts).await;
1013 continue;
1014 },
1015 Err(e) => return Err(e.into()),
1016 }
1017 }
1018 }
1019
1020 pub async fn get_prefix_scan_progress(
1026 &self,
1027 block_hash: H256,
1028 prefix: &[u8],
1029 ) -> Result<Option<PrefixScanProgress>, CacheError> {
1030 use crate::schema::prefix_scans::columns as psc;
1031
1032 let mut conn = self.get_conn().await?;
1033
1034 let row: Option<(Option<Vec<u8>>, bool)> = prefix_scans::table
1035 .filter(psc::block_hash.eq(block_hash.as_bytes()))
1036 .filter(psc::prefix.eq(prefix))
1037 .select((psc::last_scanned_key, psc::is_complete))
1038 .first::<(Option<Vec<u8>>, bool)>(&mut conn)
1039 .await
1040 .optional()?;
1041
1042 Ok(row.map(|(last_key, complete)| PrefixScanProgress {
1043 last_scanned_key: last_key,
1044 is_complete: complete,
1045 }))
1046 }
1047
1048 pub async fn update_prefix_scan(
1059 &self,
1060 block_hash: H256,
1061 prefix: &[u8],
1062 last_key: &[u8],
1063 is_complete: bool,
1064 ) -> Result<(), CacheError> {
1065 use crate::schema::prefix_scans::columns as psc;
1066 use diesel::upsert::excluded;
1067
1068 let new_row = NewPrefixScanRow {
1069 block_hash: block_hash.as_bytes(),
1070 prefix,
1071 last_scanned_key: Some(last_key),
1072 is_complete,
1073 };
1074
1075 let mut attempts = 0;
1076 loop {
1077 let mut conn = self.get_conn().await?;
1078 let res = diesel::insert_into(prefix_scans::table)
1079 .values(&new_row)
1080 .on_conflict((psc::block_hash, psc::prefix))
1081 .do_update()
1082 .set((
1083 psc::last_scanned_key.eq(excluded(psc::last_scanned_key)),
1084 psc::is_complete.eq(excluded(psc::is_complete)),
1085 ))
1086 .execute(&mut conn)
1087 .await;
1088
1089 match res {
1090 Ok(_) => return Ok(()),
1091 Err(e) if is_locked_error(&e) && attempts < MAX_LOCK_RETRIES => {
1092 retry_conn(&mut attempts).await;
1093 continue;
1094 },
1095 Err(e) => return Err(e.into()),
1096 }
1097 }
1098 }
1099
1100 pub async fn get_keys_by_prefix(
1106 &self,
1107 block_hash: H256,
1108 prefix: &[u8],
1109 ) -> Result<Vec<Vec<u8>>, CacheError> {
1110 use crate::schema::storage::columns as sc;
1111
1112 let mut conn = self.get_conn().await?;
1113
1114 let prefix_end = increment_prefix(prefix);
1116
1117 let mut query = storage::table
1118 .filter(sc::block_hash.eq(block_hash.as_bytes()))
1119 .filter(sc::key.ge(prefix))
1120 .select(sc::key)
1121 .into_boxed();
1122
1123 if let Some(ref end) = prefix_end {
1124 query = query.filter(sc::key.lt(end));
1125 }
1126
1127 Ok(query.load::<Vec<u8>>(&mut conn).await?)
1128 }
1129
1130 pub async fn next_key_from_cache(
1139 &self,
1140 block_hash: H256,
1141 prefix: &[u8],
1142 key: &[u8],
1143 ) -> Result<Option<Vec<u8>>, CacheError> {
1144 use crate::schema::storage::columns as sc;
1145
1146 let mut conn = self.get_conn().await?;
1147 let prefix_end = increment_prefix(prefix);
1148
1149 let mut query = storage::table
1150 .filter(sc::block_hash.eq(block_hash.as_bytes()))
1151 .filter(sc::key.gt(key))
1152 .filter(sc::key.ge(prefix))
1153 .filter(sc::is_empty.eq(false))
1154 .select(sc::key)
1155 .order(sc::key.asc())
1156 .limit(1)
1157 .into_boxed();
1158
1159 if let Some(ref end) = prefix_end {
1160 query = query.filter(sc::key.lt(end));
1161 }
1162
1163 Ok(query.first::<Vec<u8>>(&mut conn).await.optional()?)
1164 }
1165
1166 pub async fn count_keys_by_prefix(
1171 &self,
1172 block_hash: H256,
1173 prefix: &[u8],
1174 ) -> Result<usize, CacheError> {
1175 use crate::schema::storage::columns as sc;
1176
1177 let mut conn = self.get_conn().await?;
1178 let prefix_end = increment_prefix(prefix);
1179
1180 let mut query = storage::table
1181 .filter(sc::block_hash.eq(block_hash.as_bytes()))
1182 .filter(sc::key.ge(prefix))
1183 .into_boxed();
1184
1185 if let Some(ref end) = prefix_end {
1186 query = query.filter(sc::key.lt(end));
1187 }
1188
1189 let count: i64 = query.count().get_result(&mut conn).await?;
1190
1191 Ok(count as usize)
1192 }
1193}
1194
1195fn increment_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
1198 let mut result = prefix.to_vec();
1199 for i in (0..result.len()).rev() {
1201 if result[i] < 0xFF {
1202 result[i] += 1;
1203 result.truncate(i + 1);
1204 return Some(result);
1205 }
1206 }
1207 None
1209}
1210
1211fn is_locked_error(e: &DieselError) -> bool {
1212 match e {
1213 DieselError::DatabaseError(_, info) => {
1214 let msg = info.message().to_ascii_lowercase();
1215 msg.contains(lock_patterns::DATABASE_IS_LOCKED) || msg.contains(lock_patterns::BUSY)
1216 },
1217 _ => false,
1218 }
1219}
1220
1221pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
1223
1224#[cfg(test)]
1225mod tests {
1226 use super::*;
1227
1228 #[tokio::test(flavor = "multi_thread")]
1229 async fn in_memory_cache_works() {
1230 let cache = StorageCache::in_memory().await.unwrap();
1231
1232 let block_hash = H256::from([1u8; 32]);
1233 let key = b"test_key";
1234 let value = b"test_value";
1235
1236 assert!(cache.get_storage(block_hash, key).await.unwrap().is_none());
1238
1239 cache.set_storage(block_hash, key, Some(value)).await.unwrap();
1241
1242 let cached = cache.get_storage(block_hash, key).await.unwrap();
1244 assert_eq!(cached, Some(Some(value.to_vec())));
1245 }
1246
1247 #[tokio::test(flavor = "multi_thread")]
1248 async fn cache_empty_value() {
1249 let cache = StorageCache::in_memory().await.unwrap();
1250
1251 let block_hash = H256::from([2u8; 32]);
1252 let key = b"empty_key";
1253
1254 cache.set_storage(block_hash, key, None).await.unwrap();
1256
1257 let cached = cache.get_storage(block_hash, key).await.unwrap();
1259 assert_eq!(cached, Some(None));
1260 }
1261
1262 #[tokio::test(flavor = "multi_thread")]
1263 async fn batch_operations() {
1264 let cache = StorageCache::in_memory().await.unwrap();
1265
1266 let block_hash = H256::from([3u8; 32]);
1267 let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1268 (b"key1", Some(b"value1")),
1269 (b"key2", Some(b"value2")),
1270 (b"key3", None), ];
1272
1273 cache.set_storage_batch(block_hash, &entries).await.unwrap();
1275
1276 let keys: Vec<&[u8]> = vec![b"key1", b"key2", b"key3", b"key4"];
1278 let results = cache.get_storage_batch(block_hash, &keys).await.unwrap();
1279
1280 assert_eq!(results.len(), 4);
1281 assert_eq!(results[0], Some(Some(b"value1".to_vec())));
1282 assert_eq!(results[1], Some(Some(b"value2".to_vec())));
1283 assert_eq!(results[2], Some(None)); assert_eq!(results[3], None); }
1286
1287 #[tokio::test(flavor = "multi_thread")]
1288 async fn block_caching() {
1289 let cache = StorageCache::in_memory().await.unwrap();
1290
1291 let hash = H256::from([4u8; 32]);
1292 let parent_hash = H256::from([3u8; 32]);
1293 let header = b"mock_header_data";
1294
1295 cache.cache_block(hash, 100, parent_hash, header).await.unwrap();
1297
1298 let block = cache.get_block(hash).await.unwrap().unwrap();
1300 assert_eq!(block.hash, hash.as_bytes().to_vec());
1301 assert_eq!(block.number, 100i64);
1302 assert_eq!(block.parent_hash, parent_hash.as_bytes().to_vec());
1303 assert_eq!(block.header, header.to_vec());
1304 }
1305
1306 #[tokio::test(flavor = "multi_thread")]
1307 async fn get_block_with_non_cached_block() {
1308 let cache = StorageCache::in_memory().await.unwrap();
1309
1310 let hash = H256::from([4u8; 32]);
1311
1312 let block = cache.get_block(hash).await.unwrap();
1314
1315 assert!(block.is_none());
1316 }
1317
1318 #[tokio::test(flavor = "multi_thread")]
1319 async fn get_block_number_corrupted_block_number_fails() {
1320 let cache = StorageCache::in_memory().await.unwrap();
1321
1322 let hash1 = H256::from([4u8; 32]);
1323 let hash2 = H256::from([5u8; 32]);
1324 let parent_hash = H256::from([3u8; 32]);
1325 let header = b"mock_header_data";
1326
1327 let invalid_block1 = NewBlockRow {
1329 hash: hash1.as_bytes(),
1330 number: -1, parent_hash: parent_hash.as_bytes(),
1332 header,
1333 };
1334
1335 let invalid_block2 = NewBlockRow {
1337 hash: hash2.as_bytes(),
1338 number: u32::MAX as i64 + 1,
1339 parent_hash: parent_hash.as_bytes(),
1340 header,
1341 };
1342
1343 match &cache.inner {
1345 StorageConn::Single(m) => {
1346 let mut conn = m.lock().await;
1347 for block in [invalid_block1, invalid_block2] {
1348 diesel::insert_into(blocks::table)
1349 .values(&block)
1350 .execute(&mut *conn)
1351 .await
1352 .unwrap();
1353 }
1354 },
1355 _ => unreachable!("Test single connection; qed;"),
1356 }
1357
1358 assert!(
1360 matches!(cache.get_block(hash1).await, Err(CacheError::DataCorruption(msg)) if msg == errors::BLOCK_NUMBER_OUT_OF_U32_RANGE)
1361 );
1362 assert!(
1363 matches!(cache.get_block(hash2).await, Err(CacheError::DataCorruption(msg)) if msg == errors::BLOCK_NUMBER_OUT_OF_U32_RANGE)
1364 );
1365 }
1366
1367 #[tokio::test(flavor = "multi_thread")]
1368 async fn different_blocks_have_separate_storage() {
1369 let cache = StorageCache::in_memory().await.unwrap();
1370
1371 let block1 = H256::from([5u8; 32]);
1372 let block2 = H256::from([6u8; 32]);
1373 let key = b"same_key";
1374
1375 cache.set_storage(block1, key, Some(b"value1")).await.unwrap();
1376 cache.set_storage(block2, key, Some(b"value2")).await.unwrap();
1377
1378 let cached1 = cache.get_storage(block1, key).await.unwrap();
1379 let cached2 = cache.get_storage(block2, key).await.unwrap();
1380
1381 assert_eq!(cached1, Some(Some(b"value1".to_vec())));
1382 assert_eq!(cached2, Some(Some(b"value2".to_vec())));
1383 }
1384
1385 #[tokio::test(flavor = "multi_thread")]
1386 async fn clear_block_removes_data() {
1387 let cache = StorageCache::in_memory().await.unwrap();
1388
1389 let hash = H256::from([7u8; 32]);
1390 let parent_hash = H256::from([6u8; 32]);
1391 let key = b"test_key";
1392
1393 cache.set_storage(hash, key, Some(b"value")).await.unwrap();
1394 cache.cache_block(hash, 50, parent_hash, b"header").await.unwrap();
1395
1396 assert!(cache.get_storage(hash, key).await.unwrap().is_some());
1398 assert!(cache.get_block(hash).await.unwrap().is_some());
1399
1400 cache.clear_block(hash).await.unwrap();
1402
1403 assert!(cache.get_storage(hash, key).await.unwrap().is_none());
1405 assert!(cache.get_block(hash).await.unwrap().is_none());
1406 }
1407
1408 #[tokio::test(flavor = "multi_thread")]
1409 async fn file_persistence() {
1410 let temp_dir = tempfile::tempdir().unwrap();
1411 let db_path = temp_dir.path().join("test_cache.db");
1412
1413 let block_hash = H256::from([8u8; 32]);
1414 let key = b"persistent_key";
1415 let value = b"persistent_value";
1416
1417 {
1419 let cache = StorageCache::open(Some(&db_path)).await.unwrap();
1420 cache.set_storage(block_hash, key, Some(value)).await.unwrap();
1421 }
1422
1423 {
1425 let cache = StorageCache::open(Some(&db_path)).await.unwrap();
1426 let cached = cache.get_storage(block_hash, key).await.unwrap();
1427 assert_eq!(cached, Some(Some(value.to_vec())));
1428 }
1429 }
1430
1431 #[tokio::test(flavor = "multi_thread")]
1432 async fn concurrent_access() {
1433 let temp_dir = tempfile::tempdir().unwrap();
1434 let db_path = temp_dir.path().join("concurrent_test.db");
1435 let cache = StorageCache::open(Some(&db_path)).await.unwrap();
1436
1437 let block_hash = H256::from([9u8; 32]);
1438
1439 let mut handles = vec![];
1442 for i in 0..10u8 {
1443 let cache = cache.clone();
1444 let handle = tokio::spawn(async move {
1445 let key = format!("key_{}", i);
1446 let value = format!("value_{}", i);
1447 cache.set_storage(block_hash, key.as_bytes(), Some(value.as_bytes())).await
1448 });
1449 handles.push(handle);
1450 }
1451
1452 for handle in handles {
1454 handle.await.unwrap().unwrap();
1455 }
1456
1457 let mut read_handles = vec![];
1459 for i in 0..10u8 {
1460 let cache = cache.clone();
1461 let handle = tokio::spawn(async move {
1462 let key = format!("key_{}", i);
1463 cache.get_storage(block_hash, key.as_bytes()).await
1464 });
1465 read_handles.push((i, handle));
1466 }
1467
1468 for (i, handle) in read_handles {
1470 let result = handle.await.unwrap().unwrap();
1471 let expected_value = format!("value_{}", i);
1472 assert_eq!(result, Some(Some(expected_value.into_bytes())));
1473 }
1474
1475 let cache1 = cache.clone();
1477 let cache2 = cache.clone();
1478 let block_hash2 = H256::from([10u8; 32]);
1479
1480 let batch_handle1 = tokio::spawn(async move {
1481 let keys: Vec<Vec<u8>> = (0..5).map(|i| format!("batch1_{}", i).into_bytes()).collect();
1482 let values: Vec<Vec<u8>> = (0..5).map(|i| vec![i]).collect();
1483 let entries: Vec<(&[u8], Option<&[u8]>)> = keys
1484 .iter()
1485 .zip(values.iter())
1486 .map(|(k, v)| (k.as_slice(), Some(v.as_slice())))
1487 .collect();
1488 cache1.set_storage_batch(block_hash2, &entries).await
1489 });
1490
1491 let batch_handle2 = tokio::spawn(async move {
1492 let keys: Vec<Vec<u8>> =
1493 (5..10).map(|i| format!("batch2_{}", i).into_bytes()).collect();
1494 let values: Vec<Vec<u8>> = (5..10).map(|i| vec![i]).collect();
1495 let entries: Vec<(&[u8], Option<&[u8]>)> = keys
1496 .iter()
1497 .zip(values.iter())
1498 .map(|(k, v)| (k.as_slice(), Some(v.as_slice())))
1499 .collect();
1500 cache2.set_storage_batch(block_hash2, &entries).await
1501 });
1502
1503 batch_handle1.await.unwrap().unwrap();
1504 batch_handle2.await.unwrap().unwrap();
1505
1506 let keys: Vec<Vec<u8>> = (0..5).map(|i| format!("batch1_{}", i).into_bytes()).collect();
1508 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
1509 let results = cache.get_storage_batch(block_hash2, &key_refs).await.unwrap();
1510 for (i, result) in results.iter().enumerate() {
1511 assert_eq!(*result, Some(Some(vec![i as u8])));
1512 }
1513 }
1514
1515 #[tokio::test(flavor = "multi_thread")]
1516 async fn get_storage_batch_with_duplicate_keys() {
1517 let cache = StorageCache::in_memory().await.unwrap();
1518
1519 let block_hash = H256::from([11u8; 32]);
1520 let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1521 (b"key1", Some(b"value1")),
1522 (b"key2", Some(b"value2")),
1523 (b"key3", Some(b"value3")),
1524 ];
1525
1526 cache.set_storage_batch(block_hash, &entries).await.unwrap();
1528
1529 let keys: Vec<&[u8]> = vec![b"key1", b"key2", b"key1", b"key3", b"key2", b"key2"];
1531 let results = cache.get_storage_batch(block_hash, &keys).await;
1532
1533 assert!(matches!(results, Err(CacheError::DuplicatedKeys)));
1534 }
1535
1536 #[tokio::test(flavor = "multi_thread")]
1537 async fn set_storage_batch_with_duplicate_keys() {
1538 let cache = StorageCache::in_memory().await.unwrap();
1539
1540 let block_hash = H256::from([12u8; 32]);
1541
1542 let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1544 (b"key1", Some(b"first_value")),
1545 (b"key2", Some(b"value2")),
1546 (b"key1", Some(b"second_value")), (b"key3", Some(b"value3")),
1548 (b"key1", Some(b"final_value")), ];
1550
1551 let result = cache.set_storage_batch(block_hash, &entries).await;
1552 assert!(matches!(result, Err(CacheError::DuplicatedKeys)));
1553 }
1554
1555 #[tokio::test(flavor = "multi_thread")]
1556 async fn prefix_scan_progress_tracking() {
1557 let cache = StorageCache::in_memory().await.unwrap();
1558 let block_hash = H256::from([11u8; 32]);
1559 let prefix = b"balances:";
1560
1561 let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
1563 assert!(progress.is_none());
1564
1565 let last_key = b"balances:account123";
1567 cache.update_prefix_scan(block_hash, prefix, last_key, false).await.unwrap();
1568
1569 let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
1571 assert!(progress.is_some());
1572 let p = progress.unwrap();
1573 assert_eq!(p.last_scanned_key, Some(last_key.to_vec()));
1574 assert!(!p.is_complete);
1575
1576 let final_key = b"balances:zzz";
1578 cache.update_prefix_scan(block_hash, prefix, final_key, true).await.unwrap();
1579
1580 let progress = cache.get_prefix_scan_progress(block_hash, prefix).await.unwrap();
1581 let p = progress.unwrap();
1582 assert_eq!(p.last_scanned_key, Some(final_key.to_vec()));
1583 assert!(p.is_complete);
1584 }
1585
1586 #[tokio::test(flavor = "multi_thread")]
1587 async fn prefix_scan_different_blocks_separate() {
1588 let cache = StorageCache::in_memory().await.unwrap();
1589 let block1 = H256::from([12u8; 32]);
1590 let block2 = H256::from([13u8; 32]);
1591 let prefix = b"system:";
1592
1593 cache.update_prefix_scan(block1, prefix, b"system:key1", true).await.unwrap();
1595
1596 let p1 = cache.get_prefix_scan_progress(block1, prefix).await.unwrap();
1598 assert!(p1.is_some());
1599 assert!(p1.unwrap().is_complete);
1600
1601 let p2 = cache.get_prefix_scan_progress(block2, prefix).await.unwrap();
1603 assert!(p2.is_none());
1604 }
1605
1606 #[tokio::test(flavor = "multi_thread")]
1607 async fn get_keys_by_prefix_works() {
1608 let cache = StorageCache::in_memory().await.unwrap();
1609 let block_hash = H256::from([14u8; 32]);
1610
1611 let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1613 (b"tokens:alice", Some(b"100")),
1614 (b"tokens:bob", Some(b"200")),
1615 (b"tokens:charlie", Some(b"300")),
1616 (b"balances:alice", Some(b"50")),
1617 (b"balances:bob", Some(b"75")),
1618 ];
1619 cache.set_storage_batch(block_hash, &entries).await.unwrap();
1620
1621 let token_keys = cache.get_keys_by_prefix(block_hash, b"tokens:").await.unwrap();
1623 assert_eq!(token_keys.len(), 3);
1624 assert!(token_keys.contains(&b"tokens:alice".to_vec()));
1625 assert!(token_keys.contains(&b"tokens:bob".to_vec()));
1626 assert!(token_keys.contains(&b"tokens:charlie".to_vec()));
1627
1628 let balance_keys = cache.get_keys_by_prefix(block_hash, b"balances:").await.unwrap();
1630 assert_eq!(balance_keys.len(), 2);
1631 assert!(balance_keys.contains(&b"balances:alice".to_vec()));
1632 assert!(balance_keys.contains(&b"balances:bob".to_vec()));
1633
1634 let empty_keys = cache.get_keys_by_prefix(block_hash, b"nonexistent:").await.unwrap();
1636 assert!(empty_keys.is_empty());
1637 }
1638
1639 #[tokio::test(flavor = "multi_thread")]
1640 async fn count_keys_by_prefix_works() {
1641 let cache = StorageCache::in_memory().await.unwrap();
1642 let block_hash = H256::from([15u8; 32]);
1643
1644 let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1646 (b"prefix_a:1", Some(b"v1")),
1647 (b"prefix_a:2", Some(b"v2")),
1648 (b"prefix_a:3", Some(b"v3")),
1649 (b"prefix_b:1", Some(b"v4")),
1650 ];
1651 cache.set_storage_batch(block_hash, &entries).await.unwrap();
1652
1653 assert_eq!(cache.count_keys_by_prefix(block_hash, b"prefix_a:").await.unwrap(), 3);
1654 assert_eq!(cache.count_keys_by_prefix(block_hash, b"prefix_b:").await.unwrap(), 1);
1655 assert_eq!(cache.count_keys_by_prefix(block_hash, b"prefix_c:").await.unwrap(), 0);
1656 }
1657
1658 #[tokio::test(flavor = "multi_thread")]
1659 async fn next_key_from_cache_works() {
1660 let cache = StorageCache::in_memory().await.unwrap();
1661 let block_hash = H256::from([20u8; 32]);
1662
1663 let entries: Vec<(&[u8], Option<&[u8]>)> = vec![
1665 (b"prefix:aaa", Some(b"v1")),
1666 (b"prefix:bbb", Some(b"v2")),
1667 (b"prefix:ccc", Some(b"v3")),
1668 (b"other:ddd", Some(b"v4")),
1669 ];
1670 cache.set_storage_batch(block_hash, &entries).await.unwrap();
1671
1672 let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:aaa").await.unwrap();
1674 assert_eq!(next, Some(b"prefix:bbb".to_vec()));
1675
1676 let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:bbb").await.unwrap();
1678 assert_eq!(next, Some(b"prefix:ccc".to_vec()));
1679
1680 let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:ccc").await.unwrap();
1682 assert!(next.is_none());
1683
1684 let next = cache.next_key_from_cache(block_hash, b"prefix:", b"prefix:").await.unwrap();
1686 assert_eq!(next, Some(b"prefix:aaa".to_vec()));
1687 }
1688
1689 #[test]
1690 fn increment_prefix_works() {
1691 assert_eq!(increment_prefix(b"abc"), Some(b"abd".to_vec()));
1693
1694 assert_eq!(increment_prefix(b"ab\xff"), Some(b"ac".to_vec()));
1696
1697 assert_eq!(increment_prefix(b"a\xff\xff"), Some(b"b".to_vec()));
1699
1700 assert_eq!(increment_prefix(b"\xff\xff\xff"), None);
1702
1703 assert_eq!(increment_prefix(b""), None);
1705
1706 assert_eq!(increment_prefix(b"a"), Some(b"b".to_vec()));
1708 }
1709
1710 #[tokio::test(flavor = "multi_thread")]
1711 async fn clear_block_removes_prefix_scans() {
1712 let cache = StorageCache::in_memory().await.unwrap();
1713 let hash = H256::from([16u8; 32]);
1714 let prefix = b"test:";
1715
1716 cache.update_prefix_scan(hash, prefix, b"test:key", true).await.unwrap();
1718 assert!(cache.get_prefix_scan_progress(hash, prefix).await.unwrap().is_some());
1719
1720 cache.clear_block(hash).await.unwrap();
1722
1723 assert!(cache.get_prefix_scan_progress(hash, prefix).await.unwrap().is_none());
1725 }
1726
1727 #[tokio::test(flavor = "multi_thread")]
1730 async fn get_local_key_returns_none_for_nonexistent_key() {
1731 let cache = StorageCache::in_memory().await.unwrap();
1732
1733 let result = cache.get_local_key(b"nonexistent_key").await.unwrap();
1734 assert!(result.is_none());
1735 }
1736
1737 #[tokio::test(flavor = "multi_thread")]
1738 async fn insert_local_key_creates_new_key() {
1739 let cache = StorageCache::in_memory().await.unwrap();
1740 let key = b"new_key";
1741
1742 let key_id = cache.insert_local_key(key).await.unwrap();
1744 assert_eq!(key_id, 1);
1745
1746 let result = cache.get_local_key(key).await.unwrap();
1748 assert!(result.is_some());
1749 assert_eq!(result.unwrap().id, key_id);
1750 }
1751
1752 #[tokio::test(flavor = "multi_thread")]
1753 async fn insert_local_key_returns_existing_id() {
1754 let cache = StorageCache::in_memory().await.unwrap();
1755 let key = b"duplicate_key";
1756
1757 let key_id1 = cache.insert_local_key(key).await.unwrap();
1759 let key_id2 = cache.insert_local_key(key).await.unwrap();
1760
1761 assert_eq!(key_id1, key_id2);
1763 }
1764
1765 #[tokio::test(flavor = "multi_thread")]
1766 async fn insert_and_get_local_value_at_block() {
1767 let cache = StorageCache::in_memory().await.unwrap();
1768 let key = b"test_key";
1769 let value = b"test_value";
1770
1771 let key_id = cache.insert_local_key(key).await.unwrap();
1773 cache.insert_local_value(key_id, Some(value), 100).await.unwrap();
1774
1775 let result = cache.get_local_value_at_block(key, 100).await.unwrap();
1777 assert_eq!(result, Some(Some(value.to_vec())));
1778
1779 let result = cache.get_local_value_at_block(key, 150).await.unwrap();
1781 assert_eq!(result, Some(Some(value.to_vec())));
1782
1783 let result = cache.get_local_value_at_block(key, 99).await.unwrap();
1785 assert!(result.is_none());
1786 }
1787
1788 #[tokio::test(flavor = "multi_thread")]
1789 async fn get_local_value_at_block_nonexistent_key() {
1790 let cache = StorageCache::in_memory().await.unwrap();
1791
1792 let result = cache.get_local_value_at_block(b"nonexistent", 100).await.unwrap();
1793 assert!(result.is_none());
1794 }
1795
1796 #[tokio::test(flavor = "multi_thread")]
1797 async fn close_local_value_sets_valid_until() {
1798 let cache = StorageCache::in_memory().await.unwrap();
1799 let key = b"closing_key";
1800 let value1 = b"value1";
1801 let value2 = b"value2";
1802
1803 let key_id = cache.insert_local_key(key).await.unwrap();
1805 cache.insert_local_value(key_id, Some(value1), 100).await.unwrap();
1806
1807 cache.close_local_value(key_id, 150).await.unwrap();
1809 cache.insert_local_value(key_id, Some(value2), 150).await.unwrap();
1810
1811 let result = cache.get_local_value_at_block(key, 120).await.unwrap();
1813 assert_eq!(result, Some(Some(value1.to_vec())));
1814
1815 let result = cache.get_local_value_at_block(key, 150).await.unwrap();
1817 assert_eq!(result, Some(Some(value2.to_vec())));
1818
1819 let result = cache.get_local_value_at_block(key, 200).await.unwrap();
1821 assert_eq!(result, Some(Some(value2.to_vec())));
1822 }
1823
1824 #[tokio::test(flavor = "multi_thread")]
1825 async fn get_local_values_at_block_batch_works() {
1826 let cache = StorageCache::in_memory().await.unwrap();
1827
1828 let key1 = b"batch_key1";
1829 let key2 = b"batch_key2";
1830 let key3 = b"batch_key3";
1831 let value1 = b"batch_value1";
1832 let value2 = b"batch_value2";
1833
1834 let key_id1 = cache.insert_local_key(key1).await.unwrap();
1836 let key_id2 = cache.insert_local_key(key2).await.unwrap();
1837 cache.insert_local_value(key_id1, Some(value1), 100).await.unwrap();
1838 cache.insert_local_value(key_id2, Some(value2), 100).await.unwrap();
1839
1840 let keys: Vec<&[u8]> = vec![key1, key2, key3];
1842 let results = cache.get_local_values_at_block_batch(&keys, 100).await.unwrap();
1843
1844 assert_eq!(results.len(), 3);
1845 assert_eq!(results[0], Some(Some(value1.to_vec())));
1846 assert_eq!(results[1], Some(Some(value2.to_vec())));
1847 assert!(results[2].is_none()); }
1849
1850 #[tokio::test(flavor = "multi_thread")]
1851 async fn get_local_values_at_block_batch_respects_validity() {
1852 let cache = StorageCache::in_memory().await.unwrap();
1853
1854 let key = b"validity_key";
1855 let value1 = b"value_v1";
1856 let value2 = b"value_v2";
1857
1858 let key_id = cache.insert_local_key(key).await.unwrap();
1860 cache.insert_local_value(key_id, Some(value1), 100).await.unwrap();
1861 cache.close_local_value(key_id, 200).await.unwrap();
1862 cache.insert_local_value(key_id, Some(value2), 200).await.unwrap();
1863
1864 let keys: Vec<&[u8]> = vec![key];
1866
1867 let results = cache.get_local_values_at_block_batch(&keys, 150).await.unwrap();
1868 assert_eq!(results[0], Some(Some(value1.to_vec())));
1869
1870 let results = cache.get_local_values_at_block_batch(&keys, 200).await.unwrap();
1871 assert_eq!(results[0], Some(Some(value2.to_vec())));
1872
1873 let results = cache.get_local_values_at_block_batch(&keys, 99).await.unwrap();
1874 assert!(results[0].is_none());
1875 }
1876
1877 #[tokio::test(flavor = "multi_thread")]
1878 async fn get_local_values_at_block_batch_with_duplicate_keys() {
1879 let cache = StorageCache::in_memory().await.unwrap();
1880
1881 let key = b"dup_key";
1882 let keys: Vec<&[u8]> = vec![key, key]; let result = cache.get_local_values_at_block_batch(&keys, 100).await;
1885 assert!(matches!(result, Err(CacheError::DuplicatedKeys)));
1886 }
1887
1888 #[tokio::test(flavor = "multi_thread")]
1889 async fn clear_local_storage_removes_all_data() {
1890 let cache = StorageCache::in_memory().await.unwrap();
1891
1892 let key1 = b"clear_key1";
1893 let key2 = b"clear_key2";
1894 let value = b"some_value";
1895
1896 let key_id1 = cache.insert_local_key(key1).await.unwrap();
1898 let key_id2 = cache.insert_local_key(key2).await.unwrap();
1899 cache.insert_local_value(key_id1, Some(value), 100).await.unwrap();
1900 cache.insert_local_value(key_id2, Some(value), 100).await.unwrap();
1901
1902 assert!(cache.get_local_key(key1).await.unwrap().is_some());
1904 assert!(cache.get_local_key(key2).await.unwrap().is_some());
1905 assert!(cache.get_local_value_at_block(key1, 100).await.unwrap().is_some());
1906
1907 cache.clear_local_storage().await.unwrap();
1909
1910 assert!(cache.get_local_key(key1).await.unwrap().is_none());
1912 assert!(cache.get_local_key(key2).await.unwrap().is_none());
1913 assert!(cache.get_local_value_at_block(key1, 100).await.unwrap().is_none());
1914 }
1915
1916 #[tokio::test(flavor = "multi_thread")]
1917 async fn get_local_keys_at_block_returns_live_keys() {
1918 let cache = StorageCache::in_memory().await.unwrap();
1919
1920 let k1 = b"pallet:alice";
1922 let k2 = b"pallet:bob";
1923 let k3 = b"other:charlie";
1924
1925 let id1 = cache.insert_local_key(k1).await.unwrap();
1926 let id2 = cache.insert_local_key(k2).await.unwrap();
1927 let id3 = cache.insert_local_key(k3).await.unwrap();
1928
1929 cache.insert_local_value(id1, Some(b"v1"), 100).await.unwrap();
1931 cache.insert_local_value(id2, Some(b"v2"), 200).await.unwrap();
1933 cache.insert_local_value(id3, Some(b"v3"), 100).await.unwrap();
1935
1936 let keys = cache.get_local_keys_at_block(b"pallet:", 150).await.unwrap();
1938 assert_eq!(keys, vec![k1.to_vec()]);
1939
1940 let keys = cache.get_local_keys_at_block(b"pallet:", 200).await.unwrap();
1942 assert_eq!(keys, vec![k1.to_vec(), k2.to_vec()]);
1943
1944 let keys = cache.get_local_keys_at_block(b"pallet:", 99).await.unwrap();
1946 assert!(keys.is_empty());
1947 }
1948
1949 #[tokio::test(flavor = "multi_thread")]
1950 async fn get_local_keys_at_block_excludes_deleted() {
1951 let cache = StorageCache::in_memory().await.unwrap();
1952
1953 let k1 = b"pallet:alice";
1954 let id1 = cache.insert_local_key(k1).await.unwrap();
1955
1956 cache.insert_local_value(id1, Some(b"v1"), 100).await.unwrap();
1958 cache.close_local_value(id1, 200).await.unwrap();
1959 cache.insert_local_value(id1, None, 200).await.unwrap();
1960
1961 let keys = cache.get_local_keys_at_block(b"pallet:", 150).await.unwrap();
1963 assert_eq!(keys, vec![k1.to_vec()]);
1964
1965 let keys = cache.get_local_keys_at_block(b"pallet:", 200).await.unwrap();
1967 assert!(keys.is_empty());
1968 }
1969
1970 #[tokio::test(flavor = "multi_thread")]
1971 async fn get_local_deleted_keys_at_block_works() {
1972 let cache = StorageCache::in_memory().await.unwrap();
1973
1974 let k1 = b"pallet:alice";
1975 let id1 = cache.insert_local_key(k1).await.unwrap();
1976
1977 cache.insert_local_value(id1, Some(b"v1"), 100).await.unwrap();
1979 cache.close_local_value(id1, 200).await.unwrap();
1980 cache.insert_local_value(id1, None, 200).await.unwrap();
1981
1982 let deleted = cache.get_local_deleted_keys_at_block(b"pallet:", 150).await.unwrap();
1984 assert!(deleted.is_empty());
1985
1986 let deleted = cache.get_local_deleted_keys_at_block(b"pallet:", 200).await.unwrap();
1988 assert_eq!(deleted, vec![k1.to_vec()]);
1989 }
1990
1991 #[tokio::test(flavor = "multi_thread")]
1992 async fn prefix_upper_bound_works() {
1993 assert_eq!(StorageCache::prefix_upper_bound(b"abc"), Some(b"abd".to_vec()));
1995 assert_eq!(StorageCache::prefix_upper_bound(b"ab\xff"), Some(b"ac".to_vec()));
1997 assert_eq!(StorageCache::prefix_upper_bound(b"\xff\xff"), None);
1999 assert_eq!(StorageCache::prefix_upper_bound(b""), None);
2001 }
2002}