d_engine_server/storage/adaptors/rocksdb/rocksdb_unified_engine.rs
1use std::path::Path;
2use std::sync::Arc;
3
4use d_engine_core::Error;
5use d_engine_core::StorageError;
6use rocksdb::Cache;
7use rocksdb::ColumnFamilyDescriptor;
8use rocksdb::DB;
9
10use super::LOG_CF;
11use super::META_CF;
12use super::RocksDBStateMachine;
13use super::RocksDBStorageEngine;
14use super::STATE_MACHINE_CF;
15use super::STATE_MACHINE_META_CF;
16
/// Factory for unified single-DB storage: four column families behind one `Arc<DB>`.
///
/// [`RocksDBUnifiedEngine::open`] opens a single RocksDB instance holding the Raft log,
/// Raft metadata, state-machine data and state-machine metadata column families. It
/// returns a `RocksDBStorageEngine` and a `RocksDBStateMachine` that share the same
/// underlying `Arc<DB>`.
///
/// Compared with running separate RocksDB instances for the Raft log and the state
/// machine, this setup has:
/// - a single 128 MB block cache,
/// - a single pool of background flush/compaction jobs, and
/// - a single file-descriptor budget.
///
/// # Usage
///
/// ```rust,ignore
/// use d_engine_server::storage::RocksDBUnifiedEngine;
///
/// let (storage, sm) = RocksDBUnifiedEngine::open("/data/raft")?;
/// ```
#[derive(Debug, Clone, Copy)]
pub struct RocksDBUnifiedEngine;
33
34impl RocksDBUnifiedEngine {
35 /// Opens (or creates) a unified RocksDB instance at `path`.
36 ///
37 /// Returns `(RocksDBStorageEngine, RocksDBStateMachine)` sharing one `Arc<DB>`.
38 pub fn open<P: AsRef<Path>>(
39 path: P
40 ) -> Result<(RocksDBStorageEngine, RocksDBStateMachine), Error> {
41 let db = Arc::new(Self::open_db(path.as_ref())?);
42
43 let storage = RocksDBStorageEngine::from_shared_db(Arc::clone(&db))?;
44 let sm = RocksDBStateMachine::from_shared_db(Arc::clone(&db))?;
45
46 Ok((storage, sm))
47 }
48
49 fn open_db(path: &Path) -> Result<DB, Error> {
50 let mut db_opts = super::base_db_options();
51
52 // One DB hosts 4 CFs across two workloads: reduce background job contention.
53 db_opts.set_max_background_jobs(2);
54 // Disable global write buffer cap so each CF controls its own flush lifecycle.
55 // Without this, RocksDB imposes a DB-wide limit that triggers simultaneous flushes
56 // across all CFs, causing all CFs to compete for the same background thread pool
57 // and producing write stalls under high-throughput embedded workloads.
58 db_opts.set_db_write_buffer_size(0);
59
60 // Single shared block cache for all CFs — one 128 MB budget, no per-CF duplication.
61 // Block cache sits in front of SST reads; hot data blocks stay in memory.
62 let block_cache = Cache::new_lru_cache(128 * 1024 * 1024);
63
64 // Raft log CF: sequential writes, range reads, prefix truncation
65 let log_cf = ColumnFamilyDescriptor::new(LOG_CF, super::log_cf_options(&block_cache));
66 // Raft meta CF: very-low-frequency point reads/writes (term, vote)
67 let meta_cf = ColumnFamilyDescriptor::new(META_CF, super::meta_cf_options(&block_cache));
68 // SM CF: high-frequency random reads/writes (user KV data)
69 let sm_cf =
70 ColumnFamilyDescriptor::new(STATE_MACHINE_CF, super::sm_cf_options(&block_cache));
71 // SM meta CF: point reads/writes (applied index, snapshot metadata)
72 let sm_meta_cf = ColumnFamilyDescriptor::new(
73 STATE_MACHINE_META_CF,
74 super::meta_cf_options(&block_cache),
75 );
76
77 DB::open_cf_descriptors(&db_opts, path, vec![log_cf, meta_cf, sm_cf, sm_meta_cf])
78 .map_err(|e| StorageError::DbError(e.to_string()).into())
79 }
80}