Skip to main content

linera_storage_runtime/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Storage configuration and runtime infrastructure for the Linera protocol.
5
6use std::{fmt, path::PathBuf, str::FromStr};
7
8use anyhow::{anyhow, bail};
9use async_trait::async_trait;
10use linera_core::GenesisConfig;
11use linera_execution::WasmRuntime;
12pub use linera_storage::StorageCacheConfig;
13/// Backward-compatible alias.
14pub type StorageCacheSizes = StorageCacheConfig;
15use linera_storage::{DbStorage, Storage, WallClock, DEFAULT_NAMESPACE};
16#[cfg(feature = "storage-service")]
17use linera_storage_service::{
18    client::StorageServiceDatabase,
19    common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
20};
21#[cfg(feature = "dynamodb")]
22use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
23#[cfg(feature = "rocksdb")]
24use linera_views::rocks_db::{
25    PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
26    RocksDbStoreInternalConfig,
27};
28use linera_views::{
29    lru_prefix_cache::StorageCacheConfig as ViewsStorageCacheConfig,
30    memory::{MemoryDatabase, MemoryStoreConfig},
31    store::{KeyValueDatabase, KeyValueStore},
32};
33use serde::{Deserialize, Serialize};
34use tracing::error;
35#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
36use {
37    linera_storage::ChainStatesFirstAssignment,
38    linera_views::backends::dual::{DualDatabase, DualStoreConfig},
39    std::path::Path,
40};
41#[cfg(feature = "scylladb")]
42use {
43    linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
44    std::num::NonZeroU16,
45    tracing::debug,
46};
47
48#[derive(Clone, Debug, clap::Parser)]
49pub struct CommonStorageOptions {
50    /// The maximal number of simultaneous queries to the database
51    #[arg(long, global = true)]
52    pub storage_max_concurrent_queries: Option<usize>,
53
54    /// The maximal number of simultaneous stream queries to the database
55    #[arg(long, default_value = "10", global = true)]
56    pub storage_max_stream_queries: usize,
57
58    /// The maximal memory used in the storage cache.
59    #[arg(long, default_value = "10000000", global = true)]
60    pub storage_max_cache_size: usize,
61
62    /// The maximal size of a value entry in the storage cache.
63    #[arg(long, default_value = "1000000", global = true)]
64    pub storage_max_value_entry_size: usize,
65
66    /// The maximal size of a find-keys entry in the storage cache.
67    #[arg(long, default_value = "1000000", global = true)]
68    pub storage_max_find_keys_entry_size: usize,
69
70    /// The maximal size of a find-key-values entry in the storage cache.
71    #[arg(long, default_value = "1000000", global = true)]
72    pub storage_max_find_key_values_entry_size: usize,
73
74    /// The maximal number of entries in the storage cache.
75    #[arg(long, default_value = "1000", global = true)]
76    pub storage_max_cache_entries: usize,
77
78    /// The maximal memory used in the value cache.
79    #[arg(long, default_value = "10000000", global = true)]
80    pub storage_max_cache_value_size: usize,
81
82    /// The maximal memory used in the find_keys_by_prefix cache.
83    #[arg(long, default_value = "10000000", global = true)]
84    pub storage_max_cache_find_keys_size: usize,
85
86    /// The maximal memory used in the find_key_values_by_prefix cache.
87    #[arg(long, default_value = "10000000", global = true)]
88    pub storage_max_cache_find_key_values_size: usize,
89
90    /// The maximal number of entries in the blob cache.
91    #[arg(long, default_value = "1000", global = true)]
92    pub blob_cache_size: usize,
93
94    /// The maximal number of entries in the confirmed block cache.
95    #[arg(long, default_value = "1000", global = true)]
96    pub confirmed_block_cache_size: usize,
97
98    /// The maximal number of entries in the confirmed block certificate cache.
99    #[arg(long, default_value = "1000", global = true)]
100    pub certificate_cache_size: usize,
101
102    /// The maximal number of entries in the raw certificate cache.
103    #[arg(long, default_value = "1000", global = true)]
104    pub certificate_raw_cache_size: usize,
105
106    /// The maximal number of entries in the event cache.
107    #[arg(long, default_value = "1000", global = true)]
108    pub event_cache_size: usize,
109
110    /// The number of entries in the block cache.
111    #[arg(long, default_value = "5000", global = true)]
112    pub block_cache_size: usize,
113
114    /// The number of entries in the execution state cache.
115    #[arg(long, default_value = "10000", global = true)]
116    pub execution_state_cache_size: usize,
117
118    /// The replication factor for the keyspace
119    #[arg(long, default_value = "1", global = true)]
120    pub storage_replication_factor: u32,
121}
122
123impl CommonStorageOptions {
124    pub fn storage_cache_sizes(&self) -> StorageCacheSizes {
125        StorageCacheSizes {
126            blob_cache_size: self.blob_cache_size,
127            confirmed_block_cache_size: self.confirmed_block_cache_size,
128            certificate_cache_size: self.certificate_cache_size,
129            certificate_raw_cache_size: self.certificate_raw_cache_size,
130            event_cache_size: self.event_cache_size,
131            cache_cleanup_interval_secs: linera_storage::DEFAULT_CLEANUP_INTERVAL_SECS,
132        }
133    }
134
135    pub fn storage_cache_config(&self) -> ViewsStorageCacheConfig {
136        ViewsStorageCacheConfig {
137            max_cache_size: self.storage_max_cache_size,
138            max_value_entry_size: self.storage_max_value_entry_size,
139            max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
140            max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
141            max_cache_entries: self.storage_max_cache_entries,
142            max_cache_value_size: self.storage_max_cache_value_size,
143            max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
144            max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
145        }
146    }
147
148    /// Returns options matching the clap-defined defaults.
149    pub fn with_defaults() -> Self {
150        use clap::Parser as _;
151        Self::parse_from(std::iter::empty::<String>())
152    }
153}
154
155/// The configuration of the key value store in use.
156#[derive(Clone, Debug, Deserialize, Serialize)]
157pub enum StoreConfig {
158    /// The memory key value store
159    Memory {
160        config: MemoryStoreConfig,
161        namespace: String,
162        genesis_path: PathBuf,
163    },
164    /// The storage service key-value store
165    #[cfg(feature = "storage-service")]
166    StorageService {
167        config: StorageServiceStoreConfig,
168        namespace: String,
169    },
170    /// The RocksDB key value store
171    #[cfg(feature = "rocksdb")]
172    RocksDb {
173        config: RocksDbStoreConfig,
174        namespace: String,
175    },
176    /// The DynamoDB key value store
177    #[cfg(feature = "dynamodb")]
178    DynamoDb {
179        config: DynamoDbStoreConfig,
180        namespace: String,
181    },
182    /// The ScyllaDB key value store
183    #[cfg(feature = "scylladb")]
184    ScyllaDb {
185        config: ScyllaDbStoreConfig,
186        namespace: String,
187    },
188    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
189    DualRocksDbScyllaDb {
190        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
191        namespace: String,
192    },
193}
194
195/// The description of a storage implementation.
196#[derive(Clone, Debug)]
197#[cfg_attr(any(test), derive(Eq, PartialEq))]
198pub enum InnerStorageConfig {
199    /// The memory description.
200    Memory {
201        /// The path to the genesis configuration. This is needed because we reinitialize
202        /// memory databases from the genesis config everytime.
203        genesis_path: PathBuf,
204    },
205    /// The storage service description.
206    #[cfg(feature = "storage-service")]
207    Service {
208        /// The endpoint used.
209        endpoint: String,
210    },
211    /// The RocksDB description.
212    #[cfg(feature = "rocksdb")]
213    RocksDb {
214        /// The path used.
215        path: PathBuf,
216        /// Whether to use `block_in_place` or `spawn_blocking`.
217        spawn_mode: RocksDbSpawnMode,
218    },
219    /// The DynamoDB description.
220    #[cfg(feature = "dynamodb")]
221    DynamoDb {
222        /// Whether to use the DynamoDB Local system
223        use_dynamodb_local: bool,
224    },
225    /// The ScyllaDB description.
226    #[cfg(feature = "scylladb")]
227    ScyllaDb {
228        /// The URI for accessing the database.
229        uri: String,
230    },
231    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
232    DualRocksDbScyllaDb {
233        /// The path used.
234        path_with_guard: PathWithGuard,
235        /// Whether to use `block_in_place` or `spawn_blocking`.
236        spawn_mode: RocksDbSpawnMode,
237        /// The URI for accessing the database.
238        uri: String,
239    },
240}
241
242/// The description of a storage implementation.
243#[derive(Clone, Debug)]
244#[cfg_attr(any(test), derive(Eq, PartialEq))]
245pub struct StorageConfig {
246    /// The inner storage config.
247    pub inner_storage_config: InnerStorageConfig,
248    /// The namespace used
249    pub namespace: String,
250}
251
252const MEMORY: &str = "memory:";
253#[cfg(feature = "storage-service")]
254const STORAGE_SERVICE: &str = "service:";
255#[cfg(feature = "rocksdb")]
256const ROCKS_DB: &str = "rocksdb:";
257#[cfg(feature = "dynamodb")]
258const DYNAMO_DB: &str = "dynamodb:";
259#[cfg(feature = "scylladb")]
260const SCYLLA_DB: &str = "scylladb:";
261#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
262const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
263
264impl FromStr for StorageConfig {
265    type Err = anyhow::Error;
266
267    fn from_str(input: &str) -> Result<Self, Self::Err> {
268        if let Some(s) = input.strip_prefix(MEMORY) {
269            let parts = s.split(':').collect::<Vec<_>>();
270            if parts.len() == 1 {
271                let genesis_path = parts[0].to_string().into();
272                let namespace = DEFAULT_NAMESPACE.to_string();
273                let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
274                return Ok(StorageConfig {
275                    inner_storage_config,
276                    namespace,
277                });
278            }
279            if parts.len() != 2 {
280                bail!("We should have one genesis config path and one optional namespace");
281            }
282            let genesis_path = parts[0].to_string().into();
283            let namespace = parts[1].to_string();
284            let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
285            return Ok(StorageConfig {
286                inner_storage_config,
287                namespace,
288            });
289        }
290        #[cfg(feature = "storage-service")]
291        if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
292            if s.is_empty() {
293                bail!(
294                    "For Storage service, the formatting has to be service:endpoint:namespace,\
295example service:tcp:127.0.0.1:7878:table_do_my_test"
296                );
297            }
298            let parts = s.split(':').collect::<Vec<_>>();
299            if parts.len() != 4 {
300                bail!("We should have one endpoint and one namespace");
301            }
302            let protocol = parts[0];
303            if protocol != "tcp" {
304                bail!("Only allowed protocol is tcp");
305            }
306            let endpoint = parts[1];
307            let port = parts[2];
308            let mut endpoint = endpoint.to_string();
309            endpoint.push(':');
310            endpoint.push_str(port);
311            let endpoint = endpoint.to_string();
312            let namespace = parts[3].to_string();
313            let inner_storage_config = InnerStorageConfig::Service { endpoint };
314            return Ok(StorageConfig {
315                inner_storage_config,
316                namespace,
317            });
318        }
319        #[cfg(feature = "rocksdb")]
320        if let Some(s) = input.strip_prefix(ROCKS_DB) {
321            if s.is_empty() {
322                bail!(
323                    "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
324            }
325            let parts = s.split(':').collect::<Vec<_>>();
326            if parts.len() == 1 {
327                let path = parts[0].to_string().into();
328                let namespace = DEFAULT_NAMESPACE.to_string();
329                let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
330                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
331                return Ok(StorageConfig {
332                    inner_storage_config,
333                    namespace,
334                });
335            }
336            if parts.len() == 2 || parts.len() == 3 {
337                let path = parts[0].to_string().into();
338                let spawn_mode_str = parts.get(1).expect("length already checked");
339                let spawn_mode = match *spawn_mode_str {
340                    "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
341                    "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
342                    "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
343                    _ => Err(anyhow!(
344                        "Failed to parse {} as a spawn_mode",
345                        spawn_mode_str
346                    )),
347                }?;
348                let namespace = if parts.len() == 2 {
349                    DEFAULT_NAMESPACE.to_string()
350                } else {
351                    (*parts.get(2).expect("length already checked")).to_string()
352                };
353                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
354                return Ok(StorageConfig {
355                    inner_storage_config,
356                    namespace,
357                });
358            }
359            bail!("We should have one, two or three parts");
360        }
361        #[cfg(feature = "dynamodb")]
362        if let Some(s) = input.strip_prefix(DYNAMO_DB) {
363            let mut parts = s.splitn(2, ':');
364            let namespace = parts
365                .next()
366                .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
367                .to_string();
368            let use_dynamodb_local = match parts.next() {
369                None | Some("env") => false,
370                Some("dynamodb_local") => true,
371                Some(unknown) => {
372                    bail!(
373                        "Invalid DynamoDB endpoint {unknown:?}. \
374                        Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
375                    );
376                }
377            };
378            let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
379            return Ok(StorageConfig {
380                inner_storage_config,
381                namespace,
382            });
383        }
384        #[cfg(feature = "scylladb")]
385        if let Some(s) = input.strip_prefix(SCYLLA_DB) {
386            let mut uri: Option<String> = None;
387            let mut namespace: Option<String> = None;
388            let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
389            if !s.is_empty() {
390                let mut parts = s.split(':');
391                while let Some(part) = parts.next() {
392                    match part {
393                        "tcp" => {
394                            let address = parts.next().ok_or_else(|| {
395                                anyhow!("Failed to find address for {s}. {parse_error}")
396                            })?;
397                            let port_str = parts.next().ok_or_else(|| {
398                                anyhow!("Failed to find port for {s}. {parse_error}")
399                            })?;
400                            let port = NonZeroU16::from_str(port_str).map_err(|_| {
401                                anyhow!(
402                                    "Failed to find parse port {port_str} for {s}. {parse_error}",
403                                )
404                            })?;
405                            if uri.is_some() {
406                                bail!("The uri has already been assigned");
407                            }
408                            uri = Some(format!("{}:{}", &address, port));
409                        }
410                        _ if part.starts_with("table") => {
411                            if namespace.is_some() {
412                                bail!("The namespace has already been assigned");
413                            }
414                            namespace = Some(part.to_string());
415                        }
416                        _ => {
417                            bail!("the entry \"{part}\" is not matching");
418                        }
419                    }
420                }
421            }
422            let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
423            let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
424            let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
425            debug!("ScyllaDB connection info: {:?}", inner_storage_config);
426            return Ok(StorageConfig {
427                inner_storage_config,
428                namespace,
429            });
430        }
431        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
432        if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
433            let parts = s.split(':').collect::<Vec<_>>();
434            if parts.len() != 5 && parts.len() != 6 {
435                bail!(
436                    "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
437                );
438            }
439            let path = Path::new(parts[0]);
440            let path = path.to_path_buf();
441            let path_with_guard = PathWithGuard::new(path);
442            let spawn_mode_str = parts.get(1).expect("length already checked");
443            let spawn_mode = match *spawn_mode_str {
444                "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
445                "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
446                "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
447                _ => Err(anyhow!(
448                    "Failed to parse {} as a spawn_mode",
449                    spawn_mode_str
450                )),
451            }?;
452            let protocol = parts[2];
453            if protocol != "tcp" {
454                bail!("The only allowed protocol is tcp");
455            }
456            let address = parts[3];
457            let port_str = parts[4];
458            let port = NonZeroU16::from_str(port_str)
459                .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
460            let uri = format!("{}:{}", &address, port);
461            let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
462                path_with_guard,
463                spawn_mode,
464                uri,
465            };
466            let namespace = if parts.len() == 5 {
467                DEFAULT_NAMESPACE.to_string()
468            } else {
469                parts[5].to_string()
470            };
471            return Ok(StorageConfig {
472                inner_storage_config,
473                namespace,
474            });
475        }
476        error!("available storage: memory");
477        #[cfg(feature = "storage-service")]
478        error!("Also available is linera-storage-service");
479        #[cfg(feature = "rocksdb")]
480        error!("Also available is RocksDB");
481        #[cfg(feature = "dynamodb")]
482        error!("Also available is DynamoDB");
483        #[cfg(feature = "scylladb")]
484        error!("Also available is ScyllaDB");
485        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
486        error!("Also available is DualRocksDbScyllaDb");
487        Err(anyhow!("The input has not matched: {input}"))
488    }
489}
490
491impl StorageConfig {
492    #[allow(unused_variables)]
493    pub fn maybe_append_shard_path(&mut self, shard: usize) -> std::io::Result<()> {
494        match &mut self.inner_storage_config {
495            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
496            InnerStorageConfig::DualRocksDbScyllaDb {
497                path_with_guard,
498                spawn_mode: _,
499                uri: _,
500            } => {
501                let shard_str = format!("shard_{shard}");
502                path_with_guard.path_buf.push(shard_str);
503                std::fs::create_dir_all(&path_with_guard.path_buf)
504            }
505            _ => Ok(()),
506        }
507    }
508
509    /// The addition of the common config to get a full configuration
510    pub fn add_common_storage_options(
511        &self,
512        options: &CommonStorageOptions,
513    ) -> Result<StoreConfig, anyhow::Error> {
514        let namespace = self.namespace.clone();
515        match &self.inner_storage_config {
516            InnerStorageConfig::Memory { genesis_path } => {
517                let config = MemoryStoreConfig {
518                    max_stream_queries: options.storage_max_stream_queries,
519                    kill_on_drop: false,
520                };
521                let genesis_path = genesis_path.clone();
522                Ok(StoreConfig::Memory {
523                    config,
524                    namespace,
525                    genesis_path,
526                })
527            }
528            #[cfg(feature = "storage-service")]
529            InnerStorageConfig::Service { endpoint } => {
530                let inner_config = StorageServiceStoreInternalConfig {
531                    endpoint: endpoint.clone(),
532                    max_concurrent_queries: options.storage_max_concurrent_queries,
533                    max_stream_queries: options.storage_max_stream_queries,
534                };
535                let config = StorageServiceStoreConfig {
536                    inner_config,
537                    storage_cache_config: options.storage_cache_config(),
538                };
539                Ok(StoreConfig::StorageService { config, namespace })
540            }
541            #[cfg(feature = "rocksdb")]
542            InnerStorageConfig::RocksDb { path, spawn_mode } => {
543                let path_with_guard = PathWithGuard::new(path.to_path_buf());
544                let inner_config = RocksDbStoreInternalConfig {
545                    spawn_mode: *spawn_mode,
546                    path_with_guard,
547                    max_stream_queries: options.storage_max_stream_queries,
548                };
549                let config = RocksDbStoreConfig {
550                    inner_config,
551                    storage_cache_config: options.storage_cache_config(),
552                };
553                Ok(StoreConfig::RocksDb { config, namespace })
554            }
555            #[cfg(feature = "dynamodb")]
556            InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
557                let inner_config = DynamoDbStoreInternalConfig {
558                    use_dynamodb_local: *use_dynamodb_local,
559                    max_concurrent_queries: options.storage_max_concurrent_queries,
560                    max_stream_queries: options.storage_max_stream_queries,
561                };
562                let config = DynamoDbStoreConfig {
563                    inner_config,
564                    storage_cache_config: options.storage_cache_config(),
565                };
566                Ok(StoreConfig::DynamoDb { config, namespace })
567            }
568            #[cfg(feature = "scylladb")]
569            InnerStorageConfig::ScyllaDb { uri } => {
570                let inner_config = ScyllaDbStoreInternalConfig {
571                    uri: uri.clone(),
572                    max_stream_queries: options.storage_max_stream_queries,
573                    max_concurrent_queries: options.storage_max_concurrent_queries,
574                    replication_factor: options.storage_replication_factor,
575                };
576                let config = ScyllaDbStoreConfig {
577                    inner_config,
578                    storage_cache_config: options.storage_cache_config(),
579                };
580                Ok(StoreConfig::ScyllaDb { config, namespace })
581            }
582            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
583            InnerStorageConfig::DualRocksDbScyllaDb {
584                path_with_guard,
585                spawn_mode,
586                uri,
587            } => {
588                let inner_config = RocksDbStoreInternalConfig {
589                    spawn_mode: *spawn_mode,
590                    path_with_guard: path_with_guard.clone(),
591                    max_stream_queries: options.storage_max_stream_queries,
592                };
593                let first_config = RocksDbStoreConfig {
594                    inner_config,
595                    storage_cache_config: options.storage_cache_config(),
596                };
597
598                let inner_config = ScyllaDbStoreInternalConfig {
599                    uri: uri.clone(),
600                    max_stream_queries: options.storage_max_stream_queries,
601                    max_concurrent_queries: options.storage_max_concurrent_queries,
602                    replication_factor: options.storage_replication_factor,
603                };
604                let second_config = ScyllaDbStoreConfig {
605                    inner_config,
606                    storage_cache_config: options.storage_cache_config(),
607                };
608
609                let config = DualStoreConfig {
610                    first_config,
611                    second_config,
612                };
613                Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
614            }
615        }
616    }
617}
618
619impl fmt::Display for StorageConfig {
620    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621        let namespace = &self.namespace;
622        match &self.inner_storage_config {
623            #[cfg(feature = "storage-service")]
624            InnerStorageConfig::Service { endpoint } => {
625                write!(f, "service:tcp:{endpoint}:{namespace}")
626            }
627            InnerStorageConfig::Memory { genesis_path } => {
628                write!(f, "memory:{}:{namespace}", genesis_path.display())
629            }
630            #[cfg(feature = "rocksdb")]
631            InnerStorageConfig::RocksDb { path, spawn_mode } => {
632                let spawn_mode = spawn_mode.to_string();
633                write!(f, "rocksdb:{}:{spawn_mode}:{namespace}", path.display())
634            }
635            #[cfg(feature = "dynamodb")]
636            InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
637                true => write!(f, "dynamodb:{namespace}:dynamodb_local"),
638                false => write!(f, "dynamodb:{namespace}:env"),
639            },
640            #[cfg(feature = "scylladb")]
641            InnerStorageConfig::ScyllaDb { uri } => {
642                write!(f, "scylladb:tcp:{uri}:{namespace}")
643            }
644            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
645            InnerStorageConfig::DualRocksDbScyllaDb {
646                path_with_guard,
647                spawn_mode,
648                uri,
649            } => {
650                write!(
651                    f,
652                    "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
653                    path_with_guard.path_buf.display(),
654                    spawn_mode,
655                    uri,
656                    namespace
657                )
658            }
659        }
660    }
661}
662
663#[async_trait]
664pub trait Runnable {
665    type Output;
666
667    async fn run<S>(self, storage: S) -> Self::Output
668    where
669        S: Storage + Clone + Send + Sync + 'static;
670}
671
672#[async_trait]
673pub trait RunnableWithStore {
674    type Output;
675
676    async fn run<D>(
677        self,
678        config: D::Config,
679        namespace: String,
680        cache_sizes: StorageCacheSizes,
681    ) -> Result<Self::Output, anyhow::Error>
682    where
683        D: KeyValueDatabase + Clone + Send + Sync + 'static,
684        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
685        D::Error: Send + Sync;
686}
687
688/// Reads a JSON value from a file at the given path.
689fn read_json<T: serde::de::DeserializeOwned>(path: impl Into<PathBuf>) -> anyhow::Result<T> {
690    Ok(serde_json::from_reader(fs_err::File::open(path.into())?)?)
691}
692
693impl StoreConfig {
694    pub async fn run_with_storage<Job>(
695        self,
696        wasm_runtime: Option<WasmRuntime>,
697        allow_application_logs: bool,
698        cache_sizes: StorageCacheSizes,
699        job: Job,
700    ) -> Result<Job::Output, anyhow::Error>
701    where
702        Job: Runnable,
703    {
704        match self {
705            StoreConfig::Memory {
706                config,
707                namespace,
708                genesis_path,
709            } => {
710                let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
711                    &config,
712                    &namespace,
713                    wasm_runtime,
714                    cache_sizes,
715                )
716                .await?
717                .with_allow_application_logs(allow_application_logs);
718                let genesis_config = read_json::<GenesisConfig>(genesis_path)?;
719                // Memory storage must be initialized every time.
720                genesis_config.initialize_storage(&mut storage).await?;
721                Ok(job.run(storage).await)
722            }
723            #[cfg(feature = "storage-service")]
724            StoreConfig::StorageService { config, namespace } => {
725                let storage = DbStorage::<StorageServiceDatabase, _>::connect(
726                    &config,
727                    &namespace,
728                    wasm_runtime,
729                    cache_sizes,
730                )
731                .await?
732                .with_allow_application_logs(allow_application_logs);
733                Ok(job.run(storage).await)
734            }
735            #[cfg(feature = "rocksdb")]
736            StoreConfig::RocksDb { config, namespace } => {
737                let storage = DbStorage::<RocksDbDatabase, _>::connect(
738                    &config,
739                    &namespace,
740                    wasm_runtime,
741                    cache_sizes,
742                )
743                .await?
744                .with_allow_application_logs(allow_application_logs);
745                Ok(job.run(storage).await)
746            }
747            #[cfg(feature = "dynamodb")]
748            StoreConfig::DynamoDb { config, namespace } => {
749                let storage = DbStorage::<DynamoDbDatabase, _>::connect(
750                    &config,
751                    &namespace,
752                    wasm_runtime,
753                    cache_sizes,
754                )
755                .await?
756                .with_allow_application_logs(allow_application_logs);
757                Ok(job.run(storage).await)
758            }
759            #[cfg(feature = "scylladb")]
760            StoreConfig::ScyllaDb { config, namespace } => {
761                let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
762                    &config,
763                    &namespace,
764                    wasm_runtime,
765                    cache_sizes,
766                )
767                .await?
768                .with_allow_application_logs(allow_application_logs);
769                Ok(job.run(storage).await)
770            }
771            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
772            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
773                let storage =
774                    DbStorage::<
775                        DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
776                        _,
777                    >::connect(&config, &namespace, wasm_runtime, cache_sizes)
778                    .await?
779                    .with_allow_application_logs(allow_application_logs);
780                Ok(job.run(storage).await)
781            }
782        }
783    }
784
785    #[allow(unused_variables)]
786    pub async fn run_with_store<Job>(
787        self,
788        cache_sizes: StorageCacheSizes,
789        job: Job,
790    ) -> Result<Job::Output, anyhow::Error>
791    where
792        Job: RunnableWithStore,
793    {
794        match self {
795            StoreConfig::Memory { .. } => {
796                Err(anyhow!("Cannot run admin operations on the memory store"))
797            }
798            #[cfg(feature = "storage-service")]
799            StoreConfig::StorageService { config, namespace } => Ok(job
800                .run::<StorageServiceDatabase>(config, namespace, cache_sizes)
801                .await?),
802            #[cfg(feature = "rocksdb")]
803            StoreConfig::RocksDb { config, namespace } => Ok(job
804                .run::<RocksDbDatabase>(config, namespace, cache_sizes)
805                .await?),
806            #[cfg(feature = "dynamodb")]
807            StoreConfig::DynamoDb { config, namespace } => Ok(job
808                .run::<DynamoDbDatabase>(config, namespace, cache_sizes)
809                .await?),
810            #[cfg(feature = "scylladb")]
811            StoreConfig::ScyllaDb { config, namespace } => Ok(job
812                .run::<ScyllaDbDatabase>(config, namespace, cache_sizes)
813                .await?),
814            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
815            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
816                .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
817                    config,
818                    namespace,
819                    cache_sizes,
820                )
821                .await?),
822        }
823    }
824}
825
826pub struct StorageMigration;
827
828#[async_trait]
829impl RunnableWithStore for StorageMigration {
830    type Output = ();
831
832    async fn run<D>(
833        self,
834        config: D::Config,
835        namespace: String,
836        cache_sizes: StorageCacheSizes,
837    ) -> Result<Self::Output, anyhow::Error>
838    where
839        D: KeyValueDatabase + Clone + Send + Sync + 'static,
840        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
841        D::Error: Send + Sync,
842    {
843        if D::exists(&config, &namespace).await? {
844            let wasm_runtime = None;
845            let storage =
846                DbStorage::<D, WallClock>::connect(&config, &namespace, wasm_runtime, cache_sizes)
847                    .await?;
848            storage.migrate_if_needed().await?;
849        }
850        Ok(())
851    }
852}
853
854pub struct AssertStorageV1;
855
856#[async_trait]
857impl RunnableWithStore for AssertStorageV1 {
858    type Output = ();
859
860    async fn run<D>(
861        self,
862        config: D::Config,
863        namespace: String,
864        cache_sizes: StorageCacheSizes,
865    ) -> Result<Self::Output, anyhow::Error>
866    where
867        D: KeyValueDatabase + Clone + Send + Sync + 'static,
868        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
869        D::Error: Send + Sync,
870    {
871        if D::exists(&config, &namespace).await? {
872            let wasm_runtime = None;
873            let storage =
874                DbStorage::<D, WallClock>::connect(&config, &namespace, wasm_runtime, cache_sizes)
875                    .await?;
876            storage.assert_is_migrated_storage().await?;
877        }
878        Ok(())
879    }
880}
881
882#[test]
883fn test_memory_storage_config_from_str() {
884    assert_eq!(
885        StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
886        StorageConfig {
887            inner_storage_config: InnerStorageConfig::Memory {
888                genesis_path: PathBuf::from("path/to/genesis.json")
889            },
890            namespace: DEFAULT_NAMESPACE.into()
891        }
892    );
893    assert_eq!(
894        StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
895        StorageConfig {
896            inner_storage_config: InnerStorageConfig::Memory {
897                genesis_path: PathBuf::from("path/to/genesis.json")
898            },
899            namespace: "namespace".into()
900        }
901    );
902    assert!(StorageConfig::from_str("memory").is_err(),);
903}
904
905#[cfg(feature = "storage-service")]
906#[test]
907fn test_shared_store_config_from_str() {
908    assert_eq!(
909        StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
910        StorageConfig {
911            inner_storage_config: InnerStorageConfig::Service {
912                endpoint: "127.0.0.1:8942".to_string()
913            },
914            namespace: "linera".into()
915        }
916    );
917    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
918    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
919}
920
921#[cfg(feature = "rocksdb")]
922#[test]
923fn test_rocks_db_storage_config_from_str() {
924    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
925    assert_eq!(
926        StorageConfig::from_str("rocksdb:foo.db").unwrap(),
927        StorageConfig {
928            inner_storage_config: InnerStorageConfig::RocksDb {
929                path: "foo.db".into(),
930                spawn_mode: RocksDbSpawnMode::SpawnBlocking,
931            },
932            namespace: DEFAULT_NAMESPACE.to_string()
933        }
934    );
935    assert_eq!(
936        StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
937        StorageConfig {
938            inner_storage_config: InnerStorageConfig::RocksDb {
939                path: "foo.db".into(),
940                spawn_mode: RocksDbSpawnMode::BlockInPlace,
941            },
942            namespace: DEFAULT_NAMESPACE.to_string()
943        }
944    );
945    assert_eq!(
946        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
947        StorageConfig {
948            inner_storage_config: InnerStorageConfig::RocksDb {
949                path: "foo.db".into(),
950                spawn_mode: RocksDbSpawnMode::BlockInPlace,
951            },
952            namespace: "chosen_namespace".into()
953        }
954    );
955}
956
957#[cfg(feature = "dynamodb")]
958#[test]
959fn test_aws_storage_config_from_str() {
960    assert_eq!(
961        StorageConfig::from_str("dynamodb:table").unwrap(),
962        StorageConfig {
963            inner_storage_config: InnerStorageConfig::DynamoDb {
964                use_dynamodb_local: false
965            },
966            namespace: "table".to_string()
967        }
968    );
969    assert_eq!(
970        StorageConfig::from_str("dynamodb:table:env").unwrap(),
971        StorageConfig {
972            inner_storage_config: InnerStorageConfig::DynamoDb {
973                use_dynamodb_local: false
974            },
975            namespace: "table".to_string()
976        }
977    );
978    assert_eq!(
979        StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
980        StorageConfig {
981            inner_storage_config: InnerStorageConfig::DynamoDb {
982                use_dynamodb_local: true
983            },
984            namespace: "table".to_string()
985        }
986    );
987    assert!(StorageConfig::from_str("dynamodb").is_err());
988    assert!(StorageConfig::from_str("dynamodb:").is_err());
989    assert!(StorageConfig::from_str("dynamodb:1").is_err());
990    assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
991}
992
993#[cfg(feature = "scylladb")]
994#[test]
995fn test_scylla_db_storage_config_from_str() {
996    assert_eq!(
997        StorageConfig::from_str("scylladb:").unwrap(),
998        StorageConfig {
999            inner_storage_config: InnerStorageConfig::ScyllaDb {
1000                uri: "localhost:9042".to_string()
1001            },
1002            namespace: DEFAULT_NAMESPACE.to_string()
1003        }
1004    );
1005    assert_eq!(
1006        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
1007        StorageConfig {
1008            inner_storage_config: InnerStorageConfig::ScyllaDb {
1009                uri: "db_hostname:230".to_string()
1010            },
1011            namespace: "table_other_storage".to_string()
1012        }
1013    );
1014    assert_eq!(
1015        StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
1016        StorageConfig {
1017            inner_storage_config: InnerStorageConfig::ScyllaDb {
1018                uri: "db_hostname:230".to_string()
1019            },
1020            namespace: DEFAULT_NAMESPACE.to_string()
1021        }
1022    );
1023    assert!(StorageConfig::from_str("scylladb:-10").is_err());
1024    assert!(StorageConfig::from_str("scylladb:70000").is_err());
1025    assert!(StorageConfig::from_str("scylladb:230:234").is_err());
1026    assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
1027    assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
1028    assert!(StorageConfig::from_str("scylladb:wrong").is_err());
1029}