1pub mod delta_tracking;
15pub mod merkle_patricia_trie;
16pub mod storage;
17
18pub use storage_core::*;
19
20#[cfg(feature = "state-translation")]
21pub mod state_translation;
22
23#[cfg(feature = "stress-test")]
26pub mod stress_test;
27
28#[cfg(feature = "stress-test")]
30pub mod stress_tests {
31 use crate::DefaultDB;
32 use crate::db::DB;
33 use crate::storable::Loader;
34 use crate::{self as storage, Storage, arena::Sp, storage::Array};
35 use serialize::Serializable;
36 use storage_core::arena::*;
37
38 fn new_arena() -> Arena<DefaultDB> {
39 let storage = Storage::<DefaultDB>::new(16, DefaultDB::default());
40 storage.arena
41 }
42
43 pub fn drop_deeply_nested_data() {
46 use bin_tree::BinTree;
47
48 let arena = new_arena();
49 let mut bt = BinTree::new(0, None, None);
50 let depth = 100_000;
51 for i in 1..depth {
52 bt = BinTree::new(i, Some(arena.alloc(bt)), None);
53 }
54 }
55
56 pub fn serialize_deeply_nested_data() {
59 use bin_tree::BinTree;
60
61 let arena = new_arena();
62 let mut bt = BinTree::new(0, None, None);
63 let depth = 100_000;
64 for i in 1..depth {
65 bt = BinTree::new(i, Some(arena.alloc(bt)), None);
66 }
67
68 let mut buf = std::vec::Vec::new();
69 Sp::serialize(&arena.alloc(bt), &mut buf).unwrap();
70 }
71
    /// Stress test: build a 16_000-level-deep chain of `Nesty` values, then
    /// drop it. `Nesty` carries a hand-written `Drop` that tears the chain
    /// down with an explicit worklist instead of one recursive call per level,
    /// so the final `drop(nest)` should return rather than blow the stack.
    pub fn array_nesting() {
        use super::Storable;
        #[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Storable)]
        struct Nesty(Array<Nesty>);
        impl Drop for Nesty {
            fn drop(&mut self) {
                // Leaf: nothing nested, let the default field drop run.
                if self.0.is_empty() {
                    return;
                }
                // Steal the child array, leaving an empty one in its place so
                // that when the stolen node is later dropped it takes the
                // cheap leaf path above.
                let take = |ptr: &mut Nesty| {
                    let mut tmp = Array::new();
                    std::mem::swap(&mut tmp, &mut ptr.0);
                    tmp
                };
                // Iterative teardown: pop one stolen array, detach its
                // children onto the frontier, then drop it — bounded stack
                // depth regardless of nesting depth.
                let mut frontier = vec![take(self)];
                while let Some(curr) = frontier.pop() {
                    let items = curr.iter().collect::<std::vec::Vec<_>>();
                    drop(curr);
                    frontier.extend(
                        items
                            .into_iter()
                            .flat_map(Sp::into_inner)
                            .map(|mut n| take(&mut n)),
                    );
                }
            }
        }
        // Build the chain, logging progress every 100 wraps.
        let mut nest = Nesty(Array::new());
        for i in 0..16_000 {
            nest = Nesty(vec![nest].into());
            if i % 100 == 0 {
                dbg!(i);
            }
        }
        drop(nest);
        // Reaching this line is the success condition for the test.
        println!("drop(nest) returned!");
    }
112
113 #[cfg(feature = "sqlite")]
115 pub fn thrash_the_cache_variations_sqldb(args: &[String]) {
116 use crate::db::SqlDB;
117 fn mk_open_db() -> impl Fn() -> SqlDB {
118 let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
119 move || SqlDB::exclusive_file(&path)
120 }
121 thrash_the_cache_variations(args, mk_open_db);
122 }
123
124 #[cfg(feature = "parity-db")]
126 pub fn thrash_the_cache_variations_paritydb(args: &[String]) {
127 use crate::db::ParityDb;
128 fn mk_open_db() -> impl Fn() -> ParityDb {
129 let path = tempfile::TempDir::new().unwrap().keep();
130 move || ParityDb::open(&path)
131 }
132 thrash_the_cache_variations(args, mk_open_db);
133 }
134
135 #[cfg(any(feature = "parity-db", feature = "sqlite"))]
136 fn thrash_the_cache_variations<D: DB, O: Fn() -> D>(
137 args: &[String],
138 mk_open_db: impl Fn() -> O,
139 ) {
140 let msg = "thrash_the_cache_variations(p: f64, include_cyclic: bool)";
141 if args.len() != 2 {
142 panic!("{msg}: wrong number of args");
143 }
144 let p = args[0]
145 .parse::<f64>()
146 .unwrap_or_else(|e| panic!("{msg}: couldn't parse p={}: {e}", args[0]));
147 let include_cyclic = args[1]
148 .parse()
149 .unwrap_or_else(|e| panic!("{msg}: couldn't parse include_cyclic={}: {e}", args[1]));
150 thrash_the_cache_variations_inner(p, include_cyclic, mk_open_db);
151 }
152
153 #[cfg(any(feature = "parity-db", feature = "sqlite"))]
158 fn thrash_the_cache_variations_inner<D: DB, O: Fn() -> D>(
159 p: f64,
160 include_cyclic: bool,
161 mk_open_db: impl Fn() -> O,
162 ) {
163 let num_lookups = 100_000;
164 thrash_the_cache(1000, p, num_lookups, false, mk_open_db());
165 if include_cyclic {
166 thrash_the_cache(1000, p, num_lookups, true, mk_open_db());
167 }
168 thrash_the_cache(10_000, p, num_lookups, false, mk_open_db());
169 if include_cyclic {
170 thrash_the_cache(10_000, p, num_lookups, true, mk_open_db());
171 }
172 }
173
174 #[cfg(any(feature = "parity-db", feature = "sqlite"))]
199 fn thrash_the_cache<D: DB>(
200 num_values: usize,
201 p: f64,
202 num_lookups: usize,
203 is_cyclic: bool,
204 open_db: impl Fn() -> D,
205 ) {
206 use crate::storage::Storage;
207 use rand::Rng;
208 use std::io::{Write, stdout};
209 use std::{collections::HashMap, time::Instant};
210
211 assert!(p > 0.0 && p <= 1.0, "Cache proportion must be in (0,1]");
212
213 let db = open_db();
214 let cache_size = (num_values as f64 * p) as usize;
215 let storage = Storage::new(cache_size, db);
216 let arena = storage.arena;
217 let mut key_map = HashMap::new();
218 let mut rng = rand::thread_rng();
219
220 let prefix = format!(
221 "thrash_the_cache(num_values={}, p={}, num_lookups={}, is_cyclic={})",
222 num_values, p, num_lookups, is_cyclic
223 );
224
225 let start_time = Instant::now();
227 println!("{prefix} inserting data:");
228 for x in 0..num_values {
229 if x % (num_values / 100) == 0 {
230 print!(".");
231 stdout().flush().unwrap();
232 }
233 let mut sp = arena.alloc(x as u64);
234 sp.persist();
235 key_map.insert(x, sp.as_typed_key());
236 }
237 let elapsed = start_time.elapsed();
238 println!("{:.2?}", elapsed);
239
240 let start_time = Instant::now();
242 print!("{prefix} flushing to disk: ");
243 arena.with_backend(|b| b.flush_all_changes_to_db());
244 drop(arena);
245 let db = open_db();
246 let storage = Storage::new(cache_size, db);
247 let arena = storage.arena;
248 let elapsed = start_time.elapsed();
249 println!("{:.2?}", elapsed);
250
251 println!("{prefix} warming up the cache:");
253 let start_time = Instant::now();
254 for x in 0..num_values {
255 if x % (num_values / 100) == 0 {
256 print!(".");
257 stdout().flush().unwrap();
258 }
259 let hash = key_map.get(&x).unwrap();
260 arena.get::<u64>(hash).unwrap();
261 }
262 let elapsed = start_time.elapsed();
263 println!("{:.2?}", elapsed);
264
265 let xs: std::vec::Vec<_> = if is_cyclic {
267 (0..num_values).cycle().take(num_lookups).collect()
268 } else {
269 (0..num_lookups)
270 .map(|_| rng.gen_range(0..num_values))
271 .collect()
272 };
273
274 println!("{prefix} fetching data:");
276 let start_time = Instant::now();
277 for (i, x) in xs.into_iter().enumerate() {
278 if i % (num_lookups / 100) == 0 {
279 print!(".");
280 stdout().flush().unwrap();
281 }
282 let hash = key_map.get(&x).unwrap();
283 arena.get::<u64>(hash).unwrap();
284 }
285 let elapsed = start_time.elapsed();
286 println!("{:.2?}", elapsed);
287 println!();
288 }
289
290 #[cfg(feature = "sqlite")]
323 pub fn load_large_tree_sqldb(args: &[String]) {
324 use crate::db::SqlDB;
325 let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
326 let open_db = || SqlDB::<crate::DefaultHasher>::exclusive_file(&path);
327 load_large_tree(args, open_db);
328 }
329
330 #[cfg(feature = "parity-db")]
360 pub fn load_large_tree_paritydb(args: &[String]) {
361 use crate::db::ParityDb;
362 let path = tempfile::TempDir::new().unwrap().keep();
363 let open_db = || ParityDb::<crate::DefaultHasher>::open(&path);
364 load_large_tree(args, open_db);
365 }
366
367 #[cfg(any(feature = "parity-db", feature = "sqlite"))]
368 fn load_large_tree<D: DB>(args: &[String], open_db: impl Fn() -> D) {
369 let msg = "load_large_tree(height: usize)";
370 if args.len() != 1 {
371 panic!("{msg}: wrong number of args");
372 }
373 let height = args[0]
374 .parse()
375 .unwrap_or_else(|e| panic!("{msg}: couldn't parse height={}: {e}", args[0]));
376 load_large_tree_inner(height, open_db);
377 }
378
    /// Benchmark loading a persisted binary tree of the given `height` four
    /// ways: {lazy, eager} × {with, without} prefetch, timing each phase.
    ///
    /// The tree is built and persisted once in the first scope; every later
    /// scope reopens the DB fresh so loads start from a cold in-memory state.
    #[cfg(any(feature = "parity-db", feature = "sqlite"))]
    fn load_large_tree_inner<D: DB>(height: usize, open_db: impl Fn() -> D) {
        use crate::storage::Storage;
        use bin_tree::*;

        // Cache sized to the node count of a full tree of this height.
        let cache_size = 1 << height;
        // Expected sum of node values: 2^(height-1) * (2^height - 1), i.e. the
        // sum 0 + 1 + … + (2^height - 1). Presumably `counting_tree` labels
        // nodes with exactly that range — the asserts below depend on it.
        // NOTE(review): `height - 1` underflows for height == 0 — callers pass
        // height >= 1; consider guarding.
        let sum = (1 << (height - 1)) * ((1 << height) - 1);
        let mut timer = crate::test::Timer::new("load_large_tree");

        // Build and persist the tree, keeping only its key for later loads.
        let key = {
            let db = open_db();
            let storage = Storage::new(cache_size, db);
            let arena = storage.arena;
            timer.delta("init");

            let mut bt = counting_tree(&arena, height);
            timer.delta("create tree");

            bt.persist();
            arena.with_backend(|b| b.flush_all_changes_to_db());
            timer.delta("persist tree to disk");

            bt.as_typed_key()
        };
        timer.delta("drop");

        // Scenario 1: lazy load, no prefetch.
        {
            let db = open_db();
            let storage = Storage::new(cache_size, db);
            let arena = storage.arena;
            timer.delta("init");

            let bt = arena.get_lazy::<BinTree<D>>(&key).unwrap();
            assert_eq!(bt.sum(), sum);
            timer.delta("lazy load and traverse tree, no prefetch");
        }
        timer.delta("drop");

        // Scenario 2: lazy load, with prefetch of the key's references first.
        {
            let db = open_db();
            let storage = Storage::new(cache_size, db);
            let arena = storage.arena;
            timer.delta("init");

            let max_depth = Some(height);
            let truncate = false;
            arena.with_backend(|b| {
                key.key
                    .refs()
                    .iter()
                    .for_each(|hash| b.pre_fetch(hash, max_depth, truncate))
            });
            let bt = arena.get_lazy::<BinTree<D>>(&key).unwrap();
            assert_eq!(bt.sum(), sum);
            timer.delta("lazy load and traverse tree, with prefetch");
        }
        timer.delta("drop");

        // Scenario 3: eager load, no prefetch.
        {
            let db = open_db();
            let storage = Storage::new(cache_size, db);
            let arena = storage.arena;
            timer.delta("init");

            let bt = arena.get::<BinTree<D>>(&key).unwrap();
            assert_eq!(bt.sum(), sum);
            timer.delta("eager load and traverse tree, no prefetch");
        }
        timer.delta("drop");

        // Scenario 4: eager load, with prefetch.
        {
            let db = open_db();
            let storage = Storage::new(cache_size, db);
            let arena = storage.arena;
            timer.delta("init");

            let max_depth = Some(height);
            let truncate = false;
            arena.with_backend(|b| {
                key.key
                    .refs()
                    .iter()
                    .for_each(|hash| b.pre_fetch(hash, max_depth, truncate))
            });
            let bt = arena.get::<BinTree<D>>(&key).unwrap();
            assert_eq!(bt.sum(), sum);
            timer.delta("eager load and traverse tree, with prefetch");
        }
        timer.delta("drop");
    }
484
485 pub fn read_write_map_loop<D: DB>(args: &[String]) {
488 let msg = "read_write_map_loop(num_operations: usize, flush_interval: usize)";
489 if args.len() != 2 {
490 panic!("{msg}: wrong number of args");
491 }
492 let num_operations = args[0]
493 .parse()
494 .unwrap_or_else(|e| panic!("{msg}: couldn't parse num_operations={}: {e}", args[0]));
495 let flush_interval = args[1]
496 .parse()
497 .unwrap_or_else(|e| panic!("{msg}: couldn't parse flush_interval={}: {e}", args[1]));
498 read_write_map_loop_inner::<D>(num_operations, flush_interval);
499 }
500
501 fn read_write_map_loop_inner<D: DB>(num_operations: usize, flush_interval: usize) {
534 use crate::storage::{Map, Storage, WrappedDB, set_default_storage};
535 use rand::{Rng, seq::SliceRandom as _};
536 use serde_json::json;
537 use std::io::{Write, stdout};
538 use std::time::Instant;
539
540 struct Tag;
542 type DB<D> = WrappedDB<D, Tag>;
543
544 let storage = set_default_storage(Storage::<DB<D>>::default).unwrap();
545
546 let mut rng = rand::thread_rng();
547 let mut map = Map::<u128, u128, DB<D>>::new();
548 let mut keys = vec![];
549 let mut total_write_time = std::time::Duration::new(0, 0);
550 let mut total_read_time = std::time::Duration::new(0, 0);
551 let mut total_flush_time = std::time::Duration::new(0, 0);
552 let mut reads = 0;
553 let mut writes = 0;
554 let mut flushes = 0;
555 let prefix = format!(
556 "read_write_map_loop(num_operations={num_operations}, flush_interval={flush_interval})"
557 );
558
559 let mut time_series: std::vec::Vec<serde_json::Value> = vec![];
560
561 println!("{prefix} running: ");
562 let start = Instant::now();
563 for i in 1..=num_operations {
564 if i % (num_operations / 100) == 0 {
566 print!(".");
567 stdout().flush().unwrap();
568 }
569
570 if i % 2 == 0 {
572 let key = keys.choose(&mut rng).unwrap();
574 let read_start = Instant::now();
575 let _ = map.get(key);
576 total_read_time += read_start.elapsed();
577 reads += 1;
578 } else {
579 let key = rng.r#gen::<u128>();
581 keys.push(key);
582 let value = rng.r#gen::<u128>();
583 let write_start = Instant::now();
584 map = map.insert(key, value);
585 total_write_time += write_start.elapsed();
586 writes += 1;
587 }
588
589 if i % flush_interval == 0 {
591 let cache_size = storage.arena.with_backend(|b| b.get_write_cache_len());
593 let cache_bytes = storage
594 .arena
595 .with_backend(|b| b.get_write_cache_obj_bytes());
596
597 let flush_start = Instant::now();
598 storage
599 .arena
600 .with_backend(|backend| backend.flush_all_changes_to_db());
601 let flush_time = flush_start.elapsed();
602 total_flush_time += flush_time;
603 flushes += 1;
604
605 let map_size = map.size();
607 let map_size_ratio = flush_time / (map_size as u32);
608 let cache_size_ratio = flush_time / (cache_size as u32);
609 let cache_bytes_ratio = flush_time / (cache_bytes as u32);
610 println!(
611 "ft: {:0.2?}; ms: {}; ft/ms: {:0.2?}; cs: {}; ft/cs: {:0.2?}; cb: {}; ft/cb: {:0.2?}; cb/cs: {}",
612 flush_time,
613 map_size,
614 map_size_ratio,
615 cache_size,
616 cache_size_ratio,
617 cache_bytes,
618 cache_bytes_ratio,
619 cache_bytes / cache_size,
620 );
621 time_series.push(json!({
622 "i": i,
623 "flush_time": flush_time.as_secs_f32(),
624 "map_size": map_size,
625 "cache_size": cache_size,
626 "cache_bytes": cache_bytes,
627 }));
628 }
629 }
630 println!();
631
632 let total_time = start.elapsed();
634 println!("{prefix} results:");
635 println!("- total time: {:.2?}", total_time);
636 println!("- operations performed: {num_operations}");
637 println!(
638 " - reads: {} (avg {:.2?} per op)",
639 reads,
640 total_read_time / reads as u32
641 );
642 println!(
643 " - writes: {} (avg {:.2?} per op)",
644 writes,
645 total_write_time / writes as u32
646 );
647 println!(
648 " - flushes: {} (avg {:.2?} per flush)",
649 flushes,
650 total_flush_time / std::cmp::max(flushes, 1) as u32
651 );
652 println!(
653 " - ops/second: {:.0}",
654 num_operations as f64 / total_time.as_secs_f64()
655 );
656
657 let write_json = || -> std::io::Result<()> {
659 let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
660 std::fs::create_dir_all("tmp")?;
661 let file_path =
662 format!("tmp/read_write_map_loop.{num_operations}_{flush_interval}.{now:?}.json");
663 let mut file = std::fs::File::create(&file_path)?;
664 let header = json!({
665 "total_time": total_time.as_secs_f32(),
666 "num_operations": num_operations,
667 "flush_interval": flush_interval,
668 "reads": reads,
669 "total_read_time": total_read_time.as_secs_f32(),
670 "writes": writes,
671 "total_write_time": total_write_time.as_secs_f32(),
672 "flushes": flushes,
673 "total_flush_time": total_flush_time.as_secs_f32(),
674 });
675 let json = json!({
676 "header": header,
677 "data": time_series,
678 });
679 writeln!(file, "{}", serde_json::to_string_pretty(&json)?)?;
680 let canon_path = std::path::Path::new(&file_path).canonicalize()?;
681 println!("- JSON stats: {}", canon_path.display());
682 Ok(())
683 };
684 write_json().unwrap();
685 }
686}
687
#[cfg(test)]
mod tests {
    // Every test here delegates to the stress-test runner, naming one of the
    // `stress_tests` functions above by its `arena::stress_tests::…` path.

