sov_schema_db/
lib.rs

// SPDX-License-Identifier: Apache-2.0
// Adapted from aptos-core/schemadb

#![forbid(unsafe_code)]
#![deny(missing_docs)]

//! This library implements a schematized DB on top of [RocksDB](https://rocksdb.org/). It makes
//! sure all data passed in and out are structured according to predefined schemas and prevents
//! access to raw keys and values. This library also enforces a set of specific DB options,
//! like custom comparators and schema-to-column-family mapping.
//!
//! It requires that different kinds of key-value pairs be stored in separate column
//! families. To use this library to store a kind of key-value pair, the user needs to use the
//! [`define_schema!`] macro to define the schema name, the key and value types, and the name of
//! the column family.
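//!
//! # Example
//!
//! A minimal sketch of the intended workflow (not compiled as a doc test). The schema name,
//! key/value types, and column family name below are hypothetical, and the codec
//! implementations are elided; see the [`schema`] module for the exact trait definitions.
//!
//! ```ignore
//! use sov_schema_db::{define_schema, DB};
//!
//! // One schema per kind of key-value pair, each mapped to its own column family.
//! define_schema!(AccountSchema, AccountKey, AccountValue, "account");
//!
//! // `AccountKey` must implement `KeyCodec<AccountSchema>` and `AccountValue`
//! // must implement `ValueCodec<AccountSchema>` (implementations elided).
//!
//! let mut db_opts = rocksdb::Options::default();
//! db_opts.create_if_missing(true);
//! db_opts.create_missing_column_families(true);
//! let db = DB::open("/tmp/example_db", "example_db", vec!["account"], &db_opts)?;
//!
//! db.put::<AccountSchema>(&AccountKey(1), &AccountValue(100))?;
//! assert_eq!(db.get::<AccountSchema>(&AccountKey(1))?, Some(AccountValue(100)));
//! ```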
16
mod iterator;
mod metrics;
pub mod schema;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;

use anyhow::format_err;
use iterator::ScanDirection;
pub use iterator::{SchemaIterator, SeekKeyEncoder};
use metrics::{
    SCHEMADB_BATCH_COMMIT_BYTES, SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS,
    SCHEMADB_BATCH_PUT_LATENCY_SECONDS, SCHEMADB_DELETES, SCHEMADB_GET_BYTES,
    SCHEMADB_GET_LATENCY_SECONDS, SCHEMADB_PUT_BYTES,
};
use rocksdb::ReadOptions;
pub use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
use thiserror::Error;
use tracing::info;

pub use crate::schema::Schema;
use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec};

/// This DB is a schematized RocksDB wrapper where all data passed in and out are typed according to
/// [`Schema`]s.
#[derive(Debug)]
pub struct DB {
    name: &'static str, // for logging
    inner: rocksdb::DB,
}

impl DB {
    /// Opens a database backed by RocksDB, using the provided column family names and default
    /// column family options.
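    ///
    /// A minimal sketch (not compiled as a doc test); the path, name, and column
    /// family list are illustrative:
    ///
    /// ```ignore
    /// let mut db_opts = rocksdb::Options::default();
    /// db_opts.create_if_missing(true);
    /// db_opts.create_missing_column_families(true);
    /// let db = DB::open("/tmp/example_db", "example_db", vec!["account"], &db_opts)?;
    /// ```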
    pub fn open(
        path: impl AsRef<Path>,
        name: &'static str,
        column_families: impl IntoIterator<Item = impl Into<String>>,
        db_opts: &rocksdb::Options,
    ) -> anyhow::Result<Self> {
        let db = DB::open_with_cfds(
            db_opts,
            path,
            name,
            column_families.into_iter().map(|cf_name| {
                let mut cf_opts = rocksdb::Options::default();
                cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
                rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_opts)
            }),
        )?;
        Ok(db)
    }

    /// Opens RocksDB with the provided column family descriptors.
    /// This allows configuring options for each column family.
    pub fn open_with_cfds(
        db_opts: &rocksdb::Options,
        path: impl AsRef<Path>,
        name: &'static str,
        cfds: impl IntoIterator<Item = rocksdb::ColumnFamilyDescriptor>,
    ) -> anyhow::Result<DB> {
        let inner = rocksdb::DB::open_cf_descriptors(db_opts, path, cfds)?;
        Ok(Self::log_construct(name, inner))
    }

    /// Opens the DB in read-only mode. This DB is completely static, so any writes that occur on
    /// the primary after it has been opened will not be visible to the read-only instance.
    pub fn open_cf_readonly(
        opts: &rocksdb::Options,
        path: impl AsRef<Path>,
        name: &'static str,
        cfs: Vec<ColumnFamilyName>,
    ) -> anyhow::Result<DB> {
        let error_if_log_file_exists = false;
        let inner = rocksdb::DB::open_cf_for_read_only(opts, path, cfs, error_if_log_file_exists)?;

        Ok(Self::log_construct(name, inner))
    }

    /// Opens the DB in secondary mode. A secondary DB does not support writes, but can be
    /// dynamically caught up to the primary instance by a manual call. See
    /// <https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances>
    /// for more details.
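    ///
    /// A minimal sketch (not compiled as a doc test); the paths are illustrative:
    ///
    /// ```ignore
    /// let secondary = DB::open_cf_as_secondary(
    ///     &rocksdb::Options::default(),
    ///     "/data/db",           // path of the primary instance
    ///     "/data/db-secondary", // scratch directory for the secondary instance
    ///     "example_db_secondary",
    ///     vec![DEFAULT_COLUMN_FAMILY_NAME],
    /// )?;
    /// ```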
    pub fn open_cf_as_secondary<P: AsRef<Path>>(
        opts: &rocksdb::Options,
        primary_path: P,
        secondary_path: P,
        name: &'static str,
        cfs: Vec<ColumnFamilyName>,
    ) -> anyhow::Result<DB> {
        let inner = rocksdb::DB::open_cf_as_secondary(opts, primary_path, secondary_path, cfs)?;
        Ok(Self::log_construct(name, inner))
    }

    fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
        info!(rocksdb_name = name, "Opened RocksDB.");
        DB { name, inner }
    }

    /// Reads a single record by key.
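    ///
    /// A minimal sketch (not compiled as a doc test), assuming a hypothetical schema
    /// `AccountSchema` defined via [`define_schema!`]:
    ///
    /// ```ignore
    /// db.put::<AccountSchema>(&AccountKey(42), &AccountValue(1000))?;
    /// assert_eq!(db.get::<AccountSchema>(&AccountKey(42))?, Some(AccountValue(1000)));
    /// assert_eq!(db.get::<AccountSchema>(&AccountKey(43))?, None);
    /// ```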
    pub fn get<S: Schema>(
        &self,
        schema_key: &impl KeyCodec<S>,
    ) -> anyhow::Result<Option<S::Value>> {
        let _timer = SCHEMADB_GET_LATENCY_SECONDS
            .with_label_values(&[S::COLUMN_FAMILY_NAME])
            .start_timer();

        let k = schema_key.encode_key()?;
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;

        let result = self.inner.get_cf(cf_handle, k)?;
        SCHEMADB_GET_BYTES
            .with_label_values(&[S::COLUMN_FAMILY_NAME])
            .observe(result.as_ref().map_or(0.0, |v| v.len() as f64));

        result
            .map(|raw_value| <S::Value as ValueCodec<S>>::decode_value(&raw_value))
            .transpose()
            .map_err(|err| err.into())
    }

    /// Writes a single record.
    pub fn put<S: Schema>(
        &self,
        key: &impl KeyCodec<S>,
        value: &impl ValueCodec<S>,
    ) -> anyhow::Result<()> {
        // Not necessary to use a batch, but we'd like a central place to bump counters.
        // Used in tests only anyway.
        let batch = SchemaBatch::new();
        batch.put::<S>(key, value)?;
        self.write_schemas(batch)
    }

    fn iter_with_direction<S: Schema>(
        &self,
        opts: ReadOptions,
        direction: ScanDirection,
    ) -> anyhow::Result<SchemaIterator<S>> {
        let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
        Ok(SchemaIterator::new(
            self.inner.raw_iterator_cf_opt(cf_handle, opts),
            direction,
        ))
    }

    /// Returns a forward [`SchemaIterator`] on a certain schema with the default read options.
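    ///
    /// A minimal sketch (not compiled as a doc test), assuming a hypothetical schema
    /// `AccountSchema` and that [`SchemaIterator`] exposes a `seek_to_first` method;
    /// keys are visited in ascending order of their encoded bytes:
    ///
    /// ```ignore
    /// let mut iter = db.iter::<AccountSchema>()?;
    /// iter.seek_to_first();
    /// for entry in iter {
    ///     let (key, value) = entry?;
    ///     // ...
    /// }
    /// ```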
    pub fn iter<S: Schema>(&self) -> anyhow::Result<SchemaIterator<S>> {
        self.iter_with_direction::<S>(Default::default(), ScanDirection::Forward)
    }

    /// Returns a forward [`SchemaIterator`] on a certain schema with the provided read options.
    pub fn iter_with_opts<S: Schema>(
        &self,
        opts: ReadOptions,
    ) -> anyhow::Result<SchemaIterator<S>> {
        self.iter_with_direction::<S>(opts, ScanDirection::Forward)
    }

    /// Returns a backward [`SchemaIterator`] on a certain schema with the default read options.
    pub fn rev_iter<S: Schema>(&self) -> anyhow::Result<SchemaIterator<S>> {
        self.iter_with_direction::<S>(Default::default(), ScanDirection::Backward)
    }

    /// Returns a backward [`SchemaIterator`] on a certain schema with the provided read options.
    pub fn rev_iter_with_opts<S: Schema>(
        &self,
        opts: ReadOptions,
    ) -> anyhow::Result<SchemaIterator<S>> {
        self.iter_with_direction::<S>(opts, ScanDirection::Backward)
    }

    /// Writes a group of records wrapped in a [`SchemaBatch`].
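    ///
    /// A minimal sketch (not compiled as a doc test), assuming hypothetical schemas
    /// `AccountSchema` and `BlockSchema`; all rows land in a single atomic RocksDB write:
    ///
    /// ```ignore
    /// let batch = SchemaBatch::new();
    /// batch.put::<AccountSchema>(&AccountKey(1), &AccountValue(100))?;
    /// batch.put::<BlockSchema>(&BlockKey(1), &BlockValue::default())?;
    /// db.write_schemas(batch)?;
    /// ```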
    pub fn write_schemas(&self, batch: SchemaBatch) -> anyhow::Result<()> {
        let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
            .with_label_values(&[self.name])
            .start_timer();
        let rows_locked = batch.rows.lock().expect("Lock must not be poisoned");

        let mut db_batch = rocksdb::WriteBatch::default();
        for (cf_name, rows) in rows_locked.iter() {
            let cf_handle = self.get_cf_handle(cf_name)?;
            for write_op in rows {
                match write_op {
                    WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
                    WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
                }
            }
        }
        let serialized_size = db_batch.size_in_bytes();

        self.inner.write_opt(db_batch, &default_write_options())?;

        // Bump counters only after DB write succeeds.
        for (cf_name, rows) in rows_locked.iter() {
            for write_op in rows {
                match write_op {
                    WriteOp::Value { key, value } => {
                        SCHEMADB_PUT_BYTES
                            .with_label_values(&[cf_name])
                            .observe((key.len() + value.len()) as f64);
                    }
                    WriteOp::Deletion { key: _ } => {
                        SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
                    }
                }
            }
        }
        SCHEMADB_BATCH_COMMIT_BYTES
            .with_label_values(&[self.name])
            .observe(serialized_size as f64);

        Ok(())
    }

    fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> {
        self.inner.cf_handle(cf_name).ok_or_else(|| {
            format_err!(
                "DB::cf_handle not found for column family name: {}",
                cf_name
            )
        })
    }

    /// Flushes [MemTable](https://github.com/facebook/rocksdb/wiki/MemTable) data.
    /// This is used only in unit tests that exercise `get_approximate_sizes_cf`.
    pub fn flush_cf(&self, cf_name: &str) -> anyhow::Result<()> {
        Ok(self.inner.flush_cf(self.get_cf_handle(cf_name)?)?)
    }

    /// Returns the current RocksDB property value for the provided column family name
    /// and property name.
    pub fn get_property(&self, cf_name: &str, property_name: &str) -> anyhow::Result<u64> {
        self.inner
            .property_int_value_cf(self.get_cf_handle(cf_name)?, property_name)?
            .ok_or_else(|| {
                format_err!(
                    "Unable to get property \"{}\" of column family \"{}\".",
                    property_name,
                    cf_name,
                )
            })
    }

    /// Creates a new physical DB checkpoint in the directory specified by `path`.
    pub fn create_checkpoint<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
        rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?;
        Ok(())
    }
}

#[derive(Debug)]
enum WriteOp {
    Value { key: Vec<u8>, value: Vec<u8> },
    Deletion { key: Vec<u8> },
}

/// [`SchemaBatch`] holds a collection of [`Schema`]-typed updates that can be applied to a
/// [`DB`] atomically. The updates will be applied in the order in which they are added to
/// the [`SchemaBatch`].
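///
/// A minimal sketch (not compiled as a doc test), assuming a hypothetical schema
/// `AccountSchema`; because operations are applied in insertion order, the delete
/// below wins over the earlier put for the same key:
///
/// ```ignore
/// let batch = SchemaBatch::new();
/// batch.put::<AccountSchema>(&AccountKey(7), &AccountValue(100))?;
/// batch.delete::<AccountSchema>(&AccountKey(7))?;
/// db.write_schemas(batch)?; // key 7 ends up absent
/// ```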
#[derive(Debug, Default)]
pub struct SchemaBatch {
    rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
}

impl SchemaBatch {
    /// Creates an empty batch.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds an insert/update operation to the batch.
    pub fn put<S: Schema>(
        &self,
        key: &impl KeyCodec<S>,
        value: &impl ValueCodec<S>,
    ) -> anyhow::Result<()> {
        let _timer = SCHEMADB_BATCH_PUT_LATENCY_SECONDS
            .with_label_values(&["unknown"])
            .start_timer();
        let key = key.encode_key()?;
        let value = value.encode_value()?;
        self.rows
            .lock()
            .expect("Lock must not be poisoned")
            .entry(S::COLUMN_FAMILY_NAME)
            .or_default()
            .push(WriteOp::Value { key, value });

        Ok(())
    }

    /// Adds a delete operation to the batch.
    pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
        let key = key.encode_key()?;
        self.rows
            .lock()
            .expect("Lock must not be poisoned")
            .entry(S::COLUMN_FAMILY_NAME)
            .or_default()
            .push(WriteOp::Deletion { key });

        Ok(())
    }
}

/// An error that occurred during (de)serialization of a [`Schema`]'s keys or
/// values.
#[derive(Error, Debug)]
pub enum CodecError {
    /// Unable to deserialize a key because it has a different length than
    /// expected.
    #[error("Invalid key length. Expected {expected:}, got {got:}")]
    #[allow(missing_docs)] // The fields' names are self-explanatory.
    InvalidKeyLength { expected: usize, got: usize },
    /// Some other error occurred when (de)serializing a key or value. Inspect
    /// the inner [`anyhow::Error`] for more details.
    #[error(transparent)]
    Wrapped(#[from] anyhow::Error),
    /// I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

/// For now we always use synchronous writes. This makes sure that once the operation returns
/// `Ok(())` the data is persisted even if the machine crashes. In the future we might consider
/// selectively turning this off for some non-critical writes to improve performance.
fn default_write_options() -> rocksdb::WriteOptions {
    let mut opts = rocksdb::WriteOptions::default();
    opts.set_sync(true);
    opts
}