1use std::{fmt, path::PathBuf, str::FromStr};
7
8use anyhow::{anyhow, bail};
9use async_trait::async_trait;
10use linera_core::GenesisConfig;
11use linera_execution::WasmRuntime;
12pub use linera_storage::StorageCacheConfig;
/// Alias for [`StorageCacheConfig`]; this is the name the rest of this file uses for the
/// cache-size settings handed to the storage layer.
pub type StorageCacheSizes = StorageCacheConfig;
15use linera_storage::{DbStorage, Storage, WallClock, DEFAULT_NAMESPACE};
16#[cfg(feature = "storage-service")]
17use linera_storage_service::{
18 client::StorageServiceDatabase,
19 common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
20};
21#[cfg(feature = "dynamodb")]
22use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
23#[cfg(feature = "rocksdb")]
24use linera_views::rocks_db::{
25 PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
26 RocksDbStoreInternalConfig,
27};
28use linera_views::{
29 lru_prefix_cache::StorageCacheConfig as ViewsStorageCacheConfig,
30 memory::{MemoryDatabase, MemoryStoreConfig},
31 store::{KeyValueDatabase, KeyValueStore},
32};
33use serde::{Deserialize, Serialize};
34use tracing::error;
35#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
36use {
37 linera_storage::ChainStatesFirstAssignment,
38 linera_views::backends::dual::{DualDatabase, DualStoreConfig},
39 std::path::Path,
40};
41#[cfg(feature = "scylladb")]
42use {
43 linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
44 std::num::NonZeroU16,
45 tracing::debug,
46};
47
// Command-line options shared by all storage backends. Every flag is declared `global`, and
// every flag except `storage_max_concurrent_queries` has a default value.
#[derive(Clone, Debug, clap::Parser)]
pub struct CommonStorageOptions {
    /// Passed as `max_concurrent_queries` to the storage-service, DynamoDB and ScyllaDB store
    /// configurations. Left unset (`None`) when the flag is not given.
    #[arg(long, global = true)]
    pub storage_max_concurrent_queries: Option<usize>,

    /// Passed as `max_stream_queries` to the store configuration of every backend.
    #[arg(long, default_value = "10", global = true)]
    pub storage_max_stream_queries: usize,

    /// Value of `max_cache_size` in the storage cache configuration.
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_size: usize,

    /// Value of `max_value_entry_size` in the storage cache configuration.
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_value_entry_size: usize,

    /// Value of `max_find_keys_entry_size` in the storage cache configuration.
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_find_keys_entry_size: usize,

    /// Value of `max_find_key_values_entry_size` in the storage cache configuration.
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_find_key_values_entry_size: usize,

    /// Value of `max_cache_entries` in the storage cache configuration.
    #[arg(long, default_value = "1000", global = true)]
    pub storage_max_cache_entries: usize,

    /// Value of `max_cache_value_size` in the storage cache configuration.
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_value_size: usize,

    /// Value of `max_cache_find_keys_size` in the storage cache configuration.
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_find_keys_size: usize,

    /// Value of `max_cache_find_key_values_size` in the storage cache configuration.
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_find_key_values_size: usize,

    /// Value of `blob_cache_size` in the cache sizes handed to the storage.
    #[arg(long, default_value = "1000", global = true)]
    pub blob_cache_size: usize,

    /// Value of `confirmed_block_cache_size` in the cache sizes handed to the storage.
    #[arg(long, default_value = "1000", global = true)]
    pub confirmed_block_cache_size: usize,

    /// Value of `certificate_cache_size` in the cache sizes handed to the storage.
    #[arg(long, default_value = "1000", global = true)]
    pub certificate_cache_size: usize,

    /// Value of `certificate_raw_cache_size` in the cache sizes handed to the storage.
    #[arg(long, default_value = "1000", global = true)]
    pub certificate_raw_cache_size: usize,

    /// Value of `event_cache_size` in the cache sizes handed to the storage.
    #[arg(long, default_value = "1000", global = true)]
    pub event_cache_size: usize,

    // NOTE(review): not read anywhere in this file; presumably consumed by callers that size a
    // block cache — confirm against the users of this struct.
    #[arg(long, default_value = "5000", global = true)]
    pub block_cache_size: usize,

