Skip to main content

d_engine_server/storage/adaptors/rocksdb/
rocksdb_unified_engine.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use d_engine_core::Error;
5use d_engine_core::StorageError;
6use rocksdb::Cache;
7use rocksdb::ColumnFamilyDescriptor;
8use rocksdb::DB;
9
10use super::LOG_CF;
11use super::META_CF;
12use super::RocksDBStateMachine;
13use super::RocksDBStorageEngine;
14use super::STATE_MACHINE_CF;
15use super::STATE_MACHINE_META_CF;
16
/// Factory for the unified single-DB storage layout (4 CFs behind one `Arc<DB>`).
///
/// A single RocksDB instance is opened with all four column families, and both a
/// `RocksDBStorageEngine` and a `RocksDBStateMachine` are built on top of the same
/// underlying `Arc<DB>`. Compared to the dual-instance setup this halves resource
/// usage: one 128 MB block cache, one set of background jobs, one file-descriptor
/// budget.
///
/// The type itself is a stateless unit struct. It derives `Debug` so it can be
/// used in diagnostics like any other public type.
///
/// # Usage
///
/// ```rust,ignore
/// use d_engine_server::storage::RocksDBUnifiedEngine;
///
/// let (storage, sm) = RocksDBUnifiedEngine::open("/data/raft")?;
/// ```
#[derive(Debug)]
pub struct RocksDBUnifiedEngine;
33
34impl RocksDBUnifiedEngine {
35    /// Opens (or creates) a unified RocksDB instance at `path`.
36    ///
37    /// Returns `(RocksDBStorageEngine, RocksDBStateMachine)` sharing one `Arc<DB>`.
38    pub fn open<P: AsRef<Path>>(
39        path: P
40    ) -> Result<(RocksDBStorageEngine, RocksDBStateMachine), Error> {
41        let db = Arc::new(Self::open_db(path.as_ref())?);
42
43        let storage = RocksDBStorageEngine::from_shared_db(Arc::clone(&db))?;
44        let sm = RocksDBStateMachine::from_shared_db(Arc::clone(&db))?;
45
46        Ok((storage, sm))
47    }
48
49    fn open_db(path: &Path) -> Result<DB, Error> {
50        let mut db_opts = super::base_db_options();
51
52        // One DB hosts 4 CFs across two workloads: reduce background job contention.
53        db_opts.set_max_background_jobs(2);
54        // Disable global write buffer cap so each CF controls its own flush lifecycle.
55        // Without this, RocksDB imposes a DB-wide limit that triggers simultaneous flushes
56        // across all CFs, causing all CFs to compete for the same background thread pool
57        // and producing write stalls under high-throughput embedded workloads.
58        db_opts.set_db_write_buffer_size(0);
59
60        // Single shared block cache for all CFs — one 128 MB budget, no per-CF duplication.
61        // Block cache sits in front of SST reads; hot data blocks stay in memory.
62        let block_cache = Cache::new_lru_cache(128 * 1024 * 1024);
63
64        // Raft log CF: sequential writes, range reads, prefix truncation
65        let log_cf = ColumnFamilyDescriptor::new(LOG_CF, super::log_cf_options(&block_cache));
66        // Raft meta CF: very-low-frequency point reads/writes (term, vote)
67        let meta_cf = ColumnFamilyDescriptor::new(META_CF, super::meta_cf_options(&block_cache));
68        // SM CF: high-frequency random reads/writes (user KV data)
69        let sm_cf =
70            ColumnFamilyDescriptor::new(STATE_MACHINE_CF, super::sm_cf_options(&block_cache));
71        // SM meta CF: point reads/writes (applied index, snapshot metadata)
72        let sm_meta_cf = ColumnFamilyDescriptor::new(
73            STATE_MACHINE_META_CF,
74            super::meta_cf_options(&block_cache),
75        );
76
77        DB::open_cf_descriptors(&db_opts, path, vec![log_cf, meta_cf, sm_cf, sm_meta_cf])
78            .map_err(|e| StorageError::DbError(e.to_string()).into())
79    }
80}