#![cfg(feature = "rocksdb-log-store")]
use std::sync::Arc;
use openraft::entry::RaftEntry;
use openraft::storage::{IOFlushed, RaftLogReader, RaftLogStorage};
use openraft::{Entry, LogId, Vote};
use rocksdb::{ColumnFamilyDescriptor, DB, Options};
use tempfile::TempDir;
use tsoracle_openraft_toolkit::{Flat, GroupPrefixed, KeySpace, MetaLabel, RocksdbLogStore};
mod common;
use common::{TestLeaderId, TestTypeConfig};
const LOG_CF: &str = "raft_log";
const META_CF: &str = "raft_meta";
fn open_db(dir: &TempDir) -> Arc<DB> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let cfs = vec![
ColumnFamilyDescriptor::new(LOG_CF, Options::default()),
ColumnFamilyDescriptor::new(META_CF, Options::default()),
];
Arc::new(DB::open_cf_descriptors(&opts, dir.path(), cfs).unwrap())
}
/// Opening a store over a freshly created database succeeds; the store's
/// `Debug` rendering names the type and both column families, and a clone
/// renders identically.
#[tokio::test]
async fn opens_empty_store_without_error() {
    let tmp = TempDir::new().unwrap();
    let log_store =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(open_db(&tmp), LOG_CF, META_CF, Flat)
            .unwrap();

    let debug_text = format!("{log_store:?}");
    assert!(debug_text.contains("RocksdbLogStore"));
    assert!(debug_text.contains(LOG_CF));
    assert!(debug_text.contains(META_CF));

    // A clone must produce exactly the same debug output as the original.
    let duplicate = log_store.clone();
    assert_eq!(format!("{duplicate:?}"), debug_text);
}
/// `RocksdbLogStore::open` reports `MissingColumnFamily` carrying the log CF
/// name when the database was opened with only the meta column family.
#[test]
fn open_fails_when_log_cf_missing() {
    let tmp = TempDir::new().unwrap();

    let mut db_options = Options::default();
    db_options.create_if_missing(true);
    db_options.create_missing_column_families(true);

    // Deliberately register the meta CF only, so the log CF lookup fails.
    let meta_only = std::iter::once(ColumnFamilyDescriptor::new(META_CF, Options::default()));
    let db = DB::open_cf_descriptors(&db_options, tmp.path(), meta_only)
        .map(Arc::new)
        .unwrap();

    let failure =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(db, LOG_CF, META_CF, Flat).unwrap_err();
    let names_missing_log_cf = matches!(
        failure,
        tsoracle_openraft_toolkit::RocksdbLogStoreError::MissingColumnFamily(ref name)
            if name == LOG_CF
    );
    assert!(names_missing_log_cf);
}
/// A vote written with `save_vote` is returned unchanged by `read_vote`.
#[tokio::test]
async fn save_and_read_vote_roundtrips() {
    let tmp = TempDir::new().unwrap();
    let mut log_store =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(open_db(&tmp), LOG_CF, META_CF, Flat)
            .unwrap();

    let expected: Vote<TestLeaderId> = Vote::new_committed(7, 3);
    log_store.save_vote(&expected).await.unwrap();

    assert_eq!(log_store.read_vote().await.unwrap(), Some(expected));
}
/// A store with nothing written reports neither a purged log id nor a last
/// log id from `get_log_state`.
#[tokio::test]
async fn empty_store_log_state_is_empty() {
    let tmp = TempDir::new().unwrap();
    let mut log_store =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(open_db(&tmp), LOG_CF, META_CF, Flat)
            .unwrap();

    let log_state = log_store.get_log_state().await.unwrap();

    assert!(log_state.last_purged_log_id.is_none());
    assert!(log_state.last_log_id.is_none());
}
/// `read_committed` yields `None` before anything is saved, and afterwards
/// returns the log id passed to `save_committed`.
#[tokio::test]
async fn save_and_read_committed_roundtrips() {
    let tmp = TempDir::new().unwrap();
    let mut log_store =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(open_db(&tmp), LOG_CF, META_CF, Flat)
            .unwrap();

    // Nothing has been recorded yet.
    assert!(log_store.read_committed().await.unwrap().is_none());

    let leader = TestLeaderId {
        term: 7,
        node_id: 3,
    };
    let committed: LogId<TestLeaderId> = LogId::new(leader, 2);

    log_store.save_committed(Some(committed)).await.unwrap();
    assert_eq!(log_store.read_committed().await.unwrap(), Some(committed));
}
/// Saving `None` after a committed log id has been stored makes
/// `read_committed` return `None` again.
#[tokio::test]
async fn save_committed_with_none_clears_existing_record() {
    let tmp = TempDir::new().unwrap();
    let mut log_store =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(open_db(&tmp), LOG_CF, META_CF, Flat)
            .unwrap();

    let leader = TestLeaderId {
        term: 4,
        node_id: 1,
    };
    let committed: LogId<TestLeaderId> = LogId::new(leader, 9);

    // First establish a record so there is something to clear.
    log_store.save_committed(Some(committed)).await.unwrap();
    assert_eq!(log_store.read_committed().await.unwrap(), Some(committed));

    log_store.save_committed(None).await.unwrap();
    assert!(log_store.read_committed().await.unwrap().is_none());
}
/// When the vote key in the meta column family holds bytes that do not decode
/// as a vote, `read_vote` returns an error, and that error can be rendered
/// with `to_string`.
#[tokio::test]
async fn read_vote_surfaces_decode_error_on_corrupted_meta() {
    let tmp = TempDir::new().unwrap();
    let db = open_db(&tmp);

    // Plant garbage directly under the vote key before the store is opened.
    let vote_key = Flat.meta_key(MetaLabel::Vote);
    let meta_cf = db.cf_handle(META_CF).unwrap();
    let garbage = b"not a valid postcard-encoded vote";
    db.put_cf(&meta_cf, &vote_key, garbage).unwrap();

    let mut log_store =
        RocksdbLogStore::<TestTypeConfig, Flat>::open(Arc::clone(&db), LOG_CF, META_CF, Flat)
            .unwrap();

    let failure = log_store
        .read_vote()
        .await
        .expect_err("read_vote should propagate the decode failure");

    // Only checks that formatting the error does not panic; the text is unused.
    let _ = failure.to_string();
}
/// Two `GroupPrefixed` stores (groups 4 and 5) share one database and the same
/// column families. After group 4 appends an entry at index 100, group 5 must
/// still report an empty log state while group 4 sees its own entry.
#[tokio::test]
async fn get_log_state_isolates_groups_in_shared_column_family() {
    let tmp = TempDir::new().unwrap();
    let shared_db = open_db(&tmp);

    let mut group_four = RocksdbLogStore::<TestTypeConfig, GroupPrefixed>::open(
        Arc::clone(&shared_db),
        LOG_CF,
        META_CF,
        GroupPrefixed::new(4),
    )
    .unwrap();

    // Write a single blank entry at index 100 into group 4 only.
    let leader = TestLeaderId {
        term: 1,
        node_id: 1,
    };
    let blank: Entry<TestLeaderId, common::TestAppData, u64, common::TestPeer> =
        Entry::new_blank(LogId::new(leader, 100));
    group_four
        .append(std::iter::once(blank), IOFlushed::noop())
        .await
        .unwrap();

    let mut group_five = RocksdbLogStore::<TestTypeConfig, GroupPrefixed>::open(
        Arc::clone(&shared_db),
        LOG_CF,
        META_CF,
        GroupPrefixed::new(5),
    )
    .unwrap();

    // Group 5 never wrote anything, so it must not observe group 4's entry.
    let five_state = group_five.get_log_state().await.unwrap();
    assert!(
        five_state.last_log_id.is_none(),
        "group 5 has no entries but get_log_state returned {:?}; \
         last_log_id_in_cf leaked group 4's entry across the keyspace boundary",
        five_state.last_log_id,
    );
    assert!(five_state.last_purged_log_id.is_none());

    let four_state = group_four.get_log_state().await.unwrap();
    let four_last_index = four_state.last_log_id.as_ref().map(|id| id.index);
    assert_eq!(
        four_last_index,
        Some(100),
        "group 4 should still observe its own entry",
    );
}