    // Runs `array_nesting` with a 1 GiB (1 << 30 bytes) memory cap.
    #[cfg(feature = "stress-test")]
    #[test]
    fn array_nesting() {
        crate::stress_test::runner::StressTest::new()
            .with_max_memory(1 << 30)
            .run("arena::stress_tests::array_nesting");
    }

    // The deep-chain drop is expected to blow the stack.
    #[cfg(feature = "stress-test")]
    #[test]
    #[should_panic = "has overflowed its stack"]
    fn drop_deeply_nested_data() {
        crate::stress_test::runner::StressTest::new()
            .with_nocapture(false)
            .run("arena::stress_tests::drop_deeply_nested_data");
    }

    // Serializing the deep chain is likewise expected to blow the stack.
    #[cfg(feature = "stress-test")]
    #[test]
    #[should_panic = "has overflowed its stack"]
    fn serialize_deeply_nested_data() {
        crate::stress_test::runner::StressTest::new()
            .with_nocapture(false)
            .run("arena::stress_tests::serialize_deeply_nested_data");
    }

    #[cfg(all(feature = "stress-test", feature = "sqlite"))]
    #[test]
    fn thrash_the_cache_sqldb() {
        thrash_the_cache("sqldb");
    }
    #[cfg(all(feature = "stress-test", feature = "parity-db"))]
    #[test]
    fn thrash_the_cache_paritydb() {
        thrash_the_cache("paritydb");
    }
    // Shared driver: runs four (p, include_cyclic) combinations of the
    // cache-thrashing workload against the named DB backend, each with a
    // 10-minute runtime cap.
    #[cfg(all(
        feature = "stress-test",
        any(feature = "sqlite", feature = "parity-db")
    ))]
    fn thrash_the_cache(db_name: &str) {
        let test_name = &format!("arena::stress_tests::thrash_the_cache_variations_{db_name}");
        let time_limit = 10 * 60;
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(time_limit)
            .run_with_args(test_name, &["0.1", "true"]);
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(time_limit)
            .run_with_args(test_name, &["0.5", "false"]);
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(time_limit)
            .run_with_args(test_name, &["0.8", "false"]);
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(time_limit)
            .run_with_args(test_name, &["1.0", "true"]);
    }

    // Large-tree load benchmarks: height 15, 5-minute runtime cap, 2 GiB
    // (2 << 30 bytes) memory cap.
    #[cfg(all(feature = "stress-test", feature = "sqlite"))]
    #[test]
    fn load_large_tree_sqldb() {
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(5 * 60)
            .with_max_memory(2 << 30)
            .run_with_args("arena::stress_tests::load_large_tree_sqldb", &["15"]);
    }

    #[cfg(all(feature = "stress-test", feature = "parity-db"))]
    #[test]
    fn load_large_tree_paritydb() {
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(5 * 60)
            .with_max_memory(2 << 30)
            .run_with_args("arena::stress_tests::load_large_tree_paritydb", &["15"]);
    }

    // Read/write map loops: 10_000 operations, flushing every 1_000, with a
    // 60-second runtime cap.
    #[cfg(all(feature = "stress-test", feature = "sqlite"))]
    #[test]
    fn read_write_map_loop_sqldb() {
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(60)
            .run_with_args(
                "arena::stress_tests::read_write_map_loop_sqldb",
                &["10000", "1000"],
            );
    }
    #[cfg(all(feature = "stress-test", feature = "parity-db"))]
    #[test]
    fn read_write_map_loop_paritydb() {
        crate::stress_test::runner::StressTest::new()
            .with_max_runtime(60)
            .run_with_args(
                "arena::stress_tests::read_write_map_loop_paritydb",
                &["10000", "1000"],
            );
    }
}