use openraft::RaftTypeConfig;
use openraft::type_config::alias::{LogIdOf, VoteOf};
use rocksdb::{BoundColumnFamily, DB, WriteBatch};
use std::io;
use std::sync::Arc;
use crate::codec_provider::LogStoreCodec;
use super::key_space::{KeySpace, MetaLabel};
use super::{decode_log_id_record, decode_vote_record, encode_log_id_record, encode_vote_record};
pub(super) fn read_vote<C, K, Codec>(
db: &DB,
cf: &Arc<BoundColumnFamily<'_>>,
keys: &K,
label: MetaLabel,
) -> io::Result<Option<VoteOf<C>>>
where
C: RaftTypeConfig,
K: KeySpace,
Codec: LogStoreCodec<C>,
{
let key = keys.meta_key(label);
match db.get_pinned_cf(cf, &key).map_err(io::Error::other)? {
Some(bytes) => Ok(Some(decode_vote_record::<C, Codec>(&bytes)?)),
None => Ok(None),
}
}
pub(super) fn read_log_id<C, K, Codec>(
db: &DB,
cf: &Arc<BoundColumnFamily<'_>>,
keys: &K,
label: MetaLabel,
) -> io::Result<Option<LogIdOf<C>>>
where
C: RaftTypeConfig,
K: KeySpace,
Codec: LogStoreCodec<C>,
{
let key = keys.meta_key(label);
match db.get_pinned_cf(cf, &key).map_err(io::Error::other)? {
Some(bytes) => Ok(Some(decode_log_id_record::<C, Codec>(&bytes)?)),
None => Ok(None),
}
}
pub(super) fn put_vote<C, K, Codec>(
batch: &mut WriteBatch,
cf: &Arc<BoundColumnFamily<'_>>,
keys: &K,
label: MetaLabel,
version: u8,
value: &VoteOf<C>,
) -> io::Result<()>
where
C: RaftTypeConfig,
K: KeySpace,
Codec: LogStoreCodec<C>,
{
let key = keys.meta_key(label);
let bytes = encode_vote_record::<C, Codec>(version, value)?;
batch.put_cf(cf, &key, &bytes);
Ok(())
}
pub(super) fn put_log_id<C, K, Codec>(
batch: &mut WriteBatch,
cf: &Arc<BoundColumnFamily<'_>>,
keys: &K,
label: MetaLabel,
version: u8,
value: &LogIdOf<C>,
) -> io::Result<()>
where
C: RaftTypeConfig,
K: KeySpace,
Codec: LogStoreCodec<C>,
{
let key = keys.meta_key(label);
let bytes = encode_log_id_record::<C, Codec>(version, value)?;
batch.put_cf(cf, &key, &bytes);
Ok(())
}
pub(super) fn delete<K: KeySpace>(
batch: &mut WriteBatch,
cf: &Arc<BoundColumnFamily<'_>>,
keys: &K,
label: MetaLabel,
) {
let key = keys.meta_key(label);
batch.delete_cf(cf, &key);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::declare_raft_types_ext;
use crate::log_store::{DefaultLogStoreCodec, Flat, KeySpace, MetaLabel};
use openraft::type_config::alias::VoteOf;
use rocksdb::{ColumnFamilyDescriptor, DB, Options};
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetaPeer {
addr: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetaData {
v: u64,
}
impl std::fmt::Display for MetaData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MetaData({})", self.v)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetaApplied;
declare_raft_types_ext! {
pub MetaConfig:
Node = MetaPeer,
AppData = MetaData,
AppDataResponse = MetaApplied,
SnapshotData = std::io::Cursor<Vec<u8>>,
}
#[test]
fn meta_vote_roundtrips_through_provider_with_version_byte() {
let dir = TempDir::new().unwrap();
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let cfs = vec![ColumnFamilyDescriptor::new("meta", Options::default())];
let db = DB::open_cf_descriptors(&opts, dir.path(), cfs).unwrap();
let keys = Flat;
let cf = db.cf_handle("meta").unwrap();
let vote: VoteOf<MetaConfig> = openraft::Vote::new_committed(7, 3);
let mut batch = rocksdb::WriteBatch::default();
put_vote::<MetaConfig, Flat, DefaultLogStoreCodec>(
&mut batch,
&cf,
&keys,
MetaLabel::Vote,
crate::codec::BASELINE_WRITE_VERSION,
&vote,
)
.unwrap();
db.write(batch).unwrap();
let raw = db
.get_cf(&cf, keys.meta_key(MetaLabel::Vote))
.unwrap()
.unwrap();
assert_eq!(raw[0], crate::codec::BASELINE_WRITE_VERSION);
let back: Option<VoteOf<MetaConfig>> =
read_vote::<MetaConfig, Flat, DefaultLogStoreCodec>(&db, &cf, &keys, MetaLabel::Vote)
.unwrap();
assert_eq!(back, Some(vote));
}
}