1use std::path::Path;
2use std::sync::{Arc, Mutex};
3
4use serde::Serialize;
5use sov_rollup_interface::services::da::SlotData;
6use sov_rollup_interface::stf::{BatchReceipt, Event};
7use sov_schema_db::{Schema, SchemaBatch, SeekKeyEncoder, DB};
8
9use crate::rocks_db_config::gen_rocksdb_options;
10use crate::schema::tables::{
11 BatchByHash, BatchByNumber, EventByKey, EventByNumber, SlotByHash, SlotByNumber, TxByHash,
12 TxByNumber, LEDGER_TABLES,
13};
14use crate::schema::types::{
15 split_tx_for_storage, BatchNumber, EventNumber, SlotNumber, StoredBatch, StoredSlot,
16 StoredTransaction, TxNumber,
17};
18
19mod rpc;
20
/// Name of the subdirectory, relative to the path handed to `LedgerDB::with_path`,
/// in which the ledger database is opened.
const LEDGER_DB_PATH_SUFFIX: &str = "ledger";
22
23#[derive(Clone, Debug)]
24pub struct LedgerDB {
28 db: Arc<DB>,
31 next_item_numbers: Arc<Mutex<ItemNumbers>>,
32 slot_subscriptions: tokio::sync::broadcast::Sender<u64>,
33}
34
/// One counter per kind of ledger item.
///
/// `LedgerDB` uses a value of this type to track the number that will be assigned
/// to the next slot, batch, transaction and event.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))]
pub struct ItemNumbers {
    /// The slot number.
    pub slot_number: u64,
    /// The batch number.
    pub batch_number: u64,
    /// The transaction number.
    pub tx_number: u64,
    /// The event number.
    pub event_number: u64,
}
49
50#[derive(Debug)]
52pub struct SlotCommit<S: SlotData, B, T> {
53 slot_data: S,
54 batch_receipts: Vec<BatchReceipt<B, T>>,
55 num_txs: usize,
56 num_events: usize,
57}
58
59impl<S: SlotData, B, T> SlotCommit<S, B, T> {
60 pub fn slot_data(&self) -> &S {
62 &self.slot_data
63 }
64
65 pub fn batch_receipts(&self) -> &[BatchReceipt<B, T>] {
67 &self.batch_receipts
68 }
69
70 pub fn new(slot_data: S) -> Self {
72 Self {
73 slot_data,
74 batch_receipts: vec![],
75 num_txs: 0,
76 num_events: 0,
77 }
78 }
79 pub fn add_batch(&mut self, batch: BatchReceipt<B, T>) {
81 self.num_txs += batch.tx_receipts.len();
82 let events_this_batch: usize = batch.tx_receipts.iter().map(|r| r.events.len()).sum();
83 self.batch_receipts.push(batch);
84 self.num_events += events_this_batch;
85 }
86}
87
88impl LedgerDB {
89 pub fn with_path(path: impl AsRef<Path>) -> Result<Self, anyhow::Error> {
92 let path = path.as_ref().join(LEDGER_DB_PATH_SUFFIX);
93 let inner = DB::open(
94 path,
95 "ledger-db",
96 LEDGER_TABLES.iter().copied(),
97 &gen_rocksdb_options(&Default::default(), false),
98 )?;
99
100 let next_item_numbers = ItemNumbers {
101 slot_number: Self::last_version_written(&inner, SlotByNumber)?.unwrap_or_default() + 1,
102 batch_number: Self::last_version_written(&inner, BatchByNumber)?.unwrap_or_default()
103 + 1,
104 tx_number: Self::last_version_written(&inner, TxByNumber)?.unwrap_or_default() + 1,
105 event_number: Self::last_version_written(&inner, EventByNumber)?.unwrap_or_default()
106 + 1,
107 };
108
109 Ok(Self {
110 db: Arc::new(inner),
111 next_item_numbers: Arc::new(Mutex::new(next_item_numbers)),
112 slot_subscriptions: tokio::sync::broadcast::channel(10).0,
113 })
114 }
115
116 pub fn get_next_items_numbers(&self) -> ItemNumbers {
118 self.next_item_numbers.lock().unwrap().clone()
119 }
120
121 pub(crate) fn _get_slot_range(
126 &self,
127 range: &std::ops::Range<SlotNumber>,
128 ) -> Result<Vec<StoredSlot>, anyhow::Error> {
129 self.get_data_range::<SlotByNumber, _, _>(range)
130 }
131
132 pub(crate) fn get_batch_range(
137 &self,
138 range: &std::ops::Range<BatchNumber>,
139 ) -> Result<Vec<StoredBatch>, anyhow::Error> {
140 self.get_data_range::<BatchByNumber, _, _>(range)
141 }
142
143 pub(crate) fn get_tx_range(
148 &self,
149 range: &std::ops::Range<TxNumber>,
150 ) -> Result<Vec<StoredTransaction>, anyhow::Error> {
151 self.get_data_range::<TxByNumber, _, _>(range)
152 }
153
154 fn get_data_range<T, K, V>(&self, range: &std::ops::Range<K>) -> Result<Vec<V>, anyhow::Error>
159 where
160 T: Schema<Key = K, Value = V>,
161 K: Into<u64> + Copy + SeekKeyEncoder<T>,
162 {
163 let mut raw_iter = self.db.iter()?;
164 let max_items = (range.end.into() - range.start.into()) as usize;
165 raw_iter.seek(&range.start)?;
166 let iter = raw_iter.take(max_items);
167 let mut out = Vec::with_capacity(max_items);
168 for res in iter {
169 let (_, batch) = res?;
170 out.push(batch)
171 }
172 Ok(out)
173 }
174
175 fn put_slot(
176 &self,
177 slot: &StoredSlot,
178 slot_number: &SlotNumber,
179 schema_batch: &mut SchemaBatch,
180 ) -> Result<(), anyhow::Error> {
181 schema_batch.put::<SlotByNumber>(slot_number, slot)?;
182 schema_batch.put::<SlotByHash>(&slot.hash, slot_number)
183 }
184
185 fn put_batch(
186 &self,
187 batch: &StoredBatch,
188 batch_number: &BatchNumber,
189 schema_batch: &mut SchemaBatch,
190 ) -> Result<(), anyhow::Error> {
191 schema_batch.put::<BatchByNumber>(batch_number, batch)?;
192 schema_batch.put::<BatchByHash>(&batch.hash, batch_number)
193 }
194
195 fn put_transaction(
196 &self,
197 tx: &StoredTransaction,
198 tx_number: &TxNumber,
199 schema_batch: &mut SchemaBatch,
200 ) -> Result<(), anyhow::Error> {
201 schema_batch.put::<TxByNumber>(tx_number, tx)?;
202 schema_batch.put::<TxByHash>(&tx.hash, tx_number)
203 }
204
205 fn put_event(
206 &self,
207 event: &Event,
208 event_number: &EventNumber,
209 tx_number: TxNumber,
210 schema_batch: &mut SchemaBatch,
211 ) -> Result<(), anyhow::Error> {
212 schema_batch.put::<EventByNumber>(event_number, event)?;
213 schema_batch.put::<EventByKey>(&(event.key().clone(), tx_number, *event_number), &())
214 }
215
216 pub fn commit_slot<S: SlotData, B: Serialize, T: Serialize>(
219 &self,
220 data_to_commit: SlotCommit<S, B, T>,
221 ) -> Result<(), anyhow::Error> {
222 let mut current_item_numbers = {
224 let mut next_item_numbers = self.next_item_numbers.lock().unwrap();
225 let item_numbers = next_item_numbers.clone();
226 next_item_numbers.slot_number += 1;
227 next_item_numbers.batch_number += data_to_commit.batch_receipts.len() as u64;
228 next_item_numbers.tx_number += data_to_commit.num_txs as u64;
229 next_item_numbers.event_number += data_to_commit.num_events as u64;
230 item_numbers
231 };
233
234 let mut schema_batch = SchemaBatch::new();
235
236 let first_batch_number = current_item_numbers.batch_number;
237 let last_batch_number = first_batch_number + data_to_commit.batch_receipts.len() as u64;
238 for batch_receipt in data_to_commit.batch_receipts.into_iter() {
240 let first_tx_number = current_item_numbers.tx_number;
241 let last_tx_number = first_tx_number + batch_receipt.tx_receipts.len() as u64;
242 for tx in batch_receipt.tx_receipts.into_iter() {
244 let (tx_to_store, events) =
245 split_tx_for_storage(tx, current_item_numbers.event_number);
246 for event in events.into_iter() {
247 self.put_event(
248 &event,
249 &EventNumber(current_item_numbers.event_number),
250 TxNumber(current_item_numbers.tx_number),
251 &mut schema_batch,
252 )?;
253 current_item_numbers.event_number += 1;
254 }
255 self.put_transaction(
256 &tx_to_store,
257 &TxNumber(current_item_numbers.tx_number),
258 &mut schema_batch,
259 )?;
260 current_item_numbers.tx_number += 1;
261 }
262
263 let batch_to_store = StoredBatch {
265 hash: batch_receipt.batch_hash,
266 txs: TxNumber(first_tx_number)..TxNumber(last_tx_number),
267 custom_receipt: bincode::serialize(&batch_receipt.inner)
268 .expect("serialization to vec is infallible")
269 .into(),
270 };
271 self.put_batch(
272 &batch_to_store,
273 &BatchNumber(current_item_numbers.batch_number),
274 &mut schema_batch,
275 )?;
276 current_item_numbers.batch_number += 1;
277 }
278
279 let slot_to_store = StoredSlot {
281 hash: data_to_commit.slot_data.hash(),
282 extra_data: vec![].into(),
284 batches: BatchNumber(first_batch_number)..BatchNumber(last_batch_number),
285 };
286 self.put_slot(
287 &slot_to_store,
288 &SlotNumber(current_item_numbers.slot_number),
289 &mut schema_batch,
290 )?;
291
292 self.db.write_schemas(schema_batch)?;
293
294 let _ = self
296 .slot_subscriptions
297 .send(current_item_numbers.slot_number);
298
299 Ok(())
300 }
301
302 fn last_version_written<T: Schema<Key = U>, U: Into<u64>>(
303 db: &DB,
304 _schema: T,
305 ) -> anyhow::Result<Option<u64>> {
306 let mut iter = db.iter::<T>()?;
307 iter.seek_to_last();
308
309 match iter.next() {
310 Some(Ok((version, _))) => Ok(Some(version.into())),
311 Some(Err(e)) => Err(e),
312 _ => Ok(None),
313 }
314 }
315
316 pub fn get_head_slot(&self) -> anyhow::Result<Option<(SlotNumber, StoredSlot)>> {
318 let mut iter = self.db.iter::<SlotByNumber>()?;
319 iter.seek_to_last();
320
321 match iter.next() {
322 Some(Ok((slot_number, slot))) => Ok(Some((slot_number, slot))),
323 Some(Err(e)) => Err(e),
324 _ => Ok(None),
325 }
326 }
327}
328
/// `Arbitrary` implementations for `LedgerDB` and `ItemNumbers`, compiled only when
/// the `arbitrary` feature is enabled.
#[cfg(feature = "arbitrary")]
pub mod arbitrary {
    use core::ops::{Deref, DerefMut};

