1mod db_storage;
7mod migration;
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use itertools::Itertools;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{
16 ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode,
17 NetworkDescription, TimeDelta, Timestamp,
18 },
19 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
20 vm::VmRuntime,
21};
22use linera_chain::{
23 types::{ConfirmedBlock, ConfirmedBlockCertificate},
24 ChainError, ChainStateView,
25};
26#[cfg(with_revm)]
27use linera_execution::{
28 evm::revm::{EvmContractModule, EvmServiceModule},
29 EvmRuntime,
30};
31use linera_execution::{
32 BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker,
33 UserContractCode, UserServiceCode, WasmRuntime,
34};
35#[cfg(with_wasm_runtime)]
36use linera_execution::{WasmContractModule, WasmServiceModule};
37use linera_views::{context::Context, views::RootView, ViewError};
38
39#[cfg(with_metrics)]
40pub use crate::db_storage::metrics;
41#[cfg(with_testing)]
42pub use crate::db_storage::TestClock;
43pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
44
/// Default storage namespace name.
///
/// NOTE(review): presumably used when the caller does not choose a namespace explicitly;
/// confirm at the use sites.
pub const DEFAULT_NAMESPACE: &str = "table_linera";
47
48#[cfg_attr(not(web), async_trait)]
50#[cfg_attr(web, async_trait(?Send))]
51pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
52 type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
54
55 type Clock: Clock + Clone + Send + Sync;
57
58 type BlockExporterContext: Context<Extra = u32> + Clone;
60
61 fn clock(&self) -> &Self::Clock;
63
64 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
65
66 async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
74
75 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
77
78 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
80
81 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
83
84 async fn read_confirmed_block(
86 &self,
87 hash: CryptoHash,
88 ) -> Result<Option<ConfirmedBlock>, ViewError>;
89
90 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
92 &self,
93 hashes: I,
94 ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError>;
95
96 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
98
99 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
101
102 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
104
105 async fn read_blob_states(
107 &self,
108 blob_ids: &[BlobId],
109 ) -> Result<Vec<Option<BlobState>>, ViewError>;
110
111 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
113
114 async fn write_blobs_and_certificate(
116 &self,
117 blobs: &[Blob],
118 certificate: &ConfirmedBlockCertificate,
119 ) -> Result<(), ViewError>;
120
121 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
124
125 async fn maybe_write_blob_states(
127 &self,
128 blob_ids: &[BlobId],
129 blob_state: BlobState,
130 ) -> Result<(), ViewError>;
131
132 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
134
135 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
137
138 async fn read_certificate(
140 &self,
141 hash: CryptoHash,
142 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
143
144 async fn read_certificates(
146 &self,
147 hashes: &[CryptoHash],
148 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
149
150 async fn read_certificates_raw(
156 &self,
157 hashes: &[CryptoHash],
158 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
159
160 async fn read_certificates_by_heights(
164 &self,
165 chain_id: ChainId,
166 heights: &[BlockHeight],
167 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
168
169 async fn read_certificates_by_heights_raw(
174 &self,
175 chain_id: ChainId,
176 heights: &[BlockHeight],
177 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
178
179 async fn read_certificate_hashes_by_heights(
183 &self,
184 chain_id: ChainId,
185 heights: &[BlockHeight],
186 ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
187
188 async fn write_certificate_height_indices(
192 &self,
193 chain_id: ChainId,
194 indices: &[(BlockHeight, CryptoHash)],
195 ) -> Result<(), ViewError>;
196
197 async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
199
200 async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
202
203 async fn read_events_from_index(
205 &self,
206 chain_id: &ChainId,
207 stream_id: &StreamId,
208 start_index: u32,
209 ) -> Result<Vec<IndexAndEvent>, ViewError>;
210
211 async fn write_events(
213 &self,
214 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
215 ) -> Result<(), ViewError>;
216
217 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
219
220 async fn write_network_description(
222 &self,
223 information: &NetworkDescription,
224 ) -> Result<(), ViewError>;
225
226 async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
234 where
235 ChainRuntimeContext<Self>: ExecutionRuntimeContext,
236 {
237 let id = description.id();
238 self.write_blob(&Blob::new_chain_description(&description))
240 .await?;
241 let mut chain = self.load_chain(id).await?;
242 assert!(!chain.is_active(), "Attempting to create a chain twice");
243 let current_time = self.clock().current_time();
244 chain.initialize_if_needed(current_time).await?;
245 chain.save().await?;
246 Ok(())
247 }
248
249 fn wasm_runtime(&self) -> Option<WasmRuntime>;
251
252 async fn load_contract(
255 &self,
256 application_description: &ApplicationDescription,
257 txn_tracker: &TransactionTracker,
258 ) -> Result<UserContractCode, ExecutionError> {
259 let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
260 let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
261 Some(content) => content.clone(),
262 None => self
263 .read_blob(contract_bytecode_blob_id)
264 .await?
265 .ok_or(ExecutionError::BlobsNotFound(vec![
266 contract_bytecode_blob_id,
267 ]))?
268 .into_content(),
269 };
270 let compressed_contract_bytecode = CompressedBytecode {
271 compressed_bytes: content.into_arc_bytes(),
272 };
273 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
274 let contract_bytecode = self
275 .thread_pool()
276 .run_send((), move |()| async move {
277 compressed_contract_bytecode.decompress()
278 })
279 .await
280 .await??;
281 match application_description.module_id.vm_runtime {
282 VmRuntime::Wasm => {
283 cfg_if::cfg_if! {
284 if #[cfg(with_wasm_runtime)] {
285 let Some(wasm_runtime) = self.wasm_runtime() else {
286 panic!("A Wasm runtime is required to load user applications.");
287 };
288 Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
289 .await?
290 .into())
291 } else {
292 panic!(
293 "A Wasm runtime is required to load user applications. \
294 Please enable the `wasmer` or the `wasmtime` feature flags \
295 when compiling `linera-storage`."
296 );
297 }
298 }
299 }
300 VmRuntime::Evm => {
301 cfg_if::cfg_if! {
302 if #[cfg(with_revm)] {
303 let evm_runtime = EvmRuntime::Revm;
304 Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
305 .into())
306 } else {
307 panic!(
308 "An Evm runtime is required to load user applications. \
309 Please enable the `revm` feature flag \
310 when compiling `linera-storage`."
311 );
312 }
313 }
314 }
315 }
316 }
317
318 async fn load_service(
321 &self,
322 application_description: &ApplicationDescription,
323 txn_tracker: &TransactionTracker,
324 ) -> Result<UserServiceCode, ExecutionError> {
325 let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
326 let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
327 Some(content) => content.clone(),
328 None => self
329 .read_blob(service_bytecode_blob_id)
330 .await?
331 .ok_or(ExecutionError::BlobsNotFound(vec![
332 service_bytecode_blob_id,
333 ]))?
334 .into_content(),
335 };
336 let compressed_service_bytecode = CompressedBytecode {
337 compressed_bytes: content.into_arc_bytes(),
338 };
339 #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
340 let service_bytecode = self
341 .thread_pool()
342 .run_send((), move |()| async move {
343 compressed_service_bytecode.decompress()
344 })
345 .await
346 .await??;
347 match application_description.module_id.vm_runtime {
348 VmRuntime::Wasm => {
349 cfg_if::cfg_if! {
350 if #[cfg(with_wasm_runtime)] {
351 let Some(wasm_runtime) = self.wasm_runtime() else {
352 panic!("A Wasm runtime is required to load user applications.");
353 };
354 Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
355 .await?
356 .into())
357 } else {
358 panic!(
359 "A Wasm runtime is required to load user applications. \
360 Please enable the `wasmer` or the `wasmtime` feature flags \
361 when compiling `linera-storage`."
362 );
363 }
364 }
365 }
366 VmRuntime::Evm => {
367 cfg_if::cfg_if! {
368 if #[cfg(with_revm)] {
369 let evm_runtime = EvmRuntime::Revm;
370 Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
371 .into())
372 } else {
373 panic!(
374 "An Evm runtime is required to load user applications. \
375 Please enable the `revm` feature flag \
376 when compiling `linera-storage`."
377 );
378 }
379 }
380 }
381 }
382 }
383
384 async fn block_exporter_context(
385 &self,
386 block_exporter_id: u32,
387 ) -> Result<Self::BlockExporterContext, ViewError>;
388}
389
/// Outcome of matching a batch of optional certificates against the hashes they were
/// requested for (see `ResultReadCertificates::new`).
pub enum ResultReadCertificates {
    /// No entry was missing; holds the certificates that were found.
    Certificates(Vec<ConfirmedBlockCertificate>),
    /// At least one entry was missing; holds the hashes paired with a `None` entry.
    InvalidHashes(Vec<CryptoHash>),
}
395
396impl ResultReadCertificates {
397 pub fn new(
399 certificates: Vec<Option<ConfirmedBlockCertificate>>,
400 hashes: Vec<CryptoHash>,
401 ) -> Self {
402 let (certificates, invalid_hashes) = certificates
403 .into_iter()
404 .zip(hashes)
405 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
406 Some(cert) => itertools::Either::Left(cert),
407 None => itertools::Either::Right(hash),
408 });
409 if invalid_hashes.is_empty() {
410 Self::Certificates(certificates)
411 } else {
412 Self::InvalidHashes(invalid_hashes)
413 }
414 }
415}
416
/// Outcome of matching a batch of optional confirmed blocks against the hashes they were
/// requested for (see `ResultReadConfirmedBlocks::new`).
pub enum ResultReadConfirmedBlocks {
    /// No entry was missing; holds the blocks that were found.
    Blocks(Vec<ConfirmedBlock>),
    /// At least one entry was missing; holds the hashes paired with a `None` entry.
    InvalidHashes(Vec<CryptoHash>),
}
422
423impl ResultReadConfirmedBlocks {
424 pub fn new(blocks: Vec<Option<ConfirmedBlock>>, hashes: Vec<CryptoHash>) -> Self {
426 let (blocks, invalid_hashes) = blocks
427 .into_iter()
428 .zip(hashes)
429 .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(block, hash)| match block {
430 Some(block) => itertools::Either::Left(block),
431 None => itertools::Either::Right(hash),
432 });
433 if invalid_hashes.is_empty() {
434 Self::Blocks(blocks)
435 } else {
436 Self::InvalidHashes(invalid_hashes)
437 }
438 }
439}
440
/// Per-chain runtime context that implements [`ExecutionRuntimeContext`] on top of a
/// [`Storage`].
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// Storage backend that blob, event and bytecode lookups are delegated to.
    storage: S,
    /// ID of the chain this context belongs to.
    chain_id: ChainId,
    /// Thread pool exposed through `thread_pool()`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// Configuration exposed through `execution_runtime_config()`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Cache of already loaded contract code, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Cache of already loaded service code, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
