sov_db/ledger_db/mod.rs

1use std::path::Path;
2use std::sync::{Arc, Mutex};
3
4use serde::Serialize;
5use sov_rollup_interface::services::da::SlotData;
6use sov_rollup_interface::stf::{BatchReceipt, Event};
7use sov_schema_db::{Schema, SchemaBatch, SeekKeyEncoder, DB};
8
9use crate::rocks_db_config::gen_rocksdb_options;
10use crate::schema::tables::{
11    BatchByHash, BatchByNumber, EventByKey, EventByNumber, SlotByHash, SlotByNumber, TxByHash,
12    TxByNumber, LEDGER_TABLES,
13};
14use crate::schema::types::{
15    split_tx_for_storage, BatchNumber, EventNumber, SlotNumber, StoredBatch, StoredSlot,
16    StoredTransaction, TxNumber,
17};
18
19mod rpc;
20
/// Sub-directory name appended to the caller-supplied path by `LedgerDB::with_path`;
/// the ledger RocksDB instance is opened at `{path}/ledger`.
const LEDGER_DB_PATH_SUFFIX: &str = "ledger";
22
23#[derive(Clone, Debug)]
24/// A database which stores the ledger history (slots, transactions, events, etc).
25/// Ledger data is first ingested into an in-memory map before being fed to the state-transition function.
26/// Once the state-transition function has been executed and finalized, the results are committed to the final db
27pub struct LedgerDB {
28    /// The database which stores the committed ledger. Uses an optimized layout which
29    /// requires transactions to be executed before being committed.
30    db: Arc<DB>,
31    next_item_numbers: Arc<Mutex<ItemNumbers>>,
32    slot_subscriptions: tokio::sync::broadcast::Sender<u64>,
33}
34
/// A slot number, batch number, transaction number and event number grouped together.
///
/// Usually describes the respective heights at the start or at the end of processing a slot.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))]
pub struct ItemNumbers {
    /// Slot number.
    pub slot_number: u64,
    /// Batch number.
    pub batch_number: u64,
    /// Transaction number.
    pub tx_number: u64,
    /// Event number.
    pub event_number: u64,
}
49
50/// All of the data to be committed to the ledger db for a single slot.
51#[derive(Debug)]
52pub struct SlotCommit<S: SlotData, B, T> {
53    slot_data: S,
54    batch_receipts: Vec<BatchReceipt<B, T>>,
55    num_txs: usize,
56    num_events: usize,
57}
58
59impl<S: SlotData, B, T> SlotCommit<S, B, T> {
60    /// Returns a reference to the commit's slot_data
61    pub fn slot_data(&self) -> &S {
62        &self.slot_data
63    }
64
65    /// Returns a reference to the commit's batch_receipts
66    pub fn batch_receipts(&self) -> &[BatchReceipt<B, T>] {
67        &self.batch_receipts
68    }
69
70    /// Create a new SlotCommit from the given slot data
71    pub fn new(slot_data: S) -> Self {
72        Self {
73            slot_data,
74            batch_receipts: vec![],
75            num_txs: 0,
76            num_events: 0,
77        }
78    }
79    /// Add a `batch` (of transactions) to the commit
80    pub fn add_batch(&mut self, batch: BatchReceipt<B, T>) {
81        self.num_txs += batch.tx_receipts.len();
82        let events_this_batch: usize = batch.tx_receipts.iter().map(|r| r.events.len()).sum();
83        self.batch_receipts.push(batch);
84        self.num_events += events_this_batch;
85    }
86}
87
88impl LedgerDB {
89    /// Open a [`LedgerDB`] (backed by RocksDB) at the specified path.
90    /// The returned instance will be at the path `{path}/ledger-db`.
91    pub fn with_path(path: impl AsRef<Path>) -> Result<Self, anyhow::Error> {
92        let path = path.as_ref().join(LEDGER_DB_PATH_SUFFIX);
93        let inner = DB::open(
94            path,
95            "ledger-db",
96            LEDGER_TABLES.iter().copied(),
97            &gen_rocksdb_options(&Default::default(), false),
98        )?;
99
100        let next_item_numbers = ItemNumbers {
101            slot_number: Self::last_version_written(&inner, SlotByNumber)?.unwrap_or_default() + 1,
102            batch_number: Self::last_version_written(&inner, BatchByNumber)?.unwrap_or_default()
103                + 1,
104            tx_number: Self::last_version_written(&inner, TxByNumber)?.unwrap_or_default() + 1,
105            event_number: Self::last_version_written(&inner, EventByNumber)?.unwrap_or_default()
106                + 1,
107        };
108
109        Ok(Self {
110            db: Arc::new(inner),
111            next_item_numbers: Arc::new(Mutex::new(next_item_numbers)),
112            slot_subscriptions: tokio::sync::broadcast::channel(10).0,
113        })
114    }
115
116    /// Get the next slot, block, transaction, and event numbers
117    pub fn get_next_items_numbers(&self) -> ItemNumbers {
118        self.next_item_numbers.lock().unwrap().clone()
119    }
120
121    /// Gets all slots with numbers `range.start` to `range.end`. If `range.end` is outside
122    /// the range of the database, the result will smaller than the requested range.
123    /// Note that this method blindly preallocates for the requested range, so it should not be exposed
124    /// directly via rpc.
125    pub(crate) fn _get_slot_range(
126        &self,
127        range: &std::ops::Range<SlotNumber>,
128    ) -> Result<Vec<StoredSlot>, anyhow::Error> {
129        self.get_data_range::<SlotByNumber, _, _>(range)
130    }
131
132    /// Gets all batches with numbers `range.start` to `range.end`. If `range.end` is outside
133    /// the range of the database, the result will smaller than the requested range.
134    /// Note that this method blindly preallocates for the requested range, so it should not be exposed
135    /// directly via rpc.
136    pub(crate) fn get_batch_range(
137        &self,
138        range: &std::ops::Range<BatchNumber>,
139    ) -> Result<Vec<StoredBatch>, anyhow::Error> {
140        self.get_data_range::<BatchByNumber, _, _>(range)
141    }
142
143    /// Gets all transactions with numbers `range.start` to `range.end`. If `range.end` is outside
144    /// the range of the database, the result will smaller than the requested range.
145    /// Note that this method blindly preallocates for the requested range, so it should not be exposed
146    /// directly via rpc.
147    pub(crate) fn get_tx_range(
148        &self,
149        range: &std::ops::Range<TxNumber>,
150    ) -> Result<Vec<StoredTransaction>, anyhow::Error> {
151        self.get_data_range::<TxByNumber, _, _>(range)
152    }
153
154    /// Gets all data with identifier in `range.start` to `range.end`. If `range.end` is outside
155    /// the range of the database, the result will smaller than the requested range.
156    /// Note that this method blindly preallocates for the requested range, so it should not be exposed
157    /// directly via rpc.
158    fn get_data_range<T, K, V>(&self, range: &std::ops::Range<K>) -> Result<Vec<V>, anyhow::Error>
159    where
160        T: Schema<Key = K, Value = V>,
161        K: Into<u64> + Copy + SeekKeyEncoder<T>,
162    {
163        let mut raw_iter = self.db.iter()?;
164        let max_items = (range.end.into() - range.start.into()) as usize;
165        raw_iter.seek(&range.start)?;
166        let iter = raw_iter.take(max_items);
167        let mut out = Vec::with_capacity(max_items);
168        for res in iter {
169            let (_, batch) = res?;
170            out.push(batch)
171        }
172        Ok(out)
173    }
174
175    fn put_slot(
176        &self,
177        slot: &StoredSlot,
178        slot_number: &SlotNumber,
179        schema_batch: &mut SchemaBatch,
180    ) -> Result<(), anyhow::Error> {
181        schema_batch.put::<SlotByNumber>(slot_number, slot)?;
182        schema_batch.put::<SlotByHash>(&slot.hash, slot_number)
183    }
184
185    fn put_batch(
186        &self,
187        batch: &StoredBatch,
188        batch_number: &BatchNumber,
189        schema_batch: &mut SchemaBatch,
190    ) -> Result<(), anyhow::Error> {
191        schema_batch.put::<BatchByNumber>(batch_number, batch)?;
192        schema_batch.put::<BatchByHash>(&batch.hash, batch_number)
193    }
194
195    fn put_transaction(
196        &self,
197        tx: &StoredTransaction,
198        tx_number: &TxNumber,
199        schema_batch: &mut SchemaBatch,
200    ) -> Result<(), anyhow::Error> {
201        schema_batch.put::<TxByNumber>(tx_number, tx)?;
202        schema_batch.put::<TxByHash>(&tx.hash, tx_number)
203    }
204
205    fn put_event(
206        &self,
207        event: &Event,
208        event_number: &EventNumber,
209        tx_number: TxNumber,
210        schema_batch: &mut SchemaBatch,
211    ) -> Result<(), anyhow::Error> {
212        schema_batch.put::<EventByNumber>(event_number, event)?;
213        schema_batch.put::<EventByKey>(&(event.key().clone(), tx_number, *event_number), &())
214    }
215
216    /// Commits a slot to the database by inserting its events, transactions, and batches before
217    /// inserting the slot metadata.
218    pub fn commit_slot<S: SlotData, B: Serialize, T: Serialize>(
219        &self,
220        data_to_commit: SlotCommit<S, B, T>,
221    ) -> Result<(), anyhow::Error> {
222        // Create a scope to ensure that the lock is released before we commit to the db
223        let mut current_item_numbers = {
224            let mut next_item_numbers = self.next_item_numbers.lock().unwrap();
225            let item_numbers = next_item_numbers.clone();
226            next_item_numbers.slot_number += 1;
227            next_item_numbers.batch_number += data_to_commit.batch_receipts.len() as u64;
228            next_item_numbers.tx_number += data_to_commit.num_txs as u64;
229            next_item_numbers.event_number += data_to_commit.num_events as u64;
230            item_numbers
231            // The lock is released here
232        };
233
234        let mut schema_batch = SchemaBatch::new();
235
236        let first_batch_number = current_item_numbers.batch_number;
237        let last_batch_number = first_batch_number + data_to_commit.batch_receipts.len() as u64;
238        // Insert data from "bottom up" to ensure consistency if the application crashes during insertion
239        for batch_receipt in data_to_commit.batch_receipts.into_iter() {
240            let first_tx_number = current_item_numbers.tx_number;
241            let last_tx_number = first_tx_number + batch_receipt.tx_receipts.len() as u64;
242            // Insert transactions and events from each batch before inserting the batch
243            for tx in batch_receipt.tx_receipts.into_iter() {
244                let (tx_to_store, events) =
245                    split_tx_for_storage(tx, current_item_numbers.event_number);
246                for event in events.into_iter() {
247                    self.put_event(
248                        &event,
249                        &EventNumber(current_item_numbers.event_number),
250                        TxNumber(current_item_numbers.tx_number),
251                        &mut schema_batch,
252                    )?;
253                    current_item_numbers.event_number += 1;
254                }
255                self.put_transaction(
256                    &tx_to_store,
257                    &TxNumber(current_item_numbers.tx_number),
258                    &mut schema_batch,
259                )?;
260                current_item_numbers.tx_number += 1;
261            }
262
263            // Insert batch
264            let batch_to_store = StoredBatch {
265                hash: batch_receipt.batch_hash,
266                txs: TxNumber(first_tx_number)..TxNumber(last_tx_number),
267                custom_receipt: bincode::serialize(&batch_receipt.inner)
268                    .expect("serialization to vec is infallible")
269                    .into(),
270            };
271            self.put_batch(
272                &batch_to_store,
273                &BatchNumber(current_item_numbers.batch_number),
274                &mut schema_batch,
275            )?;
276            current_item_numbers.batch_number += 1;
277        }
278
279        // Once all batches are inserted, Insert slot
280        let slot_to_store = StoredSlot {
281            hash: data_to_commit.slot_data.hash(),
282            // TODO: Add a method to the slot data trait allowing additional data to be stored
283            extra_data: vec![].into(),
284            batches: BatchNumber(first_batch_number)..BatchNumber(last_batch_number),
285        };
286        self.put_slot(
287            &slot_to_store,
288            &SlotNumber(current_item_numbers.slot_number),
289            &mut schema_batch,
290        )?;
291
292        self.db.write_schemas(schema_batch)?;
293
294        // Notify subscribers. This call returns an error IFF there are no subscribers, so we don't need to check the result
295        let _ = self
296            .slot_subscriptions
297            .send(current_item_numbers.slot_number);
298
299        Ok(())
300    }
301
302    fn last_version_written<T: Schema<Key = U>, U: Into<u64>>(
303        db: &DB,
304        _schema: T,
305    ) -> anyhow::Result<Option<u64>> {
306        let mut iter = db.iter::<T>()?;
307        iter.seek_to_last();
308
309        match iter.next() {
310            Some(Ok((version, _))) => Ok(Some(version.into())),
311            Some(Err(e)) => Err(e),
312            _ => Ok(None),
313        }
314    }
315
316    /// Get the most recent committed slot, if any
317    pub fn get_head_slot(&self) -> anyhow::Result<Option<(SlotNumber, StoredSlot)>> {
318        let mut iter = self.db.iter::<SlotByNumber>()?;
319        iter.seek_to_last();
320
321        match iter.next() {
322            Some(Ok((slot_number, slot))) => Ok(Some((slot_number, slot))),
323            Some(Err(e)) => Err(e),
324            _ => Ok(None),
325        }
326    }
327}
328
#[cfg(feature = "arbitrary")]
pub mod arbitrary {
    //! Arbitrary definitions for the [`LedgerDB`].