    use proptest::strategy::LazyJust;
    use tempfile::TempDir;

    use super::*;

    /// A `LedgerDB` opened inside a temporary directory, bundled with that directory.
    #[derive(Debug)]
    pub struct ArbitraryLedgerDB {
        /// The ledger database opened under `path`.
        pub db: LedgerDB,
        /// The temporary directory the database was opened in. It is kept next to
        /// the database so that it lives as long as this value does.
        pub path: TempDir,
    }

    /// The outcome of trying to build an `ArbitraryLedgerDB`; the failure, if any,
    /// is carried as a value rather than raised while generating.
    #[derive(Debug)]
    pub struct FallibleArbitraryLedgerDB {
        /// Either the generated database or the error that prevented its creation.
        pub result: anyhow::Result<ArbitraryLedgerDB>,
    }

    impl Deref for ArbitraryLedgerDB {
        type Target = LedgerDB;

        /// Borrows the wrapped `LedgerDB`.
        fn deref(&self) -> &LedgerDB {
            &self.db
        }
    }

    impl DerefMut for ArbitraryLedgerDB {
        /// Mutably borrows the wrapped `LedgerDB`.
        fn deref_mut(&mut self) -> &mut LedgerDB {
            &mut self.db
        }
    }

    impl<'a> ::arbitrary::Arbitrary<'a> for ArbitraryLedgerDB {
        /// Opens a fresh `LedgerDB` in a new temporary directory.
        ///
        /// No bytes are consumed from the unstructured input. A failure to create
        /// the directory is reported as `NotEnoughData`, and a failure to open the
        /// database as `IncorrectFormat`.
        fn arbitrary(_u: &mut ::arbitrary::Unstructured<'a>) -> ::arbitrary::Result<Self> {
            let path = match TempDir::new() {
                Ok(dir) => dir,
                Err(_) => return Err(::arbitrary::Error::NotEnoughData),
            };
            match LedgerDB::with_path(&path) {
                Ok(db) => Ok(Self { db, path }),
                Err(_) => Err(::arbitrary::Error::IncorrectFormat),
            }
        }
    }

