smoldot/database/
full_sqlite.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Filesystem-backed database containing all the information about a chain.
19//!
20//! This module handles the persistent storage of the chain on disk.
21//!
22//! # Usage
23//!
24//! Use the [`open()`] function to create a new database or open an existing one. [`open()`]
25//! returns a [`DatabaseOpen`] enum. This enum will contain either a [`SqliteFullDatabase`] object,
26//! representing an access to the database, or a [`DatabaseEmpty`] if the database didn't exist or
27//! is empty. If that is the case, use [`DatabaseEmpty::initialize`] in order to populate it and
28//! obtain a [`SqliteFullDatabase`].
29//!
30//! Use [`SqliteFullDatabase::insert`] to insert a new block in the database. The block is assumed
31//! to have been successfully verified prior to insertion. An error is returned if this block is
32//! already in the database or isn't a descendant or ancestor of the latest finalized block.
33//!
34//! Use [`SqliteFullDatabase::set_finalized`] to mark a block already in the database as finalized.
35//! Any block that isn't an ancestor or descendant will be removed. Reverting finalization is
36//! not supported.
37//!
38//! In order to minimize disk usage, it is not possible to efficiently retrieve the storage items
39//! of blocks that are ancestors of the finalized block. When a block is finalized, the storage of
40//! its ancestors is lost, and the only way to reconstruct it is to execute all blocks starting
41//! from the genesis to the desired one.
42//!
43//! # About errors handling
44//!
45//! Most of the functions and methods in this module return a `Result` containing notably an
46//! [`CorruptedError`]. This kind of errors can happen if the operating system returns an error
47//! when accessing the file system, or if the database has been corrupted, for example by the user
48//! manually modifying it.
49//!
50//! There isn't much that can be done to properly handle an [`CorruptedError`]. The only
51//! reasonable solutions are either to stop the program, or to delete the entire database and
52//! recreate it.
53//!
54//! # Schema
55//!
56//! The SQL schema of the database, with explanatory comments, can be found in `open.rs`.
57//!
58//! # About blocking behavior
59//!
60//! This implementation uses the SQLite library, which isn't Rust-asynchronous-compatible. Many
61//! functions will, with the help of the operating system, put the current thread to sleep while
62//! waiting for an I/O operation to finish. In the context of asynchronous Rust, this is
63//! undesirable.
64//!
65//! For this reason, you are encouraged to isolate the database in its own threads and never
66//! access it directly from an asynchronous context.
67//!
68
69// TODO: better docs
70
71#![cfg(feature = "database-sqlite")]
72
73use crate::{
74    chain::chain_information,
75    executor::{self, host},
76    header, trie,
77};
78
79use alloc::borrow::Cow;
80use core::{fmt, iter};
81use parking_lot::Mutex;
82use rusqlite::OptionalExtension as _;
83
84pub use open::{Config, ConfigTy, DatabaseEmpty, DatabaseOpen, open};
85
86mod open;
87mod tests;
88
89/// Returns an opaque string representing the version number of the SQLite library this binary
90/// is using.
91pub fn sqlite_version() -> &'static str {
92    rusqlite::version()
93}
94
95/// An open database. Holds file descriptors.
96pub struct SqliteFullDatabase {
97    /// The SQLite connection.
98    ///
99    /// The database is constantly within a transaction.
100    /// When the database is opened, `BEGIN TRANSACTION` is immediately run. We periodically
101    /// call `COMMIT; BEGIN_TRANSACTION` when deemed necessary. `COMMIT` is basically the
102    /// equivalent of `fsync`, and must be called carefully in order to not lose too much speed.
103    database: Mutex<rusqlite::Connection>,
104
105    /// Number of bytes used to encode the block number.
106    block_number_bytes: usize,
107}
108
109impl SqliteFullDatabase {
110    /// Returns the hash of the block in the database whose storage is currently accessible.
111    pub fn best_block_hash(&self) -> Result<[u8; 32], CorruptedError> {
112        let connection = self.database.lock();
113
114        let val = meta_get_blob(&connection, "best")?.ok_or(CorruptedError::MissingMetaKey)?;
115        if val.len() == 32 {
116            let mut out = [0; 32];
117            out.copy_from_slice(&val);
118            Ok(out)
119        } else {
120            Err(CorruptedError::InvalidBlockHashLen)
121        }
122    }
123
124    /// Returns the hash of the finalized block in the database.
125    pub fn finalized_block_hash(&self) -> Result<[u8; 32], CorruptedError> {
126        let database = self.database.lock();
127        finalized_hash(&database)
128    }
129
130    /// Returns the SCALE-encoded header of the given block, or `None` if the block is unknown.
131    ///
132    /// > **Note**: If this method is called twice times in a row with the same block hash, it
133    /// >           is possible for the first time to return `Some` and the second time to return
134    /// >           `None`, in case the block has since been removed from the database.
135    pub fn block_scale_encoded_header(
136        &self,
137        block_hash: &[u8; 32],
138    ) -> Result<Option<Vec<u8>>, CorruptedError> {
139        let connection = self.database.lock();
140
141        let out = connection
142            .prepare_cached(r#"SELECT header FROM blocks WHERE hash = ?"#)
143            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
144            .query_row((&block_hash[..],), |row| row.get::<_, Vec<u8>>(0))
145            .optional()
146            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
147
148        Ok(out)
149    }
150
151    /// Returns the hash of the parent of the given block, or `None` if the block is unknown.
152    ///
153    /// > **Note**: If this method is called twice times in a row with the same block hash, it
154    /// >           is possible for the first time to return `Some` and the second time to return
155    /// >           `None`, in case the block has since been removed from the database.
156    pub fn block_parent(&self, block_hash: &[u8; 32]) -> Result<Option<[u8; 32]>, CorruptedError> {
157        let connection = self.database.lock();
158
159        let out = connection
160            .prepare_cached(r#"SELECT parent_hash FROM blocks WHERE hash = ?"#)
161            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
162            .query_row((&block_hash[..],), |row| row.get::<_, [u8; 32]>(0))
163            .optional()
164            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
165
166        Ok(out)
167    }
168
169    /// Returns the list of extrinsics of the given block, or `None` if the block is unknown.
170    ///
171    /// > **Note**: The list of extrinsics of a block is also known as its *body*.
172    ///
173    /// > **Note**: If this method is called twice times in a row with the same block hash, it
174    /// >           is possible for the first time to return `Some` and the second time to return
175    /// >           `None`, in case the block has since been removed from the database.
176    pub fn block_extrinsics(
177        &self,
178        block_hash: &[u8; 32],
179    ) -> Result<Option<impl ExactSizeIterator<Item = Vec<u8>>>, CorruptedError> {
180        let connection = self.database.lock();
181
182        // TODO: doesn't detect if block is absent
183
184        let result = connection
185            .prepare_cached(r#"SELECT extrinsic FROM blocks_body WHERE hash = ? ORDER BY idx ASC"#)
186            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
187            .query_map((&block_hash[..],), |row| row.get::<_, Vec<u8>>(0))
188            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
189            .collect::<Result<Vec<_>, _>>()
190            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
191
192        Ok(Some(result.into_iter()))
193    }
194
195    /// Returns the hashes of the blocks given a block number.
196    pub fn block_hash_by_number(
197        &self,
198        block_number: u64,
199    ) -> Result<impl ExactSizeIterator<Item = [u8; 32]>, CorruptedError> {
200        let connection = self.database.lock();
201        let result = block_hashes_by_number(&connection, block_number)?;
202        Ok(result.into_iter())
203    }
204
205    /// Returns the hash of the block of the best chain given a block number.
206    pub fn best_block_hash_by_number(
207        &self,
208        block_number: u64,
209    ) -> Result<Option<[u8; 32]>, CorruptedError> {
210        let connection = self.database.lock();
211
212        let block_number = match i64::try_from(block_number) {
213            Ok(n) => n,
214            Err(_) => return Ok(None),
215        };
216
217        let result = connection
218            .prepare_cached(r#"SELECT hash FROM blocks WHERE number = ? AND is_best_chain = TRUE"#)
219            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
220            .query_row((block_number,), |row| row.get::<_, Vec<u8>>(0))
221            .optional()
222            .map_err(|err| CorruptedError::Internal(InternalError(err)))
223            .and_then(|value| {
224                let Some(value) = value else { return Ok(None) };
225                Ok(Some(
226                    <[u8; 32]>::try_from(&value[..])
227                        .map_err(|_| CorruptedError::InvalidBlockHashLen)?,
228                ))
229            })?;
230
231        Ok(result)
232    }
233
234    /// Returns a [`chain_information::ChainInformation`] struct containing the information about
235    /// the current finalized state of the chain.
236    ///
237    /// This method is relatively expensive and should preferably not be called repeatedly.
238    ///
239    /// In order to avoid race conditions, the known finalized block hash must be passed as
240    /// parameter. If the finalized block in the database doesn't match the hash passed as
241    /// parameter, most likely because it has been updated in a parallel thread, a
242    /// [`StorageAccessError::IncompleteStorage`] error is returned.
243    // TODO: an IncompleteStorage error doesn't seem appropriate; also, why is it even a problem given that the chain information contains the finalized block anyway
244    pub fn to_chain_information(
245        &self,
246        finalized_block_hash: &[u8; 32],
247    ) -> Result<chain_information::ValidChainInformation, StorageAccessError> {
248        if finalized_hash(&self.database.lock())? != *finalized_block_hash {
249            return Err(StorageAccessError::IncompleteStorage);
250        }
251
252        let mut builder = chain_information::build::ChainInformationBuild::new(
253            chain_information::build::Config {
254                finalized_block_header: chain_information::build::ConfigFinalizedBlockHeader::Any {
255                    scale_encoded_header: self
256                        .block_scale_encoded_header(finalized_block_hash)?
257                        .ok_or(StorageAccessError::UnknownBlock)?, // TODO: inappropriate error
258                    known_finality: None,
259                },
260                runtime: {
261                    let code = match self.block_storage_get(
262                        finalized_block_hash,
263                        iter::empty::<iter::Empty<_>>(),
264                        trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from),
265                    )? {
266                        Some((code, _)) => code,
267                        None => todo!(),
268                    };
269                    let heap_pages = match self.block_storage_get(
270                        &finalized_block_hash,
271                        iter::empty::<iter::Empty<_>>(),
272                        trie::bytes_to_nibbles(b":heappages".iter().copied()).map(u8::from),
273                    )? {
274                        Some((hp, _)) => Some(hp),
275                        None => None,
276                    };
277                    let Ok(heap_pages) =
278                        executor::storage_heap_pages_to_value(heap_pages.as_deref())
279                    else {
280                        todo!()
281                    };
282                    let Ok(runtime) = host::HostVmPrototype::new(host::Config {
283                        module: code,
284                        heap_pages,
285                        exec_hint:
286                            executor::vm::ExecHint::ExecuteOnceWithNonDeterministicValidation,
287                        allow_unresolved_imports: true,
288                    }) else {
289                        todo!()
290                    };
291                    runtime
292                },
293                block_number_bytes: self.block_number_bytes,
294            },
295        );
296
297        // TODO: this whole code is racy because the database isn't locked
298        loop {
299            match builder {
300                chain_information::build::ChainInformationBuild::Finished {
301                    result: Ok(chain_information),
302                    .. // TODO: runtime thrown away
303                } => return Ok(chain_information),
304                chain_information::build::ChainInformationBuild::Finished {
305                    result: Err(_),
306                    .. // TODO: runtime thrown away
307                } => todo!(),
308                chain_information::build::ChainInformationBuild::InProgress(
309                    chain_information::build::InProgress::StorageGet(val),
310                ) => {
311                    // TODO: child trie support
312                    let value = self.block_storage_get(finalized_block_hash, iter::empty::<iter::Empty<_>>(), trie::bytes_to_nibbles(val.key().as_ref().iter().copied()).map(u8::from))?;
313                    let value = match value {
314                        Some((val, vers)) => {
315                            Some((iter::once(val), chain_information::build::TrieEntryVersion::try_from(vers).map_err(|_| StorageAccessError::Corrupted(CorruptedError::InvalidTrieEntryVersion))?))
316                        }
317                        None => None
318                    };
319                    builder = val.inject_value(value);
320                }
321                chain_information::build::ChainInformationBuild::InProgress(
322                    chain_information::build::InProgress::NextKey(val),
323                ) => {
324                    // TODO: child trie support
325                    let nk = self.block_storage_next_key(finalized_block_hash, iter::empty::<iter::Empty<_>>(), val.key().map(u8::from),val.prefix().map(u8::from), val.branch_nodes())?;
326                    builder = val.inject_key(nk.map(|nibbles| nibbles.into_iter().map(|n| trie::Nibble::try_from(n).unwrap())));
327                }
328                chain_information::build::ChainInformationBuild::InProgress(
329                    chain_information::build::InProgress::ClosestDescendantMerkleValue(val),
330                ) => {
331                    // TODO: child trie support
332                    let mv = self.block_storage_closest_descendant_merkle_value(finalized_block_hash, iter::empty::<iter::Empty<_>>(), val.key().map(u8::from))?;
333                    builder = val.inject_merkle_value(mv.as_deref());
334                }
335            }
336        }
337    }
338
339    /// Insert a new block in the database.
340    ///
341    /// Must pass the header and body of the block.
342    ///
343    /// Blocks must be inserted in the correct order. An error is returned if the parent of the
344    /// newly-inserted block isn't present in the database.
345    ///
346    /// > **Note**: It is not necessary for the newly-inserted block to be a descendant of the
347    /// >           finalized block, unless `is_new_best` is true.
348    ///
349    pub fn insert<'a>(
350        &self,
351        scale_encoded_header: &[u8],
352        is_new_best: bool,
353        body: impl ExactSizeIterator<Item = impl AsRef<[u8]>>,
354    ) -> Result<(), InsertError> {
355        // Calculate the hash of the new best block.
356        let block_hash = header::hash_from_scale_encoded_header(scale_encoded_header);
357
358        // Decode the header, as we will need various information from it.
359        // TODO: this module shouldn't decode headers
360        let header = header::decode(scale_encoded_header, self.block_number_bytes)
361            .map_err(InsertError::BadHeader)?;
362
363        // Locking is performed as late as possible.
364        let mut database = self.database.lock();
365
366        // Start a transaction to insert everything at once.
367        let transaction = database
368            .transaction()
369            .map_err(|err| InsertError::Corrupted(CorruptedError::Internal(InternalError(err))))?;
370
371        // Make sure that the block to insert isn't already in the database.
372        if has_block(&transaction, &block_hash)? {
373            return Err(InsertError::Duplicate);
374        }
375
376        // Make sure that the parent of the block to insert is in the database.
377        if !has_block(&transaction, header.parent_hash)? {
378            return Err(InsertError::MissingParent);
379        }
380
381        transaction
382            .prepare_cached(
383                "INSERT INTO blocks(number, hash, parent_hash, state_trie_root_hash, header, is_best_chain, justification) VALUES (?, ?, ?, ?, ?, FALSE, NULL)",
384            )
385            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
386            .execute((
387                i64::try_from(header.number).unwrap(),
388                &block_hash[..],
389                &header.parent_hash[..],
390                &header.state_root[..],
391                scale_encoded_header
392            ))
393            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
394
395        {
396            let mut statement = transaction
397                .prepare_cached("INSERT INTO blocks_body(hash, idx, extrinsic) VALUES (?, ?, ?)")
398                .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
399            for (index, item) in body.enumerate() {
400                statement
401                    .execute((
402                        &block_hash[..],
403                        i64::try_from(index).unwrap(),
404                        item.as_ref(),
405                    ))
406                    .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
407            }
408        }
409
410        // Change the best chain to be the new block.
411        if is_new_best {
412            // It would be illegal to change the best chain to not overlay with the
413            // finalized chain.
414            if header.number <= finalized_num(&transaction)? {
415                return Err(InsertError::BestNotInFinalizedChain);
416            }
417
418            set_best_chain(&transaction, &block_hash)?;
419        }
420
421        // If everything is successful, we commit.
422        transaction
423            .commit()
424            .map_err(|err| InsertError::Corrupted(CorruptedError::Internal(InternalError(err))))?;
425
426        Ok(())
427    }
428
429    // TODO: needs documentation
430    // TODO: should we refuse inserting disjoint storage nodes?
431    pub fn insert_trie_nodes<'a>(
432        &self,
433        new_trie_nodes: impl Iterator<Item = InsertTrieNode<'a>>,
434        trie_entries_version: u8,
435    ) -> Result<(), CorruptedError> {
436        let mut database = self.database.lock();
437
438        let transaction = database
439            .transaction()
440            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
441
442        {
443            // TODO: should check whether the existing merkle values that are referenced from inserted nodes exist in the parent's storage
444            // TODO: is it correct to have OR IGNORE everywhere?
445            let mut insert_node_statement = transaction
446                .prepare_cached("INSERT OR IGNORE INTO trie_node(hash, partial_key) VALUES(?, ?)")
447                .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
448            let mut insert_node_storage_statement = transaction
449                .prepare_cached("INSERT OR IGNORE INTO trie_node_storage(node_hash, value, trie_root_ref, trie_entry_version) VALUES(?, ?, ?, ?)")
450                .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
451            let mut insert_child_statement = transaction
452                .prepare_cached(
453                    "INSERT OR IGNORE INTO trie_node_child(hash, child_num, child_hash) VALUES(?, ?, ?)",
454                )
455                .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
456            // TODO: if the iterator's `next()` function accesses the database, we deadlock
457            for trie_node in new_trie_nodes {
458                assert!(trie_node.partial_key_nibbles.iter().all(|n| *n < 16)); // TODO: document
459                insert_node_statement
460                    .execute((&trie_node.merkle_value, trie_node.partial_key_nibbles))
461                    .map_err(|err: rusqlite::Error| CorruptedError::Internal(InternalError(err)))?;
462                match trie_node.storage_value {
463                    InsertTrieNodeStorageValue::Value {
464                        value,
465                        references_merkle_value,
466                    } => {
467                        insert_node_storage_statement
468                            .execute((
469                                &trie_node.merkle_value,
470                                if !references_merkle_value {
471                                    Some(&value)
472                                } else {
473                                    None
474                                },
475                                if references_merkle_value {
476                                    Some(&value)
477                                } else {
478                                    None
479                                },
480                                trie_entries_version,
481                            ))
482                            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
483                    }
484                    InsertTrieNodeStorageValue::NoValue => {}
485                }
486                for (child_num, child) in trie_node.children_merkle_values.iter().enumerate() {
487                    if let Some(child) = child {
488                        let child_num =
489                            vec![u8::try_from(child_num).unwrap_or_else(|_| unreachable!())];
490                        insert_child_statement
491                            .execute((&trie_node.merkle_value, child_num, child))
492                            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
493                    }
494                }
495            }
496        }
497
498        transaction
499            .commit()
500            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
501
502        Ok(())
503    }
504
505    /// Returns a list of trie nodes that are missing from the database and that belong to the
506    /// state of a block whose number is superior or equal to the finalized block.
507    ///
508    /// The ordering of the returned trie nodes is unspecified.
509    ///
510    /// > **Note**: This function call is relatively expensive, and the API user is expected to
511    /// >           cache the return value.
512    pub fn finalized_and_above_missing_trie_nodes_unordered(
513        &self,
514    ) -> Result<Vec<MissingTrieNode>, CorruptedError> {
515        let database = self.database.lock();
516
517        let mut statement = database
518            .prepare_cached(
519                r#"
520            WITH RECURSIVE
521                -- List of all block hashes that are equal to the finalized block or above.
522                finalized_and_above_blocks(block_hash) AS (
523                    SELECT blocks.hash
524                    FROM blocks
525                    JOIN meta ON meta.key = "finalized"
526                    WHERE blocks.number >= meta.value_number
527                ),
528
529                -- List of all trie nodes for these blocks.
530                trie_nodes(block_hash, node_hash, node_key, is_present) AS (
531                    SELECT  blocks.hash, blocks.state_trie_root_hash,
532                            CASE WHEN trie_node.partial_key IS NULL THEN X'' ELSE trie_node.partial_key END,
533                            trie_node.hash IS NOT NULL
534                        FROM blocks
535                        JOIN finalized_and_above_blocks
536                            ON blocks.hash = finalized_and_above_blocks.block_hash
537                        LEFT JOIN trie_node
538                            ON trie_node.hash = blocks.state_trie_root_hash
539
540                    UNION ALL
541                    SELECT  trie_nodes.block_hash, trie_node_child.child_hash,
542                            CASE WHEN trie_node.hash IS NULL THEN CAST(trie_nodes.node_key || trie_node_child.child_num AS BLOB)
543                            ELSE CAST(trie_nodes.node_key || trie_node_child.child_num || trie_node.partial_key AS BLOB) END,
544                            trie_node.hash IS NOT NULL
545                        FROM trie_nodes
546                        JOIN trie_node_child
547                            ON trie_nodes.node_hash = trie_node_child.hash
548                        LEFT JOIN trie_node
549                            ON trie_node.hash = trie_node_child.child_hash
550                        WHERE trie_nodes.is_present
551
552                    UNION ALL
553                    SELECT  trie_nodes.block_hash, trie_node_storage.trie_root_ref,
554                            CASE WHEN trie_node.hash IS NULL THEN CAST(trie_nodes.node_key || X'10' AS BLOB)
555                            ELSE CAST(trie_nodes.node_key || X'10' || trie_node.partial_key AS BLOB) END,
556                            trie_node.hash IS NOT NULL
557                        FROM trie_nodes
558                        JOIN trie_node_storage
559                            ON trie_nodes.node_hash = trie_node_storage.node_hash AND trie_node_storage.trie_root_ref IS NOT NULL
560                        LEFT JOIN trie_node
561                            ON trie_node.hash = trie_node_storage.trie_root_ref
562                        WHERE trie_nodes.is_present
563                )
564
565            SELECT group_concat(HEX(trie_nodes.block_hash)), group_concat(CAST(blocks.number as TEXT)), trie_nodes.node_hash, group_concat(HEX(trie_nodes.node_key))
566            FROM trie_nodes
567            JOIN blocks ON blocks.hash = trie_nodes.block_hash
568            WHERE is_present = false
569            GROUP BY trie_nodes.node_hash
570            "#)
571            .map_err(|err| {
572                CorruptedError::Internal(
573                    InternalError(err),
574                )
575            })?;
576
577        let results = statement
578            .query_map((), |row| {
579                let block_hashes = row.get::<_, String>(0)?;
580                let block_numbers = row.get::<_, String>(1)?;
581                let node_hash = row.get::<_, Vec<u8>>(2)?;
582                let node_keys = row.get::<_, String>(3)?;
583                Ok((block_hashes, block_numbers, node_hash, node_keys))
584            })
585            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
586            .map(|row| {
587                let (block_hashes, block_numbers, trie_node_hash, node_keys) = match row {
588                    Ok(r) => r,
589                    Err(err) => return Err(CorruptedError::Internal(InternalError(err))),
590                };
591
592                // The SQL query above uses `group_concat` and `hex` to convert a list of blobs
593                // into a string containing all these blobs. We now convert them back into lists
594                // of blobs.
595                // A panic here indicates a bug in SQLite.
596                let mut block_hashes_iter = block_hashes
597                    .split(',')
598                    .map(|hash| hex::decode(hash).unwrap_or_else(|_| unreachable!()));
599                let mut block_numbers_iter = block_numbers.split(',').map(|n| {
600                    <u64 as core::str::FromStr>::from_str(n).unwrap_or_else(|_| unreachable!())
601                });
602                let mut node_keys_iter = node_keys
603                    .split(',')
604                    .map(|hash| hex::decode(hash).unwrap_or_else(|_| unreachable!()));
605
606                let mut blocks = Vec::with_capacity(32);
607                loop {
608                    match (
609                        block_hashes_iter.next(),
610                        block_numbers_iter.next(),
611                        node_keys_iter.next(),
612                    ) {
613                        (Some(hash), Some(number), Some(node_key)) => {
614                            let hash = <[u8; 32]>::try_from(hash)
615                                .map_err(|_| CorruptedError::InvalidBlockHashLen)?;
616                            let mut trie_node_key_nibbles = Vec::with_capacity(node_key.len());
617                            let mut parent_tries_paths_nibbles = Vec::with_capacity(node_key.len());
618                            for nibble in node_key {
619                                debug_assert!(nibble <= 16);
620                                if nibble == 16 {
621                                    parent_tries_paths_nibbles.push(trie_node_key_nibbles.clone());
622                                    trie_node_key_nibbles.clear();
623                                } else {
624                                    trie_node_key_nibbles.push(nibble);
625                                }
626                            }
627
628                            blocks.push(MissingTrieNodeBlock {
629                                hash,
630                                number,
631                                parent_tries_paths_nibbles,
632                                trie_node_key_nibbles,
633                            })
634                        }
635                        (None, None, None) => break,
636                        _ => {
637                            // The iterators are supposed to have the same number of elements.
638                            debug_assert!(false);
639                            break;
640                        }
641                    }
642                }
643
644                let trie_node_hash = <[u8; 32]>::try_from(trie_node_hash)
645                    .map_err(|_| CorruptedError::InvalidTrieHashLen)?;
646
647                debug_assert!(!blocks.is_empty());
648
649                Ok(MissingTrieNode {
650                    blocks,
651                    trie_node_hash,
652                })
653            })
654            .collect::<Result<Vec<_>, _>>()?;
655
656        Ok(results)
657    }
658
659    /// Changes the finalized block to the given one.
660    ///
661    /// The block must have been previously inserted using [`SqliteFullDatabase::insert`],
662    /// otherwise an error is returned.
663    ///
664    /// Blocks are expected to be valid in context of the chain. Inserting an invalid block can
665    /// result in the database being corrupted.
666    ///
667    /// The block must be a descendant of the current finalized block. Reverting finalization is
668    /// forbidden, as the database intentionally discards some information when finality is
669    /// applied.
670    ///
671    /// > **Note**: This function doesn't remove any block from the database but simply moves
672    /// >           the finalized block "cursor".
673    ///
674    pub fn set_finalized(
675        &self,
676        new_finalized_block_hash: &[u8; 32],
677    ) -> Result<(), SetFinalizedError> {
678        let mut database = self.database.lock();
679
680        // Start a transaction to insert everything at once.
681        let transaction = database.transaction().map_err(|err| {
682            SetFinalizedError::Corrupted(CorruptedError::Internal(InternalError(err)))
683        })?;
684
685        // Fetch the header of the block to finalize.
686        let new_finalized_header = block_header(&transaction, new_finalized_block_hash)?
687            .ok_or(SetFinalizedError::UnknownBlock)?;
688        let new_finalized_header = header::decode(&new_finalized_header, self.block_number_bytes)
689            .map_err(CorruptedError::BlockHeaderCorrupted)
690            .map_err(SetFinalizedError::Corrupted)?;
691
692        // Fetch the current finalized block.
693        let current_finalized = finalized_num(&transaction)?;
694
695        // If the block to finalize is at the same height as the already-finalized
696        // block, considering that the database only contains one block per height on
697        // the finalized chain, and that the presence of the block to finalize in
698        // the database has already been verified, it is guaranteed that the block
699        // to finalize is already the one already finalized.
700        // TODO: this comment is obsolete ^, should also compare the block hashes
701        if new_finalized_header.number == current_finalized {
702            return Ok(());
703        }
704
705        // Cannot set the finalized block to a past block. The database can't support
706        // reverting finalization.
707        if new_finalized_header.number < current_finalized {
708            return Err(SetFinalizedError::RevertForbidden);
709        }
710
711        // At this point, we are sure that the operation will succeed unless the database is
712        // corrupted.
713        // Update the finalized block in meta.
714        meta_set_number(&transaction, "finalized", new_finalized_header.number)?;
715
716        // It is possible that the best block has been pruned.
717        // TODO: ^ yeah, how do we handle that exactly ^ ?
718
719        // If everything went well up to this point, commit the transaction.
720        transaction.commit().map_err(|err| {
721            SetFinalizedError::Corrupted(CorruptedError::Internal(InternalError(err)))
722        })?;
723
724        Ok(())
725    }
726
727    /// Removes from the database all blocks that aren't a descendant of the current finalized
728    /// block.
729    pub fn purge_finality_orphans(&self) -> Result<(), CorruptedError> {
730        let mut database = self.database.lock();
731
732        // TODO: untested
733
734        let transaction = database
735            .transaction()
736            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
737
738        // Temporarily disable foreign key checks in order to make the insertion easier, as we
739        // don't have to make sure that trie nodes are sorted.
740        // Note that this is immediately disabled again when we `COMMIT` later down below.
741        // TODO: is this really necessary?
742        transaction
743            .execute("PRAGMA defer_foreign_keys = ON", ())
744            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
745
746        let current_finalized = finalized_num(&transaction)?;
747
748        let blocks = transaction
749            .prepare_cached(
750                r#"SELECT hash FROM blocks WHERE number <= ? AND is_best_chain = FALSE"#,
751            )
752            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
753            .query_map((current_finalized,), |row| row.get::<_, Vec<u8>>(0))
754            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
755            .collect::<Result<Vec<_>, _>>()
756            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
757
758        for block in blocks {
759            purge_block(&transaction, &block)?;
760        }
761
762        // If everything went well up to this point, commit the transaction.
763        transaction
764            .commit()
765            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
766
767        Ok(())
768    }
769
770    /// Returns the value associated with a node of the trie of the given block.
771    ///
772    /// `parent_tries_paths_nibbles` is a list of keys to follow in order to find the root of the
773    /// trie into which `key_nibbles` should be searched.
774    ///
775    /// Beware that both `parent_tries_paths_nibbles` and `key_nibbles` must yield *nibbles*, in
776    /// other words values strictly inferior to 16.
777    ///
778    /// Returns an error if the block or its storage can't be found in the database.
779    ///
780    /// # Panic
781    ///
782    /// Panics if any of the values yielded by `parent_tries_paths_nibbles` or `key_nibbles` is
783    /// superior or equal to 16.
784    ///
785    pub fn block_storage_get(
786        &self,
787        block_hash: &[u8; 32],
788        parent_tries_paths_nibbles: impl Iterator<Item = impl Iterator<Item = u8>>,
789        key_nibbles: impl Iterator<Item = u8>,
790    ) -> Result<Option<(Vec<u8>, u8)>, StorageAccessError> {
791        // Process the iterators at the very beginning and before locking the database, in order
792        // to avoid a deadlock in case the `next()` function of one of the iterators accesses
793        // the database as well.
794        let key_vectored = parent_tries_paths_nibbles
795            .flat_map(|t| t.inspect(|n| assert!(*n < 16)).chain(iter::once(0x10)))
796            .chain(key_nibbles.inspect(|n| assert!(*n < 16)))
797            .collect::<Vec<_>>();
798
799        let connection = self.database.lock();
800
801        // TODO: could be optimized by having a different request when `parent_tries_paths_nibbles` is empty and when it isn't
802        // TODO: trie_root_ref system untested
803        // TODO: infinite loop if there's a loop in the trie; detect this
804        let mut statement = connection
805            .prepare_cached(
806                r#"
807            WITH RECURSIVE
808                -- At the end of the recursive statement, `node_with_key` must always contain
809                -- one and exactly one item where `search_remain` is either empty or null. Empty
810                -- indicates that we have found a match, while null means that the search has
811                -- been interrupted due to a storage entry not being in the database. If
812                -- `search_remain` is empty, then `node_hash` is either a hash in case of a match
813                -- or null in case there is no entry with the requested key. If `search_remain`
814                -- is null, then `node_hash` is irrelevant.
815                --
816                -- In order to properly handle the situation where the key is empty, the initial
817                -- request of the recursive table building must check whether the partial key of
818                -- the root matches. In other words, all the entries of `node_with_key` (where
819                -- `node_hash` is non-null) contain entries that are known to be in the database
820                -- and after the partial key has already been verified to be correct.
821                node_with_key(node_hash, search_remain) AS (
822                        SELECT
823                            IIF(COALESCE(SUBSTR(:key, 1, LENGTH(trie_node.partial_key)), X'') = trie_node.partial_key, trie_node.hash, NULL),
824                            IIF(trie_node.partial_key IS NULL, NULL, COALESCE(SUBSTR(:key, 1 + LENGTH(trie_node.partial_key)), X''))
825                        FROM blocks
826                        LEFT JOIN trie_node ON blocks.state_trie_root_hash = trie_node.hash
827                        WHERE blocks.hash = :block_hash
828                    UNION ALL
829                    SELECT
830                        CASE
831                            WHEN HEX(SUBSTR(node_with_key.search_remain, 1, 1)) = '10' THEN trie_node_storage.trie_root_ref
832                            WHEN SUBSTR(node_with_key.search_remain, 2, LENGTH(trie_node.partial_key)) = trie_node.partial_key THEN trie_node_child.child_hash
833                            ELSE NULL END,
834                        CASE
835                            WHEN HEX(SUBSTR(node_with_key.search_remain, 1, 1)) = '10' THEN SUBSTR(node_with_key.search_remain, 1)
836                            WHEN trie_node_child.child_hash IS NULL THEN X''
837                            WHEN trie_node.partial_key IS NULL THEN NULL
838                            WHEN SUBSTR(node_with_key.search_remain, 2, LENGTH(trie_node.partial_key)) = trie_node.partial_key THEN SUBSTR(node_with_key.search_remain, 2 + LENGTH(trie_node.partial_key))
839                            ELSE X'' END
840                    FROM node_with_key
841                        LEFT JOIN trie_node_child
842                            ON node_with_key.node_hash = trie_node_child.hash
843                            AND SUBSTR(node_with_key.search_remain, 1, 1) = trie_node_child.child_num
844                        LEFT JOIN trie_node
845                            ON trie_node.hash = trie_node_child.child_hash
846                        LEFT JOIN trie_node_storage
847                            ON node_with_key.node_hash = trie_node_storage.node_hash
848                        WHERE LENGTH(node_with_key.search_remain) >= 1
849                )
850            SELECT COUNT(blocks.hash) >= 1, node_with_key.search_remain IS NULL, COALESCE(trie_node_storage.value, trie_node_storage.trie_root_ref), trie_node_storage.trie_entry_version
851            FROM blocks
852            JOIN node_with_key ON LENGTH(node_with_key.search_remain) = 0 OR node_with_key.search_remain IS NULL
853            LEFT JOIN trie_node_storage ON node_with_key.node_hash = trie_node_storage.node_hash AND node_with_key.search_remain IS NOT NULL
854            WHERE blocks.hash = :block_hash;
855            "#)
856            .map_err(|err| {
857                StorageAccessError::Corrupted(CorruptedError::Internal(
858                    InternalError(err),
859                ))
860            })?;
861
862        // In order to debug the SQL query above (for example in case of a failing test),
863        // uncomment this block:
864        //
865        /*println!("{:?}", {
866            let mut statement = connection
867                    .prepare_cached(
868                        r#"
869                    WITH RECURSIVE
870                        copy-paste the definition of node_with_key here
871
872                    SELECT * FROM node_with_key"#).unwrap();
873            statement
874                .query_map(
875                    rusqlite::named_params! {
876                        ":block_hash": &block_hash[..],
877                        ":key": key_vectored,
878                    },
879                    |row| {
880                        let node_hash = row.get::<_, Option<Vec<u8>>>(0)?.map(hex::encode);
881                        let search_remain = row.get::<_, Option<Vec<u8>>>(1)?;
882                        Ok((node_hash, search_remain))
883                    },
884                )
885                .unwrap()
886                .collect::<Vec<_>>()
887        });*/
888
889        let (has_block, incomplete_storage, value, trie_entry_version) = statement
890            .query_row(
891                rusqlite::named_params! {
892                    ":block_hash": &block_hash[..],
893                    ":key": key_vectored,
894                },
895                |row| {
896                    let has_block = row.get::<_, i64>(0)? != 0;
897                    let incomplete_storage = row.get::<_, i64>(1)? != 0;
898                    let value = row.get::<_, Option<Vec<u8>>>(2)?;
899                    let trie_entry_version = row.get::<_, Option<i64>>(3)?;
900                    Ok((has_block, incomplete_storage, value, trie_entry_version))
901                },
902            )
903            .map_err(|err| {
904                StorageAccessError::Corrupted(CorruptedError::Internal(InternalError(err)))
905            })?;
906
907        if !has_block {
908            return Err(StorageAccessError::UnknownBlock);
909        }
910
911        if incomplete_storage {
912            return Err(StorageAccessError::IncompleteStorage);
913        }
914
915        let Some(value) = value else { return Ok(None) };
916
917        let trie_entry_version = u8::try_from(trie_entry_version.unwrap())
918            .map_err(|_| CorruptedError::InvalidTrieEntryVersion)
919            .map_err(StorageAccessError::Corrupted)?;
920        Ok(Some((value, trie_entry_version)))
921    }
922
923    /// Returns the key in the storage that immediately follows or is equal to the key passed as
924    /// parameter in the storage of the block.
925    ///
926    /// `key_nibbles` must be an iterator to the **nibbles** of the key.
927    ///
928    /// `prefix_nibbles` must be an iterator to nibbles. If the result of the function wouldn't
929    /// start with this specific list of bytes, `None` is returned.
930    ///
931    /// `parent_tries_paths_nibbles` is a list of keys to follow in order to find the root of the
932    /// trie into which `key_nibbles` should be searched.
933    ///
934    /// Returns `None` if `parent_tries_paths_nibbles` didn't lead to any trie, or if there is no
935    /// next key.
936    ///
937    /// The key is returned in the same format as `key_nibbles`.
938    ///
939    /// If `branch_nodes` is `false`, then branch nodes (i.e. nodes with no value associated to
940    /// them) are ignored during the search.
941    ///
942    /// > **Note**: Contrary to many other similar functions in smoldot, there is no `or_equal`
943    /// >           parameter to this function. Instead, `or_equal` is implicitly `true`, and a
944    /// >           value of `false` can be easily emulated by appending a `0` at the end
945    /// >           of `key_nibbles`.
946    ///
947    /// # Panics
948    ///
949    /// Panics if any of the values yielded by `parent_tries_paths_nibbles`, `key_nibbles`, or
950    /// `prefix_nibbles` is superior or equal to 16.
951    ///
952    pub fn block_storage_next_key(
953        &self,
954        block_hash: &[u8; 32],
955        parent_tries_paths_nibbles: impl Iterator<Item = impl Iterator<Item = u8>>,
956        key_nibbles: impl Iterator<Item = u8>,
957        prefix_nibbles: impl Iterator<Item = u8>,
958        branch_nodes: bool,
959    ) -> Result<Option<Vec<u8>>, StorageAccessError> {
960        // Process the iterators at the very beginning and before locking the database, in order
961        // to avoid a deadlock in case the `next()` function of one of the iterators accesses
962        // the database as well.
963        let parent_tries_paths_nibbles = parent_tries_paths_nibbles
964            .flat_map(|t| t.inspect(|n| assert!(*n < 16)).chain(iter::once(0x10)))
965            .collect::<Vec<_>>();
966        let parent_tries_paths_nibbles_length = parent_tries_paths_nibbles.len();
967        let key_nibbles = {
968            let mut v = parent_tries_paths_nibbles.clone();
969            v.extend(key_nibbles.inspect(|n| assert!(*n < 16)));
970            v
971        };
972        let prefix_nibbles = {
973            let mut v = parent_tries_paths_nibbles;
974            v.extend(prefix_nibbles.inspect(|n| assert!(*n < 16)));
975            v
976        };
977
978        let connection = self.database.lock();
979
980        // Sorry for that extremely complicated SQL statement. While the logic isn't actually very
981        // complicated, we have to jump through many hoops in order to go around quirks in the
982        // SQL language.
983        // If you want to work on this SQL code, there is no miracle: write tests, and if a test
984        // fails debug the content of `next_key` to find out where the iteration doesn't behave
985        // as expected.
986        // TODO: this algorithm relies the fact that leaf nodes always have a storage value, which isn't exactly clear in the schema ; however not relying on this makes it way harder to write
987        // TODO: trie_root_ref system untested and most likely not working
988        // TODO: infinite loop if there's a loop in the trie; detect this
989        // TODO: could also check the prefix while iterating instead of only at the very end, which could maybe save many lookups
990        let mut statement = connection
991            .prepare_cached(
992                r#"
993            WITH RECURSIVE
994                -- We build a temporary table `next_key`, inserting entries one after one as we
995                -- descend the trie by trying to match entries with `:key`.
996                -- At each iteration, `node_hash` is the root where to continue the search,
997                -- `node_is_branch` is true if `node_hash` is a branch node, `node_full_key` is
998                -- the key of `node_hash` (that we build along the way) and serves as the final
999                -- result, and `key_search_remain` contains the `:key` that remains to be matched.
1000                -- Can also be NULL to indicate that the search ended because the node necessary to
1001                -- continue was missing from the database, in which case the values of
1002                -- `node_hash` and `node_is_branch` have irrelevant values, and the value of
1003                -- `node_full_key` is the "best known key".
1004                -- If `:skip_branches` is false, the search ends when `key_search_remain` is null
1005                -- or empty. If `:skip_branches` is true, the search ends when `key_search_remain`
1006                -- is null or empty and that `node_is_branch` is false.
1007                --
1008                -- `next_key` has zero elements if the block can't be found in the database or if
1009                -- the trie has no next key at all. These two situations need to be differentiated
1010                -- in the final SELECT statement.
1011                --
1012                -- When encountering a node, we follow both the child that exactly matches `:key`
1013                -- and also the first child that is strictly superior to `:key`. This is necessary
1014                -- because `:key` might be equal to something like `ffffffff...`, in which case the
1015                -- result will be after any equal match.
1016                -- This means that the number of entries in `next_key` at the end of the recursion
1017                -- is something like `2 * depth_in_trie(key)`.
1018                -- In order to obtain the final result, we take the entry in `next_key` with the
1019                -- minimal `node_full_key` amongst the ones that have finished the search.
1020                --
1021                -- Note that in the code below we do a lot of `COALESCE(SUBSTR(...), X'')`. This
1022                -- is because, for some reason, `SUBSTR(X'', ...)` always produces `NULL`. For this
1023                -- reason, it is also not possible to automatically pass NULL values
1024                -- through `SUSBTR`, and we have to use CASE/IIFs instead.
1025                next_key(node_hash, node_is_branch, node_full_key, key_search_remain) AS (
1026                        SELECT
1027                            CASE
1028                                WHEN trie_node.hash IS NULL
1029                                    THEN NULL
1030                                WHEN COALESCE(SUBSTR(:key, 1, LENGTH(trie_node.partial_key)), X'') <= trie_node.partial_key
1031                                    THEN trie_node.hash
1032                                ELSE
1033                                    NULL
1034                            END,
1035                            trie_node_storage.value IS NULL AND trie_node_storage.trie_root_ref IS NULL,
1036                            COALESCE(trie_node.partial_key, X''),
1037                            CASE
1038                                WHEN trie_node.partial_key IS NULL
1039                                    THEN NULL
1040                                WHEN COALESCE(SUBSTR(:key, 1, LENGTH(trie_node.partial_key)), X'') <= trie_node.partial_key
1041                                    THEN COALESCE(SUBSTR(:key, 1 + LENGTH(trie_node.partial_key)), X'')
1042                                ELSE
1043                                    X''   -- The partial key is strictly inferior to `:key`
1044                            END
1045                        FROM blocks
1046                        LEFT JOIN trie_node ON trie_node.hash = blocks.state_trie_root_hash
1047                        LEFT JOIN trie_node_storage ON trie_node_storage.node_hash = trie_node.hash
1048                        WHERE blocks.hash = :block_hash
1049
1050                    UNION ALL
1051                        SELECT
1052                            COALESCE(trie_node.hash, trie_node_trieref.hash),
1053                            trie_node_storage.value IS NULL AND trie_node_storage.trie_root_ref IS NULL,
1054                            CASE
1055                                WHEN trie_node_child.child_num IS NULL
1056                                    THEN next_key.node_full_key
1057                                WHEN trie_node.partial_key IS NULL AND trie_node_trieref.partial_key IS NULL
1058                                    THEN CAST(next_key.node_full_key || trie_node_child.child_num AS BLOB)
1059                                ELSE
1060                                    CAST(next_key.node_full_key || trie_node_child.child_num || COALESCE(trie_node.partial_key, trie_node_trieref.partial_key) AS BLOB)
1061                            END,
1062                            CASE
1063                                WHEN trie_node_child.child_num IS NOT NULL AND trie_node.partial_key IS NULL
1064                                    THEN NULL    -- Child exists but is missing from database
1065                                WHEN HEX(SUBSTR(next_key.key_search_remain, 1, 1)) = '10' AND trie_node_trieref.hash IS NULL
1066                                    THEN NULL    -- Trie reference exists but is missing from database
1067                                WHEN SUBSTR(next_key.key_search_remain, 1, 1) = trie_node_child.child_num AND SUBSTR(next_key.key_search_remain, 2, LENGTH(trie_node.partial_key)) = trie_node.partial_key
1068                                    THEN SUBSTR(next_key.key_search_remain, 2 + LENGTH(trie_node.partial_key))    -- Equal match, continue iterating
1069                                WHEN SUBSTR(next_key.key_search_remain, 1, 1) = trie_node_child.child_num AND SUBSTR(next_key.key_search_remain, 2, LENGTH(trie_node.partial_key)) < trie_node.partial_key
1070                                    THEN X''     -- Searched key is before the node we are iterating to, thus we cut the search short
1071                                WHEN HEX(SUBSTR(next_key.key_search_remain, 1, 1)) = '10' AND COALESCE(SUBSTR(next_key.key_search_remain, 2, LENGTH(trie_node_trieref.partial_key)), X'') = trie_node_trieref.partial_key
1072                                    THEN COALESCE(SUBSTR(next_key.key_search_remain, 2 + LENGTH(trie_node_trieref.partial_key)), X'')
1073                                ELSE
1074                                    X''          -- Shouldn't be reachable.
1075                            END
1076                        FROM next_key
1077
1078                        LEFT JOIN trie_node_child
1079                            ON next_key.node_hash = trie_node_child.hash
1080                            AND CASE WHEN LENGTH(next_key.key_search_remain) = 0 THEN TRUE
1081                                ELSE SUBSTR(next_key.key_search_remain, 1, 1) <= trie_node_child.child_num END
1082                        LEFT JOIN trie_node ON trie_node.hash = trie_node_child.child_hash
1083
1084                        -- We want to keep only situations where `trie_node_child` is either
1085                        -- equal to the key, or the first child strictly superior to the key. In
1086                        -- order to do that, we try to find another child that is strictly
1087                        -- in-between the key and `trie_node_child`. In the `WHERE` clause at the
1088                        -- bottom, we only keep rows where `trie_node_child_before` is NULL.
1089                        LEFT JOIN trie_node_child AS trie_node_child_before
1090                            ON next_key.node_hash = trie_node_child_before.hash
1091                            AND trie_node_child_before.child_num < trie_node_child.child_num
1092                            AND (next_key.key_search_remain = X'' OR trie_node_child_before.child_num > SUBSTR(next_key.key_search_remain, 1, 1))
1093
1094                        LEFT JOIN trie_node_storage AS trie_node_storage_trieref
1095                            ON HEX(SUBSTR(next_key.key_search_remain, 1, 1)) = '10' AND next_key.node_hash = trie_node_storage_trieref.node_hash AND trie_node_storage_trieref.trie_root_ref IS NOT NULL
1096                        LEFT JOIN trie_node AS trie_node_trieref
1097                            ON trie_node_trieref.hash = trie_node_storage_trieref.node_hash
1098                            AND COALESCE(SUBSTR(next_key.key_search_remain, 2, LENGTH(trie_node_trieref.partial_key)), X'') <= trie_node_trieref.partial_key
1099
1100                        LEFT JOIN trie_node_storage
1101                            ON trie_node_storage.node_hash = COALESCE(trie_node.hash, trie_node_trieref.hash)
1102
1103                        WHERE
1104                            -- Don't pull items that have already finished searching.
1105                            next_key.node_hash IS NOT NULL AND next_key.key_search_remain IS NOT NULL AND (next_key.key_search_remain != X'' OR (next_key.node_is_branch AND :skip_branches))
1106                            -- See explanation above.
1107                            AND trie_node_child_before.hash IS NULL
1108                            -- Don't generate an item if there's nowhere to go to.
1109                            AND (HEX(SUBSTR(next_key.key_search_remain, 1, 1)) = '10' OR trie_node_child.child_num IS NOT NULL)
1110                            -- Stop iterating if the child's partial key is before the searched key.
1111                            AND (trie_node.hash IS NULL OR NOT (COALESCE(SUBSTR(next_key.key_search_remain, 1, 1), X'') = trie_node_child.child_num AND COALESCE(SUBSTR(next_key.key_search_remain, 2, LENGTH(trie_node.partial_key)), X'') > trie_node.partial_key))
1112                ),
1113
1114                -- Now keep only the entries of `next_key` which have finished iterating.
1115                terminal_next_key(incomplete_storage, node_full_key, output) AS (
1116                    SELECT
1117                        CASE
1118                            WHEN COALESCE(SUBSTR(node_full_key, 1, LENGTH(:prefix)), X'') != :prefix THEN FALSE
1119                            ELSE key_search_remain IS NULL
1120                        END,
1121                        node_full_key,
1122                        CASE
1123                            WHEN node_hash IS NULL THEN NULL
1124                            WHEN COALESCE(SUBSTR(node_full_key, 1, LENGTH(:prefix)), X'') = :prefix THEN node_full_key
1125                            ELSE NULL
1126                        END
1127                    FROM next_key
1128                    WHERE key_search_remain IS NULL OR (LENGTH(key_search_remain) = 0 AND (NOT :skip_branches OR NOT node_is_branch))
1129                )
1130
1131            SELECT
1132                COUNT(blocks.hash) >= 1,
1133                COALESCE(terminal_next_key.incomplete_storage, FALSE),
1134                terminal_next_key.output
1135            FROM blocks
1136            LEFT JOIN terminal_next_key
1137            WHERE blocks.hash = :block_hash
1138                -- We pick the entry of `terminal_next_key` with the smallest full key. Note that
1139                -- it might seem like a good idea to not using any GROUP BY and instead just do
1140                -- `ORDER BY node_full_key ASC LIMIT 1`, but doing so sometimes leads to SQLite
1141                -- not picking the entry with the smallest full key for a reason I couldn't
1142                -- figure out.
1143                AND (terminal_next_key.node_full_key IS NULL OR terminal_next_key.node_full_key = (SELECT MIN(node_full_key) FROM terminal_next_key))
1144            LIMIT 1"#,
1145            )
1146            .map_err(|err| {
1147                StorageAccessError::Corrupted(CorruptedError::Internal(
1148                    InternalError(err),
1149                ))
1150            })?;
1151
1152        // In order to debug the SQL query above (for example in case of a failing test),
1153        // uncomment this block:
1154        //
1155        /*println!("{:?}", {
1156            let mut statement = connection
1157                    .prepare_cached(
1158                        r#"
1159                    WITH RECURSIVE
1160                        copy-paste the definition of next_key here
1161
1162                    SELECT * FROM next_key"#).unwrap();
1163            statement
1164                .query_map(
1165                    rusqlite::named_params! {
1166                        ":block_hash": &block_hash[..],
1167                        ":key": key_nibbles,
1168                        //":prefix": prefix_nibbles,
1169                        ":skip_branches": !branch_nodes
1170                    },
1171                    |row| {
1172                        let node_hash = row.get::<_, Option<Vec<u8>>>(0)?.map(hex::encode);
1173                        let node_is_branch = row.get::<_, Option<i64>>(1)?.map(|n| n != 0);
1174                        let node_full_key = row.get::<_, Option<Vec<u8>>>(2)?;
1175                        let search_remain = row.get::<_, Option<Vec<u8>>>(3)?;
1176                        Ok((node_hash, node_is_branch, node_full_key, search_remain))
1177                    },
1178                )
1179                .unwrap()
1180                .collect::<Vec<_>>()
1181        });*/
1182
1183        let result = statement
1184            .query_row(
1185                rusqlite::named_params! {
1186                    ":block_hash": &block_hash[..],
1187                    ":key": key_nibbles,
1188                    ":prefix": prefix_nibbles,
1189                    ":skip_branches": !branch_nodes
1190                },
1191                |row| {
1192                    let block_is_known = row.get::<_, i64>(0)? != 0;
1193                    let incomplete_storage = row.get::<_, i64>(1)? != 0;
1194                    let next_key = row.get::<_, Option<Vec<u8>>>(2)?;
1195                    Ok((block_is_known, incomplete_storage, next_key))
1196                },
1197            )
1198            .optional()
1199            .map_err(|err| {
1200                StorageAccessError::Corrupted(CorruptedError::Internal(InternalError(err)))
1201            })?;
1202
1203        let Some((block_is_known, incomplete_storage, mut next_key)) = result else {
1204            return Ok(None);
1205        };
1206
1207        if !block_is_known {
1208            return Err(StorageAccessError::UnknownBlock);
1209        }
1210
1211        if incomplete_storage {
1212            return Err(StorageAccessError::IncompleteStorage);
1213        }
1214
1215        if parent_tries_paths_nibbles_length != 0 {
1216            next_key = next_key.map(|nk| nk[parent_tries_paths_nibbles_length..].to_vec());
1217        }
1218
1219        Ok(next_key)
1220    }
1221
1222    /// Returns the Merkle value of the trie node in the storage that is the closest descendant
1223    /// of the provided key.
1224    ///
1225    /// `key_nibbles` must be an iterator to the **nibbles** of the key.
1226    ///
1227    /// `parent_tries_paths_nibbles` is a list of keys to follow in order to find the root of the
1228    /// trie into which `key_nibbles` should be searched.
1229    ///
1230    /// Returns `None` if `parent_tries_paths_nibbles` didn't lead to any trie, or if there is no
1231    /// such descendant.
1232    ///
1233    /// # Panics
1234    ///
1235    /// Panics if any of the values yielded by `parent_tries_paths_nibbles` or `key_nibbles` is
1236    /// superior or equal to 16.
1237    ///
1238    pub fn block_storage_closest_descendant_merkle_value(
1239        &self,
1240        block_hash: &[u8; 32],
1241        parent_tries_paths_nibbles: impl Iterator<Item = impl Iterator<Item = u8>>,
1242        key_nibbles: impl Iterator<Item = u8>,
1243    ) -> Result<Option<Vec<u8>>, StorageAccessError> {
1244        // Process the iterators at the very beginning and before locking the database, in order
1245        // to avoid a deadlock in case the `next()` function of one of the iterators accesses
1246        // the database as well.
1247        let key_vectored = parent_tries_paths_nibbles
1248            .flat_map(|t| t.inspect(|n| assert!(*n < 16)).chain(iter::once(0x10)))
1249            .chain(key_nibbles.inspect(|n| assert!(*n < 16)))
1250            .collect::<Vec<_>>();
1251
1252        let connection = self.database.lock();
1253
1254        // TODO: trie_root_ref system untested
1255        // TODO: infinite loop if there's a loop in the trie; detect this
1256        let mut statement = connection
1257            .prepare_cached(
1258                r#"
1259            WITH RECURSIVE
1260                -- At the end of the recursive statement, `closest_descendant` must always contain
1261                -- at most one item where `search_remain` is either empty or null. Empty
1262                -- indicates that we have found a match, while null means that the search has
1263                -- been interrupted due to a storage entry not being in the database. If
1264                -- `search_remain` is null, then `node_hash` is irrelevant.
1265                -- If `closest_descendant` doesn't have any entry where `search_remain` is empty
1266                -- or null, then the request key doesn't have any descendant.
1267                closest_descendant(node_hash, search_remain) AS (
1268                    SELECT
1269                            blocks.state_trie_root_hash,
1270                            CASE
1271                                WHEN trie_node.partial_key IS NULL AND LENGTH(:key) = 0
1272                                    THEN X''   -- Trie root node isn't in database, but since key is empty we have a match anyway
1273                                WHEN trie_node.partial_key IS NULL AND LENGTH(:key) != 0
1274                                    THEN NULL  -- Trie root node isn't in database and we can't iterate further
1275                                ELSE
1276                                    COALESCE(SUBSTR(:key, 1 + LENGTH(trie_node.partial_key)), X'')
1277                            END
1278                        FROM blocks
1279                        LEFT JOIN trie_node ON blocks.state_trie_root_hash = trie_node.hash
1280                        WHERE blocks.hash = :block_hash
1281                            AND (
1282                                trie_node.partial_key IS NULL
1283                                OR COALESCE(SUBSTR(trie_node.partial_key, 1, LENGTH(:key)), X'') = :key
1284                                OR COALESCE(SUBSTR(:key, 1, LENGTH(trie_node.partial_key)), X'') = trie_node.partial_key
1285                            )
1286
1287                    UNION ALL
1288                    SELECT
1289                            COALESCE(trie_node_child.child_hash, trie_node_storage.trie_root_ref),
1290                            CASE
1291                                WHEN trie_node_child.child_hash IS NULL AND HEX(SUBSTR(closest_descendant.search_remain, 1, 1)) != '10'
1292                                    THEN X''      -- No child matching the key.
1293                                WHEN trie_node_child.child_hash IS NOT NULL AND trie_node.hash IS NULL AND LENGTH(closest_descendant.search_remain) = 1
1294                                    THEN X''      -- Descendant node not in trie but we know that it's the result.
1295                                WHEN trie_node_child.child_hash IS NOT NULL AND trie_node.hash IS NULL
1296                                    THEN NULL     -- Descendant node not in trie.
1297                                WHEN COALESCE(SUBSTR(trie_node.partial_key, 1, LENGTH(closest_descendant.search_remain) - 1), X'') = COALESCE(SUBSTR(closest_descendant.search_remain, 2), X'')
1298                                        OR COALESCE(SUBSTR(closest_descendant.search_remain, 2, LENGTH(trie_node.partial_key)), X'') = trie_node.partial_key
1299                                    THEN SUBSTR(closest_descendant.search_remain, 2 + LENGTH(trie_node.partial_key))
1300                                ELSE
1301                                    X''           -- Unreachable.
1302                            END
1303                        FROM closest_descendant
1304                        LEFT JOIN trie_node_child ON closest_descendant.node_hash = trie_node_child.hash
1305                            AND SUBSTR(closest_descendant.search_remain, 1, 1) = trie_node_child.child_num
1306                        LEFT JOIN trie_node ON trie_node.hash = trie_node_child.child_hash
1307                        LEFT JOIN trie_node_storage
1308                            ON closest_descendant.node_hash = trie_node_storage.node_hash
1309                            AND HEX(SUBSTR(closest_descendant.search_remain, 1, 1)) = '10'
1310                            AND trie_node_storage.trie_root_ref IS NOT NULL
1311                        WHERE
1312                            LENGTH(closest_descendant.search_remain) >= 1
1313                            AND (
1314                                trie_node.hash IS NULL
1315                                OR COALESCE(SUBSTR(trie_node.partial_key, 1, LENGTH(closest_descendant.search_remain) - 1), X'') = COALESCE(SUBSTR(closest_descendant.search_remain, 2), X'')
1316                                OR COALESCE(SUBSTR(closest_descendant.search_remain, 2, LENGTH(trie_node.partial_key)), X'') = trie_node.partial_key
1317                            )
1318                )
1319            SELECT COUNT(blocks.hash) >= 1, closest_descendant.node_hash IS NOT NULL AND closest_descendant.search_remain IS NULL, closest_descendant.node_hash
1320            FROM blocks
1321            LEFT JOIN closest_descendant ON LENGTH(closest_descendant.search_remain) = 0 OR closest_descendant.search_remain IS NULL
1322            WHERE blocks.hash = :block_hash
1323            LIMIT 1"#,
1324            )
1325            .map_err(|err| {
1326                StorageAccessError::Corrupted(CorruptedError::Internal(
1327                    InternalError(err),
1328                ))
1329            })?;
1330
1331        // In order to debug the SQL query above (for example in case of a failing test),
1332        // uncomment this block:
1333        //
1334        /*println!("{:?}", {
1335            let mut statement = connection
1336                    .prepare_cached(
1337                        r#"
1338                    WITH RECURSIVE
1339                        copy-paste the definition of closest_descendant here
1340
1341                    SELECT * FROM closest_descendant"#).unwrap();
1342            statement
1343                .query_map(
1344                    rusqlite::named_params! {
1345                        ":block_hash": &block_hash[..],
1346                        ":key": key_vectored,
1347                    },
1348                    |row| {
1349                        let node_hash = row.get::<_, Option<Vec<u8>>>(0)?.map(hex::encode);
1350                        let search_remain = row.get::<_, Option<Vec<u8>>>(1)?;
1351                        Ok((node_hash, search_remain))
1352                    },
1353                )
1354                .unwrap()
1355                .collect::<Vec<_>>()
1356        });*/
1357
1358        let (has_block, incomplete_storage, merkle_value) = statement
1359            .query_row(
1360                rusqlite::named_params! {
1361                    ":block_hash": &block_hash[..],
1362                    ":key": key_vectored,
1363                },
1364                |row| {
1365                    let has_block = row.get::<_, i64>(0)? != 0;
1366                    let incomplete_storage = row.get::<_, i64>(1)? != 0;
1367                    let merkle_value = row.get::<_, Option<Vec<u8>>>(2)?;
1368                    Ok((has_block, incomplete_storage, merkle_value))
1369                },
1370            )
1371            .map_err(|err| {
1372                StorageAccessError::Corrupted(CorruptedError::Internal(InternalError(err)))
1373            })?;
1374
1375        if !has_block {
1376            return Err(StorageAccessError::UnknownBlock);
1377        }
1378
1379        if incomplete_storage {
1380            return Err(StorageAccessError::IncompleteStorage);
1381        }
1382
1383        Ok(merkle_value)
1384    }
1385
1386    /// Inserts a block in the database and sets it as the finalized block.
1387    ///
1388    /// The parent of the block doesn't need to be present in the database.
1389    ///
1390    /// If the block is already in the database, it is replaced by the one provided.
1391    pub fn reset<'a>(
1392        &self,
1393        finalized_block_header: &[u8],
1394        finalized_block_body: impl ExactSizeIterator<Item = &'a [u8]>,
1395        finalized_block_justification: Option<Vec<u8>>,
1396    ) -> Result<(), CorruptedError> {
1397        // Start a transaction to insert everything in one go.
1398        let mut database = self.database.lock();
1399        let transaction = database
1400            .transaction()
1401            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1402
1403        // Temporarily disable foreign key checks in order to make the initial insertion easier,
1404        // as we don't have to make sure that trie nodes are sorted.
1405        // Note that this is immediately disabled again when we `COMMIT`.
1406        transaction
1407            .execute("PRAGMA defer_foreign_keys = ON", ())
1408            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1409
1410        let finalized_block_hash = header::hash_from_scale_encoded_header(finalized_block_header);
1411        // TODO: this module shouldn't decode blocks
1412        let decoded = header::decode(finalized_block_header, self.block_number_bytes).unwrap();
1413
1414        transaction
1415            .prepare_cached(
1416                "INSERT OR REPLACE INTO blocks(hash, parent_hash, state_trie_root_hash, number, header, is_best_chain, justification) VALUES(?, ?, ?, ?, ?, TRUE, ?)",
1417            )
1418            .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1419            .execute((
1420                &finalized_block_hash[..],
1421                if decoded.number != 0 {
1422                    Some(&decoded.parent_hash[..])
1423                } else { None },
1424                &decoded.state_root[..],
1425                i64::try_from(decoded.number).unwrap(),
1426                finalized_block_header,
1427                finalized_block_justification.as_deref(),
1428            ))
1429            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1430
1431        transaction
1432            .execute(
1433                "DELETE FROM blocks_body WHERE hash = ?",
1434                (&finalized_block_hash[..],),
1435            )
1436            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1437
1438        {
1439            let mut statement = transaction
1440                .prepare_cached(
1441                    "INSERT OR IGNORE INTO blocks_body(hash, idx, extrinsic) VALUES(?, ?, ?)",
1442                )
1443                .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1444            for (index, item) in finalized_block_body.enumerate() {
1445                statement
1446                    .execute((
1447                        &finalized_block_hash[..],
1448                        i64::try_from(index).unwrap(),
1449                        item,
1450                    ))
1451                    .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1452            }
1453        }
1454
1455        meta_set_blob(&transaction, "best", &finalized_block_hash[..])?;
1456        meta_set_number(&transaction, "finalized", decoded.number)?;
1457
1458        transaction
1459            .commit()
1460            .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1461
1462        Ok(())
1463    }
1464}
1465
1466impl fmt::Debug for SqliteFullDatabase {
1467    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1468        f.debug_tuple("SqliteFullDatabase").finish()
1469    }
1470}
1471
1472impl Drop for SqliteFullDatabase {
1473    fn drop(&mut self) {
1474        if !std::thread::panicking() {
1475            // The SQLite documentation recommends running `PRAGMA optimize` when the database
1476            // closes.
1477            // TODO: it is also recommended to do this every 2 hours
1478            let _ = self.database.get_mut().execute("PRAGMA optimize", ());
1479        }
1480    }
1481}
1482
1483/// See [`SqliteFullDatabase::finalized_and_above_missing_trie_nodes_unordered`].
1484#[derive(Debug)]
1485pub struct MissingTrieNode {
1486    /// Blocks the trie node is known to belong to.
1487    ///
1488    /// Guaranteed to never be empty.
1489    ///
1490    /// Only contains blocks whose number is superior or equal to the latest finalized block
1491    /// number.
1492    pub blocks: Vec<MissingTrieNodeBlock>,
1493    /// Hash of the missing trie node.
1494    pub trie_node_hash: [u8; 32],
1495}
1496
1497/// See [`MissingTrieNode::blocks`].
1498#[derive(Debug)]
1499pub struct MissingTrieNodeBlock {
1500    /// Hash of the block.
1501    pub hash: [u8; 32],
1502    /// Height of the block.
1503    pub number: u64,
1504    /// Path of the parent tries leading to the trie node.
1505    pub parent_tries_paths_nibbles: Vec<Vec<u8>>,
1506    /// Nibbles that compose the key of the trie node.
1507    pub trie_node_key_nibbles: Vec<u8>,
1508}
1509
1510pub struct InsertTrieNode<'a> {
1511    pub merkle_value: Cow<'a, [u8]>,
1512    pub partial_key_nibbles: Cow<'a, [u8]>,
1513    pub children_merkle_values: [Option<Cow<'a, [u8]>>; 16],
1514    pub storage_value: InsertTrieNodeStorageValue<'a>,
1515}
1516
1517pub enum InsertTrieNodeStorageValue<'a> {
1518    NoValue,
1519    Value {
1520        value: Cow<'a, [u8]>,
1521        /// If `true`, the value is equal to the Merkle value of the root of another trie.
1522        references_merkle_value: bool,
1523    },
1524}
1525
1526/// Error while calling [`SqliteFullDatabase::insert`].
1527#[derive(Debug, derive_more::Display, derive_more::Error, derive_more::From)]
1528pub enum InsertError {
1529    /// Error accessing the database.
1530    #[display("{_0}")]
1531    Corrupted(CorruptedError),
1532    /// Block was already in the database.
1533    Duplicate,
1534    /// Error when decoding the header to import.
1535    #[display("Failed to decode header: {_0}")]
1536    BadHeader(header::Error),
1537    /// Parent of the block to insert isn't in the database.
1538    MissingParent,
1539    /// The new best block would be outside of the finalized chain.
1540    BestNotInFinalizedChain,
1541}
1542
1543/// Error while calling [`SqliteFullDatabase::set_finalized`].
1544#[derive(Debug, derive_more::Display, derive_more::Error, derive_more::From)]
1545pub enum SetFinalizedError {
1546    /// Error accessing the database.
1547    Corrupted(CorruptedError),
1548    /// New finalized block isn't in the database.
1549    UnknownBlock,
1550    /// New finalized block must be a child of the previous finalized block.
1551    RevertForbidden,
1552}
1553
1554/// Error while accessing the storage of the finalized block.
1555#[derive(Debug, derive_more::Display, derive_more::Error, derive_more::From)]
1556pub enum StorageAccessError {
1557    /// Error accessing the database.
1558    Corrupted(CorruptedError),
1559    /// Some trie nodes of the storage of the requested block hash are missing.
1560    IncompleteStorage,
1561    /// Requested block couldn't be found in the database.
1562    UnknownBlock,
1563}
1564
1565/// Error in the content of the database.
1566// TODO: document and see if any entry is unused
1567#[derive(Debug, derive_more::Display, derive_more::Error)]
1568pub enum CorruptedError {
1569    /// Block numbers are expected to be 64 bits.
1570    // TODO: remove this and use stronger schema
1571    InvalidNumber,
1572    /// Finalized block number stored in the database doesn't match any block.
1573    InvalidFinalizedNum,
1574    /// A block hash is expected to be 32 bytes. This isn't the case.
1575    InvalidBlockHashLen,
1576    /// A trie hash is expected to be 32 bytes. This isn't the case.
1577    InvalidTrieHashLen,
1578    /// The parent of a block in the database couldn't be found in that same database.
1579    BrokenChain,
1580    /// Missing a key in the `meta` table.
1581    MissingMetaKey,
1582    /// Some parts of the database refer to a block by its hash, but the block's constituents
1583    /// couldn't be found.
1584    MissingBlockHeader,
1585    /// The header of a block in the database has failed to decode.
1586    #[display("Corrupted block header: {_0}")]
1587    BlockHeaderCorrupted(header::Error),
1588    /// The version information about a storage entry has failed to decode.
1589    InvalidTrieEntryVersion,
1590    #[display("Internal error: {_0}")]
1591    Internal(InternalError),
1592}
1593
1594/// Low-level database error, such as an error while accessing the file system.
1595#[derive(Debug, derive_more::Display, derive_more::Error)]
1596pub struct InternalError(rusqlite::Error);
1597
1598fn meta_get_blob(
1599    database: &rusqlite::Connection,
1600    key: &str,
1601) -> Result<Option<Vec<u8>>, CorruptedError> {
1602    let value = database
1603        .prepare_cached(r#"SELECT value_blob FROM meta WHERE key = ?"#)
1604        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1605        .query_row((key,), |row| row.get::<_, Vec<u8>>(0))
1606        .optional()
1607        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1608    Ok(value)
1609}
1610
1611fn meta_get_number(
1612    database: &rusqlite::Connection,
1613    key: &str,
1614) -> Result<Option<u64>, CorruptedError> {
1615    let value = database
1616        .prepare_cached(r#"SELECT value_number FROM meta WHERE key = ?"#)
1617        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1618        .query_row((key,), |row| row.get::<_, i64>(0))
1619        .optional()
1620        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1621    Ok(value.map(|value| u64::from_ne_bytes(value.to_ne_bytes())))
1622}
1623
1624fn meta_set_blob(
1625    database: &rusqlite::Connection,
1626    key: &str,
1627    value: &[u8],
1628) -> Result<(), CorruptedError> {
1629    database
1630        .prepare_cached(r#"INSERT OR REPLACE INTO meta(key, value_blob) VALUES (?, ?)"#)
1631        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1632        .execute((key, value))
1633        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1634    Ok(())
1635}
1636
1637fn meta_set_number(
1638    database: &rusqlite::Connection,
1639    key: &str,
1640    value: u64,
1641) -> Result<(), CorruptedError> {
1642    database
1643        .prepare_cached(r#"INSERT OR REPLACE INTO meta(key, value_number) VALUES (?, ?)"#)
1644        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1645        .execute((key, i64::from_ne_bytes(value.to_ne_bytes())))
1646        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1647    Ok(())
1648}
1649
1650fn has_block(database: &rusqlite::Connection, hash: &[u8]) -> Result<bool, CorruptedError> {
1651    database
1652        .prepare_cached(r#"SELECT COUNT(*) FROM blocks WHERE hash = ?"#)
1653        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1654        .query_row((hash,), |row| Ok(row.get_unwrap::<_, i64>(0) != 0))
1655        .map_err(|err| CorruptedError::Internal(InternalError(err)))
1656}
1657
1658// TODO: the fact that the meta table stores blobs makes it impossible to use joins ; fix that
1659fn finalized_num(database: &rusqlite::Connection) -> Result<u64, CorruptedError> {
1660    meta_get_number(database, "finalized")?.ok_or(CorruptedError::MissingMetaKey)
1661}
1662
1663fn finalized_hash(database: &rusqlite::Connection) -> Result<[u8; 32], CorruptedError> {
1664    let value = database
1665        .prepare_cached(r#"SELECT hash FROM blocks WHERE number = (SELECT value_number FROM meta WHERE key = "finalized")"#)
1666        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1667        .query_row((), |row| row.get::<_, Vec<u8>>(0))
1668        .optional()
1669        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1670        .ok_or(CorruptedError::InvalidFinalizedNum)?;
1671
1672    if value.len() == 32 {
1673        let mut out = [0; 32];
1674        out.copy_from_slice(&value);
1675        Ok(out)
1676    } else {
1677        Err(CorruptedError::InvalidBlockHashLen)
1678    }
1679}
1680
1681fn block_hashes_by_number(
1682    database: &rusqlite::Connection,
1683    number: u64,
1684) -> Result<Vec<[u8; 32]>, CorruptedError> {
1685    let number = match i64::try_from(number) {
1686        Ok(n) => n,
1687        Err(_) => return Ok(Vec::new()),
1688    };
1689
1690    database
1691        .prepare_cached(r#"SELECT hash FROM blocks WHERE number = ?"#)
1692        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1693        .query_map((number,), |row| row.get::<_, Vec<u8>>(0))
1694        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1695        .map(|value| {
1696            let value = value.map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1697            <[u8; 32]>::try_from(&value[..]).map_err(|_| CorruptedError::InvalidBlockHashLen)
1698        })
1699        .collect::<Result<Vec<_>, _>>()
1700}
1701
1702fn block_header(
1703    database: &rusqlite::Connection,
1704    hash: &[u8; 32],
1705) -> Result<Option<Vec<u8>>, CorruptedError> {
1706    database
1707        .prepare_cached(r#"SELECT header FROM blocks WHERE hash = ?"#)
1708        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1709        .query_row((&hash[..],), |row| row.get::<_, Vec<u8>>(0))
1710        .optional()
1711        .map_err(|err| CorruptedError::Internal(InternalError(err)))
1712}
1713
1714fn set_best_chain(
1715    database: &rusqlite::Connection,
1716    new_best_block_hash: &[u8],
1717) -> Result<(), CorruptedError> {
1718    // TODO: can this not be embedded in the SQL statement below?
1719    let current_best = meta_get_blob(database, "best")?.ok_or(CorruptedError::MissingMetaKey)?;
1720
1721    // TODO: untested except in the most basic situation
1722    // In the SQL below, the temporary table `changes` is built by walking down (highest to lowest
1723    // block number) the new best chain and old best chain. While walking down, the iteration
1724    // keeps track of the block hashes and their number. If the new best chain has a higher number
1725    // than the old best chain, then only the new best chain is iterated, and vice versa. If the
1726    // new and old best chain have the same number, they are both iterated, and it is possible to
1727    // compare the block hashes in order to know when to stop iterating. In the context of this
1728    // algorithm, a `NULL` block hash represents "one past the new/old best block", which allows
1729    // to not include the new/old best block in the temporary table until it needs to be included.
1730    database
1731        .prepare_cached(
1732            r#"
1733        WITH RECURSIVE
1734            changes(block_to_include, block_to_retract, block_to_include_number, block_to_retract_number) AS (
1735                SELECT NULL, NULL, blocks_inc.number + 1, blocks_ret.number + 1
1736                FROM blocks AS blocks_inc, blocks as blocks_ret
1737                WHERE blocks_inc.hash = :new_best AND blocks_ret.hash = :current_best
1738            UNION ALL
1739                SELECT
1740                    CASE WHEN changes.block_to_include_number >= changes.block_to_retract_number THEN
1741                        COALESCE(blocks_inc.parent_hash, :new_best)
1742                    ELSE
1743                        changes.block_to_include
1744                    END,
1745                    CASE WHEN changes.block_to_retract_number >= changes.block_to_include_number THEN
1746                        COALESCE(blocks_ret.parent_hash, :current_best)
1747                    ELSE
1748                        changes.block_to_retract
1749                    END,
1750                    CASE WHEN changes.block_to_include_number >= block_to_retract_number THEN changes.block_to_include_number - 1
1751                    ELSE changes.block_to_include_number END,
1752                    CASE WHEN changes.block_to_retract_number >= changes.block_to_include_number THEN changes.block_to_retract_number - 1
1753                    ELSE changes.block_to_retract_number END
1754                FROM changes
1755                LEFT JOIN blocks AS blocks_inc ON blocks_inc.hash = changes.block_to_include
1756                LEFT JOIN blocks AS blocks_ret ON blocks_ret.hash = changes.block_to_retract
1757                WHERE changes.block_to_include_number != changes.block_to_retract_number
1758                    OR COALESCE(blocks_inc.parent_hash, :new_best) != COALESCE(blocks_ret.parent_hash, :current_best)
1759            )
1760        UPDATE blocks SET is_best_chain = (blocks.hash = changes.block_to_include)
1761        FROM changes
1762        WHERE blocks.hash = changes.block_to_include OR blocks.hash = changes.block_to_retract;
1763            "#,
1764        )
1765        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1766        .execute(rusqlite::named_params! {
1767            ":current_best": current_best,
1768            ":new_best": new_best_block_hash
1769        })
1770        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1771
1772    meta_set_blob(database, "best", new_best_block_hash)?;
1773    Ok(())
1774}
1775
1776fn purge_block(database: &rusqlite::Connection, hash: &[u8]) -> Result<(), CorruptedError> {
1777    purge_block_storage(database, hash)?;
1778    database
1779        .prepare_cached("DELETE FROM blocks_body WHERE hash = ?")
1780        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1781        .execute((hash,))
1782        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1783    database
1784        .prepare_cached("DELETE FROM blocks WHERE hash = ?")
1785        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1786        .execute((hash,))
1787        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1788    Ok(())
1789}
1790
1791fn purge_block_storage(database: &rusqlite::Connection, hash: &[u8]) -> Result<(), CorruptedError> {
1792    // TODO: untested
1793
1794    let state_trie_root_hash = database
1795        .prepare_cached(r#"SELECT state_trie_root_hash FROM blocks WHERE hash = ?"#)
1796        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1797        .query_row((hash,), |row| row.get::<_, Vec<u8>>(0))
1798        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1799
1800    database
1801        .prepare_cached(
1802            r#"
1803            UPDATE blocks SET state_trie_root_hash = NULL
1804            WHERE hash = :block_hash
1805        "#,
1806        )
1807        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1808        .execute(rusqlite::named_params! {
1809            ":block_hash": hash,
1810        })
1811        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1812
1813    // TODO: doesn't delete everything in the situation where a single node with a merkle value is referenced multiple times from the same trie
1814    // TODO: currently doesn't follow `trie_root_ref`
1815    database
1816        .prepare_cached(r#"
1817            WITH RECURSIVE
1818                to_delete(node_hash) AS (
1819                    SELECT trie_node.hash
1820                        FROM trie_node
1821                        LEFT JOIN blocks ON blocks.hash != :block_hash AND blocks.state_trie_root_hash = trie_node.hash
1822                        LEFT JOIN trie_node_storage ON trie_node_storage.trie_root_ref = trie_node.hash
1823                        WHERE trie_node.hash = :state_trie_root_hash AND blocks.hash IS NULL AND trie_node_storage.node_hash IS NULL
1824                    UNION ALL
1825                    SELECT trie_node_child.child_hash
1826                        FROM to_delete
1827                        JOIN trie_node_child ON trie_node_child.hash = to_delete.node_hash
1828                        LEFT JOIN blocks ON blocks.state_trie_root_hash = trie_node_child.child_hash
1829                        LEFT JOIN trie_node_storage ON trie_node_storage.trie_root_ref = to_delete.node_hash
1830                        WHERE blocks.hash IS NULL AND trie_node_storage.node_hash IS NULL
1831                )
1832            DELETE FROM trie_node
1833            WHERE hash IN (SELECT node_hash FROM to_delete)
1834        "#)
1835        .map_err(|err| CorruptedError::Internal(InternalError(err)))?
1836        .execute(rusqlite::named_params! {
1837            ":state_trie_root_hash": &state_trie_root_hash,
1838            ":block_hash": hash,
1839        })
1840        .map_err(|err| CorruptedError::Internal(InternalError(err)))?;
1841    Ok(())
1842}