451
452#[cfg_attr(not(web), async_trait)]
453#[cfg_attr(web, async_trait(?Send))]
454impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
455 fn chain_id(&self) -> ChainId {
456 self.chain_id
457 }
458
459 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
460 &self.thread_pool
461 }
462
463 fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
464 self.execution_runtime_config
465 }
466
467 fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
468 &self.user_contracts
469 }
470
471 fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
472 &self.user_services
473 }
474
475 async fn get_user_contract(
476 &self,
477 description: &ApplicationDescription,
478 txn_tracker: &TransactionTracker,
479 ) -> Result<UserContractCode, ExecutionError> {
480 let application_id = description.into();
481 let pinned = self.user_contracts.pin_owned();
482 if let Some(contract) = pinned.get(&application_id) {
483 return Ok(contract.clone());
484 }
485 let contract = self.storage.load_contract(description, txn_tracker).await?;
486 pinned.insert(application_id, contract.clone());
487 Ok(contract)
488 }
489
490 async fn get_user_service(
491 &self,
492 description: &ApplicationDescription,
493 txn_tracker: &TransactionTracker,
494 ) -> Result<UserServiceCode, ExecutionError> {
495 let application_id = description.into();
496 let pinned = self.user_services.pin_owned();
497 if let Some(service) = pinned.get(&application_id) {
498 return Ok(service.clone());
499 }
500 let service = self.storage.load_service(description, txn_tracker).await?;
501 pinned.insert(application_id, service.clone());
502 Ok(service)
503 }
504
505 async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
506 self.storage.read_blob(blob_id).await
507 }
508
509 async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
510 self.storage.read_event(event_id).await
511 }
512
513 async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
514 self.storage.read_network_description().await
515 }
516
517 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
518 self.storage.contains_blob(blob_id).await
519 }
520
521 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
522 self.storage.contains_event(event_id).await
523 }
524
525 #[cfg(with_testing)]
526 async fn add_blobs(
527 &self,
528 blobs: impl IntoIterator<Item = Blob> + Send,
529 ) -> Result<(), ViewError> {
530 let blobs = Vec::from_iter(blobs);
531 self.storage.write_blobs(&blobs).await
532 }
533
534 #[cfg(with_testing)]
535 async fn add_events(
536 &self,
537 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
538 ) -> Result<(), ViewError> {
539 self.storage.write_events(events).await
540 }
541}
542
/// A source of time that can also be waited on asynchronously.
///
/// NOTE(review): whether time is wall-clock or simulated is up to the implementation.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Waits asynchronously for the duration `delta`.
    async fn sleep(&self, delta: TimeDelta);

    /// Waits asynchronously until this clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp);
}