1#![forbid(unsafe_code)]
5#![deny(missing_docs)]
6
7mod iterator;
18mod metrics;
19pub mod schema;
20
21use std::collections::HashMap;
22use std::path::Path;
23use std::sync::Mutex;
24
25use anyhow::format_err;
26use iterator::ScanDirection;
27pub use iterator::{SchemaIterator, SeekKeyEncoder};
28use metrics::{
29 SCHEMADB_BATCH_COMMIT_BYTES, SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS,
30 SCHEMADB_BATCH_PUT_LATENCY_SECONDS, SCHEMADB_DELETES, SCHEMADB_GET_BYTES,
31 SCHEMADB_GET_LATENCY_SECONDS, SCHEMADB_PUT_BYTES,
32};
33use rocksdb::ReadOptions;
34pub use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
35use thiserror::Error;
36use tracing::info;
37
38pub use crate::schema::Schema;
39use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec};
40
41#[derive(Debug)]
44pub struct DB {
45 name: &'static str, inner: rocksdb::DB,
47}
48
49impl DB {
50 pub fn open(
53 path: impl AsRef<Path>,
54 name: &'static str,
55 column_families: impl IntoIterator<Item = impl Into<String>>,
56 db_opts: &rocksdb::Options,
57 ) -> anyhow::Result<Self> {
58 let db = DB::open_with_cfds(
59 db_opts,
60 path,
61 name,
62 column_families.into_iter().map(|cf_name| {
63 let mut cf_opts = rocksdb::Options::default();
64 cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
65 rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_opts)
66 }),
67 )?;
68 Ok(db)
69 }
70
71 pub fn open_with_cfds(
74 db_opts: &rocksdb::Options,
75 path: impl AsRef<Path>,
76 name: &'static str,
77 cfds: impl IntoIterator<Item = rocksdb::ColumnFamilyDescriptor>,
78 ) -> anyhow::Result<DB> {
79 let inner = rocksdb::DB::open_cf_descriptors(db_opts, path, cfds)?;
80 Ok(Self::log_construct(name, inner))
81 }
82
83 pub fn open_cf_readonly(
86 opts: &rocksdb::Options,
87 path: impl AsRef<Path>,
88 name: &'static str,
89 cfs: Vec<ColumnFamilyName>,
90 ) -> anyhow::Result<DB> {
91 let error_if_log_file_exists = false;
92 let inner = rocksdb::DB::open_cf_for_read_only(opts, path, cfs, error_if_log_file_exists)?;
93
94 Ok(Self::log_construct(name, inner))
95 }
96
97 pub fn open_cf_as_secondary<P: AsRef<Path>>(
101 opts: &rocksdb::Options,
102 primary_path: P,
103 secondary_path: P,
104 name: &'static str,
105 cfs: Vec<ColumnFamilyName>,
106 ) -> anyhow::Result<DB> {
107 let inner = rocksdb::DB::open_cf_as_secondary(opts, primary_path, secondary_path, cfs)?;
108 Ok(Self::log_construct(name, inner))
109 }
110
111 fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB {
112 info!(rocksdb_name = name, "Opened RocksDB.");
113 DB { name, inner }
114 }
115
116 pub fn get<S: Schema>(
118 &self,
119 schema_key: &impl KeyCodec<S>,
120 ) -> anyhow::Result<Option<S::Value>> {
121 let _timer = SCHEMADB_GET_LATENCY_SECONDS
122 .with_label_values(&[S::COLUMN_FAMILY_NAME])
123 .start_timer();
124
125 let k = schema_key.encode_key()?;
126 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
127
128 let result = self.inner.get_cf(cf_handle, k)?;
129 SCHEMADB_GET_BYTES
130 .with_label_values(&[S::COLUMN_FAMILY_NAME])
131 .observe(result.as_ref().map_or(0.0, |v| v.len() as f64));
132
133 result
134 .map(|raw_value| <S::Value as ValueCodec<S>>::decode_value(&raw_value))
135 .transpose()
136 .map_err(|err| err.into())
137 }
138
139 pub fn put<S: Schema>(
141 &self,
142 key: &impl KeyCodec<S>,
143 value: &impl ValueCodec<S>,
144 ) -> anyhow::Result<()> {
145 let batch = SchemaBatch::new();
148 batch.put::<S>(key, value)?;
149 self.write_schemas(batch)
150 }
151
152 fn iter_with_direction<S: Schema>(
153 &self,
154 opts: ReadOptions,
155 direction: ScanDirection,
156 ) -> anyhow::Result<SchemaIterator<S>> {
157 let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
158 Ok(SchemaIterator::new(
159 self.inner.raw_iterator_cf_opt(cf_handle, opts),
160 direction,
161 ))
162 }
163
164 pub fn iter<S: Schema>(&self) -> anyhow::Result<SchemaIterator<S>> {
166 self.iter_with_direction::<S>(Default::default(), ScanDirection::Forward)
167 }
168
169 pub fn iter_with_opts<S: Schema>(
171 &self,
172 opts: ReadOptions,
173 ) -> anyhow::Result<SchemaIterator<S>> {
174 self.iter_with_direction::<S>(opts, ScanDirection::Forward)
175 }
176
177 pub fn rev_iter<S: Schema>(&self) -> anyhow::Result<SchemaIterator<S>> {
179 self.iter_with_direction::<S>(Default::default(), ScanDirection::Backward)
180 }
181
182 pub fn rev_iter_with_opts<S: Schema>(
184 &self,
185 opts: ReadOptions,
186 ) -> anyhow::Result<SchemaIterator<S>> {
187 self.iter_with_direction::<S>(opts, ScanDirection::Backward)
188 }
189
190 pub fn write_schemas(&self, batch: SchemaBatch) -> anyhow::Result<()> {
192 let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
193 .with_label_values(&[self.name])
194 .start_timer();
195 let rows_locked = batch.rows.lock().expect("Lock must not be poisoned");
196
197 let mut db_batch = rocksdb::WriteBatch::default();
198 for (cf_name, rows) in rows_locked.iter() {
199 let cf_handle = self.get_cf_handle(cf_name)?;
200 for write_op in rows {
201 match write_op {
202 WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
203 WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
204 }
205 }
206 }
207 let serialized_size = db_batch.size_in_bytes();
208
209 self.inner.write_opt(db_batch, &default_write_options())?;
210
211 for (cf_name, rows) in rows_locked.iter() {
213 for write_op in rows {
214 match write_op {
215 WriteOp::Value { key, value } => {
216 SCHEMADB_PUT_BYTES
217 .with_label_values(&[cf_name])
218 .observe((key.len() + value.len()) as f64);
219 }
220 WriteOp::Deletion { key: _ } => {
221 SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
222 }
223 }
224 }
225 }
226 SCHEMADB_BATCH_COMMIT_BYTES
227 .with_label_values(&[self.name])
228 .observe(serialized_size as f64);
229
230 Ok(())
231 }
232
233 fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> {
234 self.inner.cf_handle(cf_name).ok_or_else(|| {
235 format_err!(
236 "DB::cf_handle not found for column family name: {}",
237 cf_name
238 )
239 })
240 }
241
242 pub fn flush_cf(&self, cf_name: &str) -> anyhow::Result<()> {
245 Ok(self.inner.flush_cf(self.get_cf_handle(cf_name)?)?)
246 }
247
248 pub fn get_property(&self, cf_name: &str, property_name: &str) -> anyhow::Result<u64> {
251 self.inner
252 .property_int_value_cf(self.get_cf_handle(cf_name)?, property_name)?
253 .ok_or_else(|| {
254 format_err!(
255 "Unable to get property \"{}\" of column family \"{}\".",
256 property_name,
257 cf_name,
258 )
259 })
260 }
261
262 pub fn create_checkpoint<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
264 rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?;
265 Ok(())
266 }
267}
268
/// A single pending mutation recorded in a `SchemaBatch`, with the key (and,
/// for writes, the value) already encoded to bytes.
#[derive(Debug)]
enum WriteOp {
    /// Store `value` under `key`.
    Value {
        /// Encoded key.
        key: Vec<u8>,
        /// Encoded value.
        value: Vec<u8>,
    },
    /// Remove `key`.
    Deletion {
        /// Encoded key.
        key: Vec<u8>,
    },
}
274
275#[derive(Debug, Default)]
279pub struct SchemaBatch {
280 rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
281}
282
283impl SchemaBatch {
284 pub fn new() -> Self {
286 Self::default()
287 }
288
289 pub fn put<S: Schema>(
291 &self,
292 key: &impl KeyCodec<S>,
293 value: &impl ValueCodec<S>,
294 ) -> anyhow::Result<()> {
295 let _timer = SCHEMADB_BATCH_PUT_LATENCY_SECONDS
296 .with_label_values(&["unknown"])
297 .start_timer();
298 let key = key.encode_key()?;
299 let value = value.encode_value()?;
300 self.rows
301 .lock()
302 .expect("Lock must not be poisoned")
303 .entry(S::COLUMN_FAMILY_NAME)
304 .or_default()
305 .push(WriteOp::Value { key, value });
306
307 Ok(())
308 }
309
310 pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
312 let key = key.encode_key()?;
313 self.rows
314 .lock()
315 .expect("Lock must not be poisoned")
316 .entry(S::COLUMN_FAMILY_NAME)
317 .or_default()
318 .push(WriteOp::Deletion { key });
319
320 Ok(())
321 }
322}
323
324#[derive(Error, Debug)]
327pub enum CodecError {
328 #[error("Invalid key length. Expected {expected:}, got {got:}")]
331 #[allow(missing_docs)] InvalidKeyLength { expected: usize, got: usize },
333 #[error(transparent)]
336 Wrapped(#[from] anyhow::Error),
337 #[error(transparent)]
339 Io(#[from] std::io::Error),
340}
341
342fn default_write_options() -> rocksdb::WriteOptions {
346 let mut opts = rocksdb::WriteOptions::default();
347 opts.set_sync(true);
348 opts
349}