    use core::ops::{Deref, DerefMut};

    use proptest::strategy::LazyJust;
    use tempfile::TempDir;

    use super::*;

    /// Arbitrary instance of [`LedgerDB`].
    ///
    /// This is a db wrapper bound to its temporary path that will be deleted once the type is
    /// dropped.
    #[derive(Debug)]
    pub struct ArbitraryLedgerDB {
        /// The underlying RocksDB instance.
        // Keep this field before `path`: struct fields drop in declaration order, so this
        // wrapper's database handle is dropped before the temporary directory is removed.
        pub db: LedgerDB,
        /// The temporary directory used to create the [`LedgerDB`].
        pub path: TempDir,
    }

    /// A fallible, arbitrary instance of [`LedgerDB`].
    ///
    /// Creating the db requires I/O (a temporary directory must be created), which can fail.
    /// Instead of panicking, this type records the outcome in `result`, so it can be produced
    /// by a generator function that itself cannot return an error.
    #[derive(Debug)]
    pub struct FallibleArbitraryLedgerDB {
        /// The result of the new db instance.
        pub result: anyhow::Result<ArbitraryLedgerDB>,
    }

    impl Deref for ArbitraryLedgerDB {
        type Target = LedgerDB;

        fn deref(&self) -> &Self::Target {
            &self.db
        }
    }