    // NOTE(review): not read anywhere in this file; presumably consumed by callers that size an
    // execution-state cache — confirm against the users of this struct.
    #[arg(long, default_value = "10000", global = true)]
    pub execution_state_cache_size: usize,

    /// Passed as `replication_factor` to the ScyllaDB store configuration.
    #[arg(long, default_value = "1", global = true)]
    pub storage_replication_factor: u32,
}
122
123impl CommonStorageOptions {
124 pub fn storage_cache_sizes(&self) -> StorageCacheSizes {
125 StorageCacheSizes {
126 blob_cache_size: self.blob_cache_size,
127 confirmed_block_cache_size: self.confirmed_block_cache_size,
128 certificate_cache_size: self.certificate_cache_size,
129 certificate_raw_cache_size: self.certificate_raw_cache_size,
130 event_cache_size: self.event_cache_size,
131 cache_cleanup_interval_secs: linera_storage::DEFAULT_CLEANUP_INTERVAL_SECS,
132 }
133 }
134
135 pub fn storage_cache_config(&self) -> ViewsStorageCacheConfig {
136 ViewsStorageCacheConfig {
137 max_cache_size: self.storage_max_cache_size,
138 max_value_entry_size: self.storage_max_value_entry_size,
139 max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
140 max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
141 max_cache_entries: self.storage_max_cache_entries,
142 max_cache_value_size: self.storage_max_cache_value_size,
143 max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
144 max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
145 }
146 }
147
148 pub fn with_defaults() -> Self {
150 use clap::Parser as _;
151 Self::parse_from(std::iter::empty::<String>())
152 }
153}
154
/// A complete store description: the backend-specific store configuration together with the
/// namespace to use.
///
/// Values are built by `StorageConfig::add_common_storage_options` and consumed by
/// `run_with_storage` and `run_with_store`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StoreConfig {
    /// In-memory store.
    Memory {
        /// Configuration of the memory store.
        config: MemoryStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
        /// Path of the JSON genesis configuration that `run_with_storage` reads and uses to
        /// initialize the freshly created storage.
        genesis_path: PathBuf,
    },
    /// Store served by the storage service.
    #[cfg(feature = "storage-service")]
    StorageService {
        /// Configuration of the storage-service store.
        config: StorageServiceStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// RocksDB store.
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Configuration of the RocksDB store.
        config: RocksDbStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// DynamoDB store.
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// Configuration of the DynamoDB store.
        config: DynamoDbStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// ScyllaDB store.
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Configuration of the ScyllaDB store.
        config: ScyllaDbStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// Dual store combining a RocksDB store (first) and a ScyllaDB store (second).
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Configurations of the two underlying stores.
        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
        /// Namespace used when connecting.
        namespace: String,
    },
}
194
/// The backend-specific part of a parsed storage description, before the common storage
/// options have been applied.
///
/// `Eq`/`PartialEq` are only derived in test builds, where they are used to compare parse
/// results.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub enum InnerStorageConfig {
    /// In-memory storage.
    Memory {
        /// Path of the genesis configuration file.
        genesis_path: PathBuf,
    },
    /// Storage service.
    #[cfg(feature = "storage-service")]
    Service {
        /// Endpoint of the service, in `host:port` form as assembled by the parser.
        endpoint: String,
    },
    /// RocksDB storage.
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Path given in the storage description.
        path: PathBuf,
        /// Spawn mode given in the storage description (`spawn_blocking` when omitted).
        spawn_mode: RocksDbSpawnMode,
    },
    /// DynamoDB storage.
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// `true` when the description ends in `dynamodb_local`, `false` for `env` or when the
        /// suffix is omitted.
        use_dynamodb_local: bool,
    },
    /// ScyllaDB storage.
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Address in `host:port` form; the parser falls back to `localhost:9042`.
        uri: String,
    },
    /// Dual storage combining RocksDB and ScyllaDB.
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// RocksDB path; `maybe_append_shard_path` extends it with a per-shard directory.
        path_with_guard: PathWithGuard,
        /// Spawn mode of the RocksDB part.
        spawn_mode: RocksDbSpawnMode,
        /// Address of the ScyllaDB part, in `host:port` form.
        uri: String,
    },
}
241
/// A storage description as parsed from a string such as `rocksdb:foo.db`: the backend-specific
/// settings plus the namespace.
///
/// Use `add_common_storage_options` to turn it into a [`StoreConfig`].
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub struct StorageConfig {
    /// The backend-specific settings.
    pub inner_storage_config: InnerStorageConfig,
    /// The namespace; the parser falls back to `DEFAULT_NAMESPACE` where the description may
    /// omit it.
    pub namespace: String,
}
251
// Prefixes that select the backend in a storage description string. `StorageConfig::from_str`
// strips the matching prefix and parses the remainder; `Display` writes the same prefixes back.
const MEMORY: &str = "memory:";
#[cfg(feature = "storage-service")]
const STORAGE_SERVICE: &str = "service:";
#[cfg(feature = "rocksdb")]
const ROCKS_DB: &str = "rocksdb:";
#[cfg(feature = "dynamodb")]
const DYNAMO_DB: &str = "dynamodb:";
#[cfg(feature = "scylladb")]
const SCYLLA_DB: &str = "scylladb:";
#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
263
264impl FromStr for StorageConfig {
265 type Err = anyhow::Error;
266
267 fn from_str(input: &str) -> Result<Self, Self::Err> {
268 if let Some(s) = input.strip_prefix(MEMORY) {
269 let parts = s.split(':').collect::<Vec<_>>();
270 if parts.len() == 1 {
271 let genesis_path = parts[0].to_string().into();
272 let namespace = DEFAULT_NAMESPACE.to_string();
273 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
274 return Ok(StorageConfig {
275 inner_storage_config,
276 namespace,
277 });
278 }
279 if parts.len() != 2 {
280 bail!("We should have one genesis config path and one optional namespace");
281 }
282 let genesis_path = parts[0].to_string().into();
283 let namespace = parts[1].to_string();
284 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
285 return Ok(StorageConfig {
286 inner_storage_config,
287 namespace,
288 });
289 }
290 #[cfg(feature = "storage-service")]
291 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
292 if s.is_empty() {
293 bail!(
294 "For Storage service, the formatting has to be service:endpoint:namespace,\
295example service:tcp:127.0.0.1:7878:table_do_my_test"
296 );
297 }
298 let parts = s.split(':').collect::<Vec<_>>();
299 if parts.len() != 4 {
300 bail!("We should have one endpoint and one namespace");
301 }
302 let protocol = parts[0];
303 if protocol != "tcp" {
304 bail!("Only allowed protocol is tcp");
305 }
306 let endpoint = parts[1];
307 let port = parts[2];
308 let mut endpoint = endpoint.to_string();
309 endpoint.push(':');
310 endpoint.push_str(port);
311 let endpoint = endpoint.to_string();
312 let namespace = parts[3].to_string();
313 let inner_storage_config = InnerStorageConfig::Service { endpoint };
314 return Ok(StorageConfig {
315 inner_storage_config,
316 namespace,
317 });
318 }
319 #[cfg(feature = "rocksdb")]
320 if let Some(s) = input.strip_prefix(ROCKS_DB) {
321 if s.is_empty() {
322 bail!(
323 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
324 }
325 let parts = s.split(':').collect::<Vec<_>>();
326 if parts.len() == 1 {
327 let path = parts[0].to_string().into();
328 let namespace = DEFAULT_NAMESPACE.to_string();
329 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
330 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
331 return Ok(StorageConfig {
332 inner_storage_config,
333 namespace,
334 });
335 }
336 if parts.len() == 2 || parts.len() == 3 {
337 let path = parts[0].to_string().into();
338 let spawn_mode_str = parts.get(1).expect("length already checked");
339 let spawn_mode = match *spawn_mode_str {
340 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
341 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
342 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
343 _ => Err(anyhow!(
344 "Failed to parse {} as a spawn_mode",
345 spawn_mode_str
346 )),
347 }?;
348 let namespace = if parts.len() == 2 {
349 DEFAULT_NAMESPACE.to_string()
350 } else {
351 (*parts.get(2).expect("length already checked")).to_string()
352 };
353 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
354 return Ok(StorageConfig {
355 inner_storage_config,
356 namespace,
357 });
358 }
359 bail!("We should have one, two or three parts");
360 }
361 #[cfg(feature = "dynamodb")]
362 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
363 let mut parts = s.splitn(2, ':');
364 let namespace = parts
365 .next()
366 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
367 .to_string();
368 let use_dynamodb_local = match parts.next() {
369 None | Some("env") => false,
370 Some("dynamodb_local") => true,
371 Some(unknown) => {
372 bail!(
373 "Invalid DynamoDB endpoint {unknown:?}. \
374 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
375 );
376 }
377 };
378 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
379 return Ok(StorageConfig {
380 inner_storage_config,
381 namespace,
382 });
383 }
384 #[cfg(feature = "scylladb")]
385 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
386 let mut uri: Option<String> = None;
387 let mut namespace: Option<String> = None;
388 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
389 if !s.is_empty() {
390 let mut parts = s.split(':');
391 while let Some(part) = parts.next() {
392 match part {
393 "tcp" => {
394 let address = parts.next().ok_or_else(|| {
395 anyhow!("Failed to find address for {s}. {parse_error}")
396 })?;
397 let port_str = parts.next().ok_or_else(|| {
398 anyhow!("Failed to find port for {s}. {parse_error}")
399 })?;
400 let port = NonZeroU16::from_str(port_str).map_err(|_| {
401 anyhow!(
402 "Failed to find parse port {port_str} for {s}. {parse_error}",
403 )
404 })?;
405 if uri.is_some() {
406 bail!("The uri has already been assigned");
407 }
408 uri = Some(format!("{}:{}", &address, port));
409 }
410 _ if part.starts_with("table") => {
411 if namespace.is_some() {
412 bail!("The namespace has already been assigned");
413 }
414 namespace = Some(part.to_string());
415 }
416 _ => {
417 bail!("the entry \"{part}\" is not matching");
418 }
419 }
420 }
421 }
422 let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
423 let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
424 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
425 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
426 return Ok(StorageConfig {
427 inner_storage_config,
428 namespace,
429 });
430 }
431 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
432 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
433 let parts = s.split(':').collect::<Vec<_>>();
434 if parts.len() != 5 && parts.len() != 6 {
435 bail!(
436 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
437 );
438 }
439 let path = Path::new(parts[0]);
440 let path = path.to_path_buf();
441 let path_with_guard = PathWithGuard::new(path);
442 let spawn_mode_str = parts.get(1).expect("length already checked");
443 let spawn_mode = match *spawn_mode_str {
444 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
445 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
446 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
447 _ => Err(anyhow!(
448 "Failed to parse {} as a spawn_mode",
449 spawn_mode_str
450 )),
451 }?;
452 let protocol = parts[2];
453 if protocol != "tcp" {
454 bail!("The only allowed protocol is tcp");
455 }
456 let address = parts[3];
457 let port_str = parts[4];
458 let port = NonZeroU16::from_str(port_str)
459 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
460 let uri = format!("{}:{}", &address, port);
461 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
462 path_with_guard,
463 spawn_mode,
464 uri,
465 };
466 let namespace = if parts.len() == 5 {
467 DEFAULT_NAMESPACE.to_string()
468 } else {
469 parts[5].to_string()
470 };
471 return Ok(StorageConfig {
472 inner_storage_config,
473 namespace,
474 });
475 }
476 error!("available storage: memory");
477 #[cfg(feature = "storage-service")]
478 error!("Also available is linera-storage-service");
479 #[cfg(feature = "rocksdb")]
480 error!("Also available is RocksDB");
481 #[cfg(feature = "dynamodb")]
482 error!("Also available is DynamoDB");
483 #[cfg(feature = "scylladb")]
484 error!("Also available is ScyllaDB");
485 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
486 error!("Also available is DualRocksDbScyllaDb");
487 Err(anyhow!("The input has not matched: {input}"))
488 }
489}
490
491impl StorageConfig {
492 #[allow(unused_variables)]
493 pub fn maybe_append_shard_path(&mut self, shard: usize) -> std::io::Result<()> {
494 match &mut self.inner_storage_config {
495 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
496 InnerStorageConfig::DualRocksDbScyllaDb {
497 path_with_guard,
498 spawn_mode: _,
499 uri: _,
500 } => {
501 let shard_str = format!("shard_{shard}");
502 path_with_guard.path_buf.push(shard_str);
503 std::fs::create_dir_all(&path_with_guard.path_buf)
504 }
505 _ => Ok(()),
506 }
507 }
508
509 pub fn add_common_storage_options(
511 &self,
512 options: &CommonStorageOptions,
513 ) -> Result<StoreConfig, anyhow::Error> {
514 let namespace = self.namespace.clone();
515 match &self.inner_storage_config {
516 InnerStorageConfig::Memory { genesis_path } => {
517 let config = MemoryStoreConfig {
518 max_stream_queries: options.storage_max_stream_queries,
519 kill_on_drop: false,
520 };
521 let genesis_path = genesis_path.clone();
522 Ok(StoreConfig::Memory {
523 config,
524 namespace,
525 genesis_path,
526 })
527 }
528 #[cfg(feature = "storage-service")]
529 InnerStorageConfig::Service { endpoint } => {
530 let inner_config = StorageServiceStoreInternalConfig {
531 endpoint: endpoint.clone(),
532 max_concurrent_queries: options.storage_max_concurrent_queries,
533 max_stream_queries: options.storage_max_stream_queries,
534 };
535 let config = StorageServiceStoreConfig {
536 inner_config,
537 storage_cache_config: options.storage_cache_config(),
538 };
539 Ok(StoreConfig::StorageService { config, namespace })
540 }
541 #[cfg(feature = "rocksdb")]
542 InnerStorageConfig::RocksDb { path, spawn_mode } => {
543 let path_with_guard = PathWithGuard::new(path.to_path_buf());
544 let inner_config = RocksDbStoreInternalConfig {
545 spawn_mode: *spawn_mode,
546 path_with_guard,
547 max_stream_queries: options.storage_max_stream_queries,
548 };
549 let config = RocksDbStoreConfig {
550 inner_config,
551 storage_cache_config: options.storage_cache_config(),
552 };
553 Ok(StoreConfig::RocksDb { config, namespace })
554 }
555 #[cfg(feature = "dynamodb")]
556 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
557 let inner_config = DynamoDbStoreInternalConfig {
558 use_dynamodb_local: *use_dynamodb_local,
559 max_concurrent_queries: options.storage_max_concurrent_queries,
560 max_stream_queries: options.storage_max_stream_queries,
561 };
562 let config = DynamoDbStoreConfig {
563 inner_config,
564 storage_cache_config: options.storage_cache_config(),
565 };
566 Ok(StoreConfig::DynamoDb { config, namespace })
567 }
568 #[cfg(feature = "scylladb")]
569 InnerStorageConfig::ScyllaDb { uri } => {
570 let inner_config = ScyllaDbStoreInternalConfig {
571 uri: uri.clone(),
572 max_stream_queries: options.storage_max_stream_queries,
573 max_concurrent_queries: options.storage_max_concurrent_queries,
574 replication_factor: options.storage_replication_factor,
575 };
576 let config = ScyllaDbStoreConfig {
577 inner_config,
578 storage_cache_config: options.storage_cache_config(),
579 };
580 Ok(StoreConfig::ScyllaDb { config, namespace })
581 }
582 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
583 InnerStorageConfig::DualRocksDbScyllaDb {
584 path_with_guard,
585 spawn_mode,
586 uri,
587 } => {
588 let inner_config = RocksDbStoreInternalConfig {
589 spawn_mode: *spawn_mode,
590 path_with_guard: path_with_guard.clone(),
591 max_stream_queries: options.storage_max_stream_queries,
592 };
593 let first_config = RocksDbStoreConfig {
594 inner_config,
595 storage_cache_config: options.storage_cache_config(),
596 };
597
598 let inner_config = ScyllaDbStoreInternalConfig {
599 uri: uri.clone(),
600 max_stream_queries: options.storage_max_stream_queries,
601 max_concurrent_queries: options.storage_max_concurrent_queries,
602 replication_factor: options.storage_replication_factor,
603 };
604 let second_config = ScyllaDbStoreConfig {
605 inner_config,
606 storage_cache_config: options.storage_cache_config(),
607 };
608
609 let config = DualStoreConfig {
610 first_config,
611 second_config,
612 };
613 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
614 }
615 }
616 }
617}
618
619impl fmt::Display for StorageConfig {
620 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621 let namespace = &self.namespace;
622 match &self.inner_storage_config {
623 #[cfg(feature = "storage-service")]
624 InnerStorageConfig::Service { endpoint } => {
625 write!(f, "service:tcp:{endpoint}:{namespace}")
626 }
627 InnerStorageConfig::Memory { genesis_path } => {
628 write!(f, "memory:{}:{namespace}", genesis_path.display())
629 }
630 #[cfg(feature = "rocksdb")]
631 InnerStorageConfig::RocksDb { path, spawn_mode } => {
632 let spawn_mode = spawn_mode.to_string();
633 write!(f, "rocksdb:{}:{spawn_mode}:{namespace}", path.display())
634 }
635 #[cfg(feature = "dynamodb")]
636 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
637 true => write!(f, "dynamodb:{namespace}:dynamodb_local"),
638 false => write!(f, "dynamodb:{namespace}:env"),
639 },
640 #[cfg(feature = "scylladb")]
641 InnerStorageConfig::ScyllaDb { uri } => {
642 write!(f, "scylladb:tcp:{uri}:{namespace}")
643 }
644 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
645 InnerStorageConfig::DualRocksDbScyllaDb {
646 path_with_guard,
647 spawn_mode,
648 uri,
649 } => {
650 write!(
651 f,
652 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
653 path_with_guard.path_buf.display(),
654 spawn_mode,
655 uri,
656 namespace
657 )
658 }
659 }
660 }
661}
662
/// A job that runs against a connected storage.
///
/// `StoreConfig::run_with_storage` connects to the configured backend and then hands the
/// resulting storage to [`Runnable::run`].
#[async_trait]
pub trait Runnable {
    /// The value produced by the job.
    type Output;