    impl proptest::arbitrary::Arbitrary for FallibleArbitraryLedgerDB {
        type Parameters = ();
        type Strategy = LazyJust<Self, fn() -> FallibleArbitraryLedgerDB>;

        /// Returns a strategy that lazily opens a `LedgerDB` in a new temporary
        /// directory, capturing any failure in `result`.
        fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
            fn open_in_temp_dir() -> FallibleArbitraryLedgerDB {
                let attempt = || -> anyhow::Result<ArbitraryLedgerDB> {
                    let path = TempDir::new().map_err(|e| {
                        anyhow::anyhow!(format!("failed to generate path for LedgerDB: {e}"))
                    })?;
                    let db = LedgerDB::with_path(&path)?;
                    Ok(ArbitraryLedgerDB { db, path })
                };
                FallibleArbitraryLedgerDB { result: attempt() }
            }
            LazyJust::new(open_in_temp_dir)
        }
    }

    impl<'a> ::arbitrary::Arbitrary<'a> for ItemNumbers {
        /// Draws the four counters from `u`, in field declaration order.
        fn arbitrary(u: &mut ::arbitrary::Unstructured<'a>) -> ::arbitrary::Result<Self> {
            // Each draw consumes input, so the order of these statements is significant.
            let slot_number = u.arbitrary()?;
            let batch_number = u.arbitrary()?;
            let tx_number = u.arbitrary()?;
            let event_number = u.arbitrary()?;
            Ok(ItemNumbers {
                slot_number,
                batch_number,
                tx_number,
                event_number,
            })
        }
    }
}