    impl DerefMut for ArbitraryLedgerDB {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.db
        }
    }

    impl<'a> ::arbitrary::Arbitrary<'a> for ArbitraryLedgerDB {
        fn arbitrary(_u: &mut ::arbitrary::Unstructured<'a>) -> ::arbitrary::Result<Self> {
            let path = TempDir::new().map_err(|_| ::arbitrary::Error::NotEnoughData)?;
            let db = LedgerDB::with_path(&path).map_err(|_| ::arbitrary::Error::IncorrectFormat)?;
            Ok(Self { db, path })
        }
    }

    impl proptest::arbitrary::Arbitrary for FallibleArbitraryLedgerDB {
        type Parameters = ();
        type Strategy = LazyJust<Self, fn() -> FallibleArbitraryLedgerDB>;

        fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
            // Not named `gen`: that identifier is a reserved keyword in the Rust 2024 edition.
            fn generate() -> FallibleArbitraryLedgerDB {
                let result = TempDir::new()
                    .map_err(|e| anyhow::anyhow!("failed to generate path for LedgerDB: {}", e))
                    .and_then(|path| {
                        let db = LedgerDB::with_path(&path)?;
                        Ok(ArbitraryLedgerDB { db, path })
                    });
                FallibleArbitraryLedgerDB { result }
            }
            LazyJust::new(generate)
        }
    }

    impl<'a> ::arbitrary::Arbitrary<'a> for ItemNumbers {
        fn arbitrary(u: &mut ::arbitrary::Unstructured<'a>) -> ::arbitrary::Result<Self> {
            Ok(ItemNumbers {
                slot_number: u.arbitrary()?,
                batch_number: u.arbitrary()?,
                tx_number: u.arbitrary()?,
                event_number: u.arbitrary()?,
            })
        }
    }
}