use std::fmt::Debug;
use std::io::Cursor;
use std::ops::RangeBounds;
use std::path::Path;
use std::sync::Arc;
use openraft::log_id::RaftLogId;
use openraft::storage::{LogFlushed, LogState, RaftLogStorage, RaftStateMachine, Snapshot};
use openraft::{
Entry, EntryPayload, LogId, OptionalSend, RaftLogReader, RaftSnapshotBuilder, RaftTypeConfig,
SnapshotMeta, StorageError, StoredMembership, Vote,
};
use redb::{Database, ReadableTable, TableDefinition};
use tokio::sync::RwLock;
use tracing::{debug, warn};
use crate::types::NodeId;
const LOG_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("log_entries");
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
const SNAPSHOT_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("snapshots");
const SM_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("state_machine");
const META_VOTE: &str = "vote";
const META_LAST_PURGED: &str = "last_purged";
const META_COMMITTED: &str = "committed";
const META_LAST_APPLIED: &str = "last_applied";
const META_LAST_MEMBERSHIP: &str = "last_membership";
const SM_STATE: &str = "state";
const SNAPSHOT_CURRENT: &str = "current";
const SNAPSHOT_META: &str = "meta";
fn redb_to_storage_err(
subject: openraft::ErrorSubject<NodeId>,
verb: openraft::ErrorVerb,
e: impl std::fmt::Display,
) -> StorageError<NodeId> {
StorageError::from_io_error(subject, verb, std::io::Error::other(e.to_string()))
}
pub struct RedbLogStore<C: RaftTypeConfig<NodeId = NodeId>> {
db: Arc<Database>,
_phantom: std::marker::PhantomData<C>,
}
impl<C: RaftTypeConfig<NodeId = NodeId>> RedbLogStore<C> {
#[allow(clippy::result_large_err)]
pub fn new(path: impl AsRef<Path>) -> Result<Self, crate::error::ConsensusError> {
let db = Database::create(path.as_ref()).map_err(|e| {
crate::error::ConsensusError::Redb(format!(
"Failed to open redb at {}: {}",
path.as_ref().display(),
e
))
})?;
let txn = db.begin_write().map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to begin write txn: {e}"))
})?;
{
let _ = txn.open_table(LOG_TABLE).map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to open log table: {e}"))
})?;
let _ = txn.open_table(META_TABLE).map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to open meta table: {e}"))
})?;
}
txn.commit().map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to commit init txn: {e}"))
})?;
Ok(Self {
db: Arc::new(db),
_phantom: std::marker::PhantomData,
})
}
#[must_use]
pub fn from_db(db: Arc<Database>) -> Self {
Self {
db,
_phantom: std::marker::PhantomData,
}
}
}
impl<C: RaftTypeConfig<NodeId = NodeId>> Clone for RedbLogStore<C> {
fn clone(&self) -> Self {
Self {
db: Arc::clone(&self.db),
_phantom: std::marker::PhantomData,
}
}
}
pub struct RedbLogReader<C: RaftTypeConfig<NodeId = NodeId>> {
db: Arc<Database>,
_phantom: std::marker::PhantomData<C>,
}
impl<C: RaftTypeConfig<NodeId = NodeId>> Clone for RedbLogReader<C> {
fn clone(&self) -> Self {
Self {
db: Arc::clone(&self.db),
_phantom: std::marker::PhantomData,
}
}
}
impl<C> RaftLogReader<C> for RedbLogReader<C>
where
C: RaftTypeConfig<NodeId = NodeId>,
C::Entry: serde::Serialize + serde::de::DeserializeOwned,
{
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<NodeId>> {
let db = &self.db;
let txn = db.begin_read().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
let table = txn.open_table(LOG_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
let mut entries = Vec::new();
for item in table.range(range).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})? {
let (_, value) = item.map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
let entry: C::Entry = postcard2::from_bytes(value.value()).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
entries.push(entry);
}
Ok(entries)
}
}
impl<C> RaftLogReader<C> for RedbLogStore<C>
where
C: RaftTypeConfig<NodeId = NodeId>,
C::Entry: serde::Serialize + serde::de::DeserializeOwned,
{
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<NodeId>> {
let mut reader: RedbLogReader<C> = RedbLogReader {
db: Arc::clone(&self.db),
_phantom: std::marker::PhantomData,
};
reader.try_get_log_entries(range).await
}
}
impl<C> RaftLogStorage<C> for RedbLogStore<C>
where
C: RaftTypeConfig<NodeId = NodeId>,
C::Entry: serde::Serialize + serde::de::DeserializeOwned + Clone,
{
type LogReader = RedbLogReader<C>;
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<NodeId>> {
let txn = self.db.begin_read().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
let last_purged = {
let meta = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
match meta.get(META_LAST_PURGED) {
Ok(Some(v)) => {
let id: LogId<NodeId> = postcard2::from_bytes(v.value()).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Read,
e,
)
})?;
Some(id)
}
_ => None,
}
};
let last_log = {
let log = txn.open_table(LOG_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
let result = log.last().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
match result {
Some((_, v)) => {
let entry: C::Entry = postcard2::from_bytes(v.value()).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Read,
e,
)
})?;
Some(*entry.get_log_id())
}
None => None,
}
};
Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last_log,
})
}
async fn get_log_reader(&mut self) -> Self::LogReader {
RedbLogReader {
db: Arc::clone(&self.db),
_phantom: std::marker::PhantomData,
}
}
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
let bytes = postcard2::to_vec(vote).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Write, e)
})?;
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Write, e)
})?;
{
let mut meta = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Write, e)
})?;
meta.insert(META_VOTE, bytes.as_slice()).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Write, e)
})?;
}
txn.commit().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Write, e)
})?;
Ok(())
}
async fn read_vote(&mut self) -> Result<Option<Vote<NodeId>>, StorageError<NodeId>> {
let txn = self.db.begin_read().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Read, e)
})?;
let meta = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Read, e)
})?;
match meta.get(META_VOTE) {
Ok(Some(v)) => {
let vote: Vote<NodeId> = postcard2::from_bytes(v.value()).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Vote, openraft::ErrorVerb::Read, e)
})?;
Ok(Some(vote))
}
Ok(None) => Ok(None),
Err(e) => Err(redb_to_storage_err(
openraft::ErrorSubject::Vote,
openraft::ErrorVerb::Read,
e,
)),
}
}
async fn save_committed(
&mut self,
committed: Option<LogId<NodeId>>,
) -> Result<(), StorageError<NodeId>> {
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
{
let mut meta = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
match committed {
Some(ref id) => {
let bytes = postcard2::to_vec(id).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Write,
e,
)
})?;
meta.insert(META_COMMITTED, bytes.as_slice()).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Write,
e,
)
})?;
}
None => {
let _ = meta.remove(META_COMMITTED);
}
}
}
txn.commit().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
Ok(())
}
async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<NodeId>> {
let txn = self.db.begin_read().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
let meta = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
match meta.get(META_COMMITTED) {
Ok(Some(v)) => {
let id: LogId<NodeId> = postcard2::from_bytes(v.value()).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Read, e)
})?;
Ok(Some(id))
}
Ok(None) => Ok(None),
Err(e) => Err(redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Read,
e,
)),
}
}
async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<C>,
) -> Result<(), StorageError<NodeId>>
where
I: IntoIterator<Item = C::Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
{
let mut table = txn.open_table(LOG_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
for entry in entries {
let idx = entry.get_log_id().index;
let bytes = postcard2::to_vec(&entry).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
table.insert(idx, bytes.as_slice()).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
}
}
txn.commit().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
callback.log_io_completed(Ok(()));
Ok(())
}
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
{
let mut table = txn.open_table(LOG_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
let keys: Vec<u64> = {
let mut keys = Vec::new();
for item in table.range(log_id.index..).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})? {
let (k, _) = item.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Write,
e,
)
})?;
keys.push(k.value());
}
keys
};
for key in keys {
table.remove(key).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
}
}
txn.commit().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
Ok(())
}
async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<NodeId>> {
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
{
let mut table = txn.open_table(LOG_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
let keys: Vec<u64> = {
let mut keys = Vec::new();
for item in table.range(..=log_id.index).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})? {
let (k, _) = item.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Logs,
openraft::ErrorVerb::Write,
e,
)
})?;
keys.push(k.value());
}
keys
};
for key in keys {
table.remove(key).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
}
let mut meta = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
let bytes = postcard2::to_vec(&log_id).map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
meta.insert(META_LAST_PURGED, bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
}
txn.commit().map_err(|e| {
redb_to_storage_err(openraft::ErrorSubject::Logs, openraft::ErrorVerb::Write, e)
})?;
Ok(())
}
}
pub struct RedbStateMachine<C, S, F>
where
C: RaftTypeConfig<NodeId = NodeId>,
S: Default + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
F: Fn(&mut S, &C::D) -> C::R + Send + Sync + 'static,
{
db: Arc<Database>,
sm: Arc<RwLock<RedbSmCache<C, S>>>,
apply_fn: Arc<F>,
}
pub struct RedbSmCache<C: RaftTypeConfig<NodeId = NodeId>, S> {
pub last_applied_log: Option<LogId<NodeId>>,
pub last_membership: StoredMembership<NodeId, C::Node>,
pub state: S,
}
impl<C: RaftTypeConfig<NodeId = NodeId>, S: Default> Default for RedbSmCache<C, S> {
fn default() -> Self {
Self {
last_applied_log: None,
last_membership: StoredMembership::default(),
state: S::default(),
}
}
}
impl<C, S, F> RedbStateMachine<C, S, F>
where
C: RaftTypeConfig<NodeId = NodeId>,
S: Default + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
F: Fn(&mut S, &C::D) -> C::R + Send + Sync + 'static,
{
#[allow(clippy::result_large_err)]
pub fn new(path: impl AsRef<Path>, apply_fn: F) -> Result<Self, crate::error::ConsensusError> {
let db = Database::create(path.as_ref()).map_err(|e| {
crate::error::ConsensusError::Redb(format!(
"Failed to open redb SM at {}: {}",
path.as_ref().display(),
e
))
})?;
let txn = db.begin_write().map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to begin write txn: {e}"))
})?;
{
let _ = txn.open_table(SM_TABLE).map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to open SM table: {e}"))
})?;
let _ = txn.open_table(SNAPSHOT_TABLE).map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to open snapshot table: {e}"))
})?;
let _ = txn.open_table(META_TABLE).map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to open meta table: {e}"))
})?;
}
txn.commit().map_err(|e| {
crate::error::ConsensusError::Redb(format!("Failed to commit init txn: {e}"))
})?;
let db = Arc::new(db);
let cache = Self::load_cache(&db);
Ok(Self {
db,
sm: Arc::new(RwLock::new(cache)),
apply_fn: Arc::new(apply_fn),
})
}
pub fn from_db(db: Arc<Database>, apply_fn: F) -> Self {
let cache = Self::load_cache(&db);
Self {
db,
sm: Arc::new(RwLock::new(cache)),
apply_fn: Arc::new(apply_fn),
}
}
#[must_use]
pub fn state(&self) -> Arc<RwLock<RedbSmCache<C, S>>> {
Arc::clone(&self.sm)
}
fn load_cache(db: &Database) -> RedbSmCache<C, S> {
let Ok(txn) = db.begin_read() else {
warn!("Failed to read SM cache from redb");
return RedbSmCache::default();
};
let state = {
let Ok(table) = txn.open_table(SM_TABLE) else {
return RedbSmCache::default();
};
match table.get(SM_STATE) {
Ok(Some(v)) => postcard2::from_bytes(v.value()).unwrap_or_default(),
_ => S::default(),
}
};
let last_applied_log = {
let Ok(meta) = txn.open_table(META_TABLE) else {
return RedbSmCache::default();
};
match meta.get(META_LAST_APPLIED) {
Ok(Some(v)) => postcard2::from_bytes(v.value()).ok(),
_ => None,
}
};
let last_membership = {
let Ok(meta) = txn.open_table(META_TABLE) else {
return RedbSmCache::default();
};
match meta.get(META_LAST_MEMBERSHIP) {
Ok(Some(v)) => postcard2::from_bytes(v.value()).unwrap_or_default(),
_ => StoredMembership::default(),
}
};
RedbSmCache {
last_applied_log,
last_membership,
state,
}
}
}
impl<C, S, F> Clone for RedbStateMachine<C, S, F>
where
C: RaftTypeConfig<NodeId = NodeId>,
S: Default + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
F: Fn(&mut S, &C::D) -> C::R + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
db: Arc::clone(&self.db),
sm: Arc::clone(&self.sm),
apply_fn: Arc::clone(&self.apply_fn),
}
}
}
pub struct RedbSnapshotBuilder<C, S>
where
C: RaftTypeConfig<NodeId = NodeId>,
{
sm: Arc<RwLock<RedbSmCache<C, S>>>,
}
impl<C, S> RaftSnapshotBuilder<C> for RedbSnapshotBuilder<C, S>
where
C: RaftTypeConfig<NodeId = NodeId, SnapshotData = Cursor<Vec<u8>>>,
S: Default + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
{
async fn build_snapshot(&mut self) -> Result<Snapshot<C>, StorageError<NodeId>> {
let sm = self.sm.read().await;
let data = postcard2::to_vec(&sm.state).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Read,
e,
)
})?;
let snapshot_id = if let Some(ref last) = sm.last_applied_log {
format!("{}-{}", last.leader_id, last.index)
} else {
"0-0".to_string()
};
let meta = SnapshotMeta {
last_log_id: sm.last_applied_log,
last_membership: sm.last_membership.clone(),
snapshot_id,
};
debug!(
last_log_id = ?meta.last_log_id,
"Built redb snapshot"
);
Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
}
}
impl<C, S, F> RaftStateMachine<C> for RedbStateMachine<C, S, F>
where
C: RaftTypeConfig<NodeId = NodeId, SnapshotData = Cursor<Vec<u8>>, Entry = Entry<C>>,
C::D: Clone + serde::Serialize + serde::de::DeserializeOwned,
C::R: Default,
S: Default + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
F: Fn(&mut S, &C::D) -> C::R + Send + Sync + 'static,
{
type SnapshotBuilder = RedbSnapshotBuilder<C, S>;
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, C::Node>), StorageError<NodeId>>
{
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
#[allow(clippy::too_many_lines)]
async fn apply<I>(&mut self, entries: I) -> Result<Vec<C::R>, StorageError<NodeId>>
where
I: IntoIterator<Item = C::Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
let mut sm = self.sm.write().await;
let mut responses = Vec::new();
for entry in entries {
sm.last_applied_log = Some(entry.log_id);
match entry.payload {
EntryPayload::Normal(ref data) => {
let resp = (self.apply_fn)(&mut sm.state, data);
responses.push(resp);
}
EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
responses.push(C::R::default());
}
EntryPayload::Blank => {
responses.push(C::R::default());
}
}
}
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
{
let mut sm_table = txn.open_table(SM_TABLE).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
let state_bytes = postcard2::to_vec(&sm.state).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
sm_table
.insert(SM_STATE, state_bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
let mut meta_table = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
if let Some(ref id) = sm.last_applied_log {
let bytes = postcard2::to_vec(id).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
meta_table
.insert(META_LAST_APPLIED, bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
}
let membership_bytes = postcard2::to_vec(&sm.last_membership).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
meta_table
.insert(META_LAST_MEMBERSHIP, membership_bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
}
txn.commit().map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::StateMachine,
openraft::ErrorVerb::Write,
e,
)
})?;
Ok(responses)
}
async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
RedbSnapshotBuilder {
sm: Arc::clone(&self.sm),
}
}
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<Cursor<Vec<u8>>>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}
#[allow(clippy::too_many_lines)]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, C::Node>,
snapshot: Box<Cursor<Vec<u8>>>,
) -> Result<(), StorageError<NodeId>> {
let data = snapshot.into_inner();
let state: S = postcard2::from_bytes(&data).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Read,
e,
)
})?;
let mut sm = self.sm.write().await;
sm.last_applied_log = meta.last_log_id;
sm.last_membership = meta.last_membership.clone();
sm.state = state;
let txn = self.db.begin_write().map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
{
let mut sm_table = txn.open_table(SM_TABLE).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
let state_bytes = postcard2::to_vec(&sm.state).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
sm_table
.insert(SM_STATE, state_bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
let mut snap_table = txn.open_table(SNAPSHOT_TABLE).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
snap_table
.insert(SNAPSHOT_CURRENT, data.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
let meta_bytes = postcard2::to_vec(meta).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
snap_table
.insert(SNAPSHOT_META, meta_bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
let mut meta_table = txn.open_table(META_TABLE).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
if let Some(ref id) = sm.last_applied_log {
let bytes = postcard2::to_vec(id).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
meta_table
.insert(META_LAST_APPLIED, bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
}
let membership_bytes = postcard2::to_vec(&sm.last_membership).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
meta_table
.insert(META_LAST_MEMBERSHIP, membership_bytes.as_slice())
.map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
}
txn.commit().map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Write,
e,
)
})?;
debug!(
last_log_id = ?meta.last_log_id,
"Installed snapshot into redb state machine"
);
Ok(())
}
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<C>>, StorageError<NodeId>> {
let txn = self.db.begin_read().map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Read,
e,
)
})?;
let Ok(table) = txn.open_table(SNAPSHOT_TABLE) else {
return Ok(None);
};
let data = match table.get(SNAPSHOT_CURRENT) {
Ok(Some(v)) => v.value().to_vec(),
_ => return Ok(None),
};
let meta_bytes = match table.get(SNAPSHOT_META) {
Ok(Some(v)) => v.value().to_vec(),
_ => return Ok(None),
};
let meta: SnapshotMeta<NodeId, C::Node> =
postcard2::from_bytes(&meta_bytes).map_err(|e| {
redb_to_storage_err(
openraft::ErrorSubject::Snapshot(None),
openraft::ErrorVerb::Read,
e,
)
})?;
Ok(Some(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
}))
}
}