    /// Runs the job, consuming it, on the given `storage`.
    async fn run<S>(self, storage: S) -> Self::Output
    where
        S: Storage + Clone + Send + Sync + 'static;
}
671
/// A job that is generic over the key-value database type rather than over a connected
/// storage.
///
/// `StoreConfig::run_with_store` picks the database type `D` matching the configured backend
/// and forwards the backend configuration, the namespace and the cache sizes; the job itself
/// decides whether and how to connect.
#[async_trait]
pub trait RunnableWithStore {
    /// The value produced by the job on success.
    type Output;

    /// Runs the job, consuming it, against the database type `D` described by `config` and
    /// `namespace`.
    async fn run<D>(
        self,
        config: D::Config,
        namespace: String,
        cache_sizes: StorageCacheSizes,
    ) -> Result<Self::Output, anyhow::Error>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync;
}
687
688fn read_json<T: serde::de::DeserializeOwned>(path: impl Into<PathBuf>) -> anyhow::Result<T> {
690 Ok(serde_json::from_reader(fs_err::File::open(path.into())?)?)
691}
692
693impl StoreConfig {
694 pub async fn run_with_storage<Job>(
695 self,
696 wasm_runtime: Option<WasmRuntime>,
697 allow_application_logs: bool,
698 cache_sizes: StorageCacheSizes,
699 job: Job,
700 ) -> Result<Job::Output, anyhow::Error>
701 where
702 Job: Runnable,
703 {
704 match self {
705 StoreConfig::Memory {
706 config,
707 namespace,
708 genesis_path,
709 } => {
710 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
711 &config,
712 &namespace,
713 wasm_runtime,
714 cache_sizes,
715 )
716 .await?
717 .with_allow_application_logs(allow_application_logs);
718 let genesis_config = read_json::<GenesisConfig>(genesis_path)?;
719 genesis_config.initialize_storage(&mut storage).await?;
721 Ok(job.run(storage).await)
722 }
723 #[cfg(feature = "storage-service")]
724 StoreConfig::StorageService { config, namespace } => {
725 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
726 &config,
727 &namespace,
728 wasm_runtime,
729 cache_sizes,
730 )
731 .await?
732 .with_allow_application_logs(allow_application_logs);
733 Ok(job.run(storage).await)
734 }
735 #[cfg(feature = "rocksdb")]
736 StoreConfig::RocksDb { config, namespace } => {
737 let storage = DbStorage::<RocksDbDatabase, _>::connect(
738 &config,
739 &namespace,
740 wasm_runtime,
741 cache_sizes,
742 )
743 .await?
744 .with_allow_application_logs(allow_application_logs);
745 Ok(job.run(storage).await)
746 }
747 #[cfg(feature = "dynamodb")]
748 StoreConfig::DynamoDb { config, namespace } => {
749 let storage = DbStorage::<DynamoDbDatabase, _>::connect(
750 &config,
751 &namespace,
752 wasm_runtime,
753 cache_sizes,
754 )
755 .await?
756 .with_allow_application_logs(allow_application_logs);
757 Ok(job.run(storage).await)
758 }
759 #[cfg(feature = "scylladb")]
760 StoreConfig::ScyllaDb { config, namespace } => {
761 let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
762 &config,
763 &namespace,
764 wasm_runtime,
765 cache_sizes,
766 )
767 .await?
768 .with_allow_application_logs(allow_application_logs);
769 Ok(job.run(storage).await)
770 }
771 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
772 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
773 let storage =
774 DbStorage::<
775 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
776 _,
777 >::connect(&config, &namespace, wasm_runtime, cache_sizes)
778 .await?
779 .with_allow_application_logs(allow_application_logs);
780 Ok(job.run(storage).await)
781 }
782 }
783 }
784
785 #[allow(unused_variables)]
786 pub async fn run_with_store<Job>(
787 self,
788 cache_sizes: StorageCacheSizes,
789 job: Job,
790 ) -> Result<Job::Output, anyhow::Error>
791 where
792 Job: RunnableWithStore,
793 {
794 match self {
795 StoreConfig::Memory { .. } => {
796 Err(anyhow!("Cannot run admin operations on the memory store"))
797 }
798 #[cfg(feature = "storage-service")]
799 StoreConfig::StorageService { config, namespace } => Ok(job
800 .run::<StorageServiceDatabase>(config, namespace, cache_sizes)
801 .await?),
802 #[cfg(feature = "rocksdb")]
803 StoreConfig::RocksDb { config, namespace } => Ok(job
804 .run::<RocksDbDatabase>(config, namespace, cache_sizes)
805 .await?),
806 #[cfg(feature = "dynamodb")]
807 StoreConfig::DynamoDb { config, namespace } => Ok(job
808 .run::<DynamoDbDatabase>(config, namespace, cache_sizes)
809 .await?),
810 #[cfg(feature = "scylladb")]
811 StoreConfig::ScyllaDb { config, namespace } => Ok(job
812 .run::<ScyllaDbDatabase>(config, namespace, cache_sizes)
813 .await?),
814 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
815 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
816 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
817 config,
818 namespace,
819 cache_sizes,
820 )
821 .await?),
822 }
823 }
824}
825
826pub struct StorageMigration;
827
828#[async_trait]
829impl RunnableWithStore for StorageMigration {
830 type Output = ();
831
832 async fn run<D>(
833 self,
834 config: D::Config,
835 namespace: String,
836 cache_sizes: StorageCacheSizes,
837 ) -> Result<Self::Output, anyhow::Error>
838 where
839 D: KeyValueDatabase + Clone + Send + Sync + 'static,
840 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
841 D::Error: Send + Sync,
842 {
843 if D::exists(&config, &namespace).await? {
844 let wasm_runtime = None;
845 let storage =
846 DbStorage::<D, WallClock>::connect(&config, &namespace, wasm_runtime, cache_sizes)
847 .await?;
848 storage.migrate_if_needed().await?;
849 }
850 Ok(())
851 }
852}
853
854pub struct AssertStorageV1;
855
856#[async_trait]
857impl RunnableWithStore for AssertStorageV1 {
858 type Output = ();
859
860 async fn run<D>(
861 self,
862 config: D::Config,
863 namespace: String,
864 cache_sizes: StorageCacheSizes,
865 ) -> Result<Self::Output, anyhow::Error>
866 where
867 D: KeyValueDatabase + Clone + Send + Sync + 'static,
868 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
869 D::Error: Send + Sync,
870 {
871 if D::exists(&config, &namespace).await? {
872 let wasm_runtime = None;
873 let storage =
874 DbStorage::<D, WallClock>::connect(&config, &namespace, wasm_runtime, cache_sizes)
875 .await?;
876 storage.assert_is_migrated_storage().await?;
877 }
878 Ok(())
879 }
880}
881
/// The memory description takes a genesis path and an optional namespace; the bare word
/// `memory` (no colon) is rejected.
#[test]
fn test_memory_storage_config_from_str() {
    let parse = |text: &str| StorageConfig::from_str(text);
    let expected = |namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::Memory {
            genesis_path: PathBuf::from("path/to/genesis.json"),
        },
        namespace,
    };

    assert_eq!(
        parse("memory:path/to/genesis.json").unwrap(),
        expected(DEFAULT_NAMESPACE.into())
    );
    assert_eq!(
        parse("memory:path/to/genesis.json:namespace").unwrap(),
        expected("namespace".into())
    );
    assert!(parse("memory").is_err());
}
904
/// The service description needs protocol, host, port and namespace; leaving out either the
/// namespace or the port is an error.
#[cfg(feature = "storage-service")]
#[test]
fn test_shared_store_config_from_str() {
    let parse = |text: &str| StorageConfig::from_str(text);

    let expected = StorageConfig {
        inner_storage_config: InnerStorageConfig::Service {
            endpoint: "127.0.0.1:8942".to_string(),
        },
        namespace: "linera".into(),
    };
    assert_eq!(parse("service:tcp:127.0.0.1:8942:linera").unwrap(), expected);

    assert!(parse("service:tcp:127.0.0.1:8942").is_err());
    assert!(parse("service:tcp:127.0.0.1:linera").is_err());
}
920
/// The RocksDB description takes a path, then optionally a spawn mode, then optionally a
/// namespace; a string without the `rocksdb:` prefix is rejected.
#[cfg(feature = "rocksdb")]
#[test]
fn test_rocks_db_storage_config_from_str() {
    let parse = |text: &str| StorageConfig::from_str(text);
    let expected = |spawn_mode: RocksDbSpawnMode, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::RocksDb {
            path: "foo.db".into(),
            spawn_mode,
        },
        namespace,
    };

    assert!(parse("rocksdb_foo.db").is_err());
    assert_eq!(
        parse("rocksdb:foo.db").unwrap(),
        expected(
            RocksDbSpawnMode::SpawnBlocking,
            DEFAULT_NAMESPACE.to_string()
        )
    );
    assert_eq!(
        parse("rocksdb:foo.db:block_in_place").unwrap(),
        expected(RocksDbSpawnMode::BlockInPlace, DEFAULT_NAMESPACE.to_string())
    );
    assert_eq!(
        parse("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
        expected(RocksDbSpawnMode::BlockInPlace, "chosen_namespace".into())
    );
}
956
/// The DynamoDB description takes a table name and an optional `env` / `dynamodb_local`
/// endpoint selector; missing or too-short table names and unknown selectors are expected to
/// be rejected.
#[cfg(feature = "dynamodb")]
#[test]
fn test_aws_storage_config_from_str() {
    let parse = |text: &str| StorageConfig::from_str(text);
    let expected = |use_dynamodb_local: bool| StorageConfig {
        inner_storage_config: InnerStorageConfig::DynamoDb { use_dynamodb_local },
        namespace: "table".to_string(),
    };

    assert_eq!(parse("dynamodb:table").unwrap(), expected(false));
    assert_eq!(parse("dynamodb:table:env").unwrap(), expected(false));
    assert_eq!(
        parse("dynamodb:table:dynamodb_local").unwrap(),
        expected(true)
    );

    assert!(parse("dynamodb").is_err());
    assert!(parse("dynamodb:").is_err());
    assert!(parse("dynamodb:1").is_err());
    assert!(parse("dynamodb:wrong:endpoint").is_err());
}
992
/// The ScyllaDB description defaults to `localhost:9042` and the default namespace, accepts
/// `tcp:HOST:PORT` and a `table…` namespace, and rejects anything else.
#[cfg(feature = "scylladb")]
#[test]
fn test_scylla_db_storage_config_from_str() {
    let parse = |text: &str| StorageConfig::from_str(text);
    let expected = |uri: &str, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::ScyllaDb {
            uri: uri.to_string(),
        },
        namespace,
    };

    assert_eq!(
        parse("scylladb:").unwrap(),
        expected("localhost:9042", DEFAULT_NAMESPACE.to_string())
    );
    assert_eq!(
        parse("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
        expected("db_hostname:230", "table_other_storage".to_string())
    );
    assert_eq!(
        parse("scylladb:tcp:db_hostname:230").unwrap(),
        expected("db_hostname:230", DEFAULT_NAMESPACE.to_string())
    );

    assert!(parse("scylladb:-10").is_err());
    assert!(parse("scylladb:70000").is_err());
    assert!(parse("scylladb:230:234").is_err());
    assert!(parse("scylladb:tcp:address1").is_err());
    assert!(parse("scylladb:tcp:address1:tcp:/address2").is_err());
    assert!(parse("scylladb:wrong").is_err());
}