pub struct BufferedRaftLog<T>where
T: TypeConfig,{ /* private fields */ }Expand description
High-performance buffered Raft log with event-driven architecture
This implementation provides in-memory-first access with configurable persistence strategies, while ensuring thread safety and avoiding deadlocks.
Key design principles:
- Lock-free reads for 99% of operations
- Event-driven asynchronous processing
- Deadlock prevention through proper error handling
- Memory-efficient batch operations
Implementations§
Source§impl<T> BufferedRaftLog<T>where
T: TypeConfig,
impl<T> BufferedRaftLog<T>where
T: TypeConfig,
pub fn new( node_id: u32, persistence_config: PersistenceConfig, storage: Arc<<T as TypeConfig>::SE>, ) -> (BufferedRaftLog<T>, UnboundedReceiver<IOTask>)
Sourcepub fn start(
self,
receiver: UnboundedReceiver<IOTask>,
log_flush_tx: Option<UnboundedSender<RoleEvent>>,
) -> Arc<BufferedRaftLog<T>>
pub fn start( self, receiver: UnboundedReceiver<IOTask>, log_flush_tx: Option<UnboundedSender<RoleEvent>>, ) -> Arc<BufferedRaftLog<T>>
Start the command processor and return an Arc-wrapped instance.
batch_processor runs on a dedicated OS thread (not a tokio worker) so that
synchronous RocksDB calls (db.write, flush_wal) never block the async runtime.
The thread owns its own new_current_thread runtime — fully independent of the
caller’s runtime lifecycle. When the caller’s runtime drops (e.g. at test teardown),
the IO thread’s timers and channels are unaffected; it exits cleanly on Shutdown.
Sourcepub fn remove_range(&self, range: RangeInclusive<u64>)
pub fn remove_range(&self, range: RangeInclusive<u64>)
Efficient range removal with targeted term index updates. Complexity: O(k + t), where k = number of entries removed and t = number of affected terms.
Trait Implementations§
Source§impl<T> Debug for BufferedRaftLog<T>where
T: TypeConfig,
impl<T> Debug for BufferedRaftLog<T>where
T: TypeConfig,
Source§impl<T> Drop for BufferedRaftLog<T>where
T: TypeConfig,
impl<T> Drop for BufferedRaftLog<T>where
T: TypeConfig,
Source§impl<T> RaftLog for BufferedRaftLog<T>where
T: TypeConfig,
impl<T> RaftLog for BufferedRaftLog<T>where
T: TypeConfig,
Source§fn entry(&self, index: u64) -> Result<Option<Entry>, Error>
fn entry(&self, index: u64) -> Result<Option<Entry>, Error>
TODO: the order of the configured storage rules has not been considered yet. Also, should we remove the Result<> wrapper?
Source§fn first_entry_id(&self) -> u64
fn first_entry_id(&self) -> u64
Source§fn last_entry_id(&self) -> u64
fn last_entry_id(&self) -> u64
Source§fn durable_index(&self) -> u64
fn durable_index(&self) -> u64
Source§fn last_log_id(&self) -> Option<LogId>
fn last_log_id(&self) -> Option<LogId>
Source§fn entry_term(&self, entry_id: u64) -> Option<u64>
fn entry_term(&self, entry_id: u64) -> Option<u64>
Source§fn first_index_for_term(&self, term: u64) -> Option<u64>
fn first_index_for_term(&self, term: u64) -> Option<u64>
Source§fn last_index_for_term(&self, term: u64) -> Option<u64>
fn last_index_for_term(&self, term: u64) -> Option<u64>
Source§fn pre_allocate_raft_logs_next_index(&self) -> u64
fn pre_allocate_raft_logs_next_index(&self) -> u64
Source§fn pre_allocate_id_range(&self, count: u64) -> RangeInclusive<u64>
fn pre_allocate_id_range(&self, count: u64) -> RangeInclusive<u64>
Source§fn get_entries_range(
&self,
range: RangeInclusive<u64>,
) -> Result<Vec<Entry>, Error>
fn get_entries_range( &self, range: RangeInclusive<u64>, ) -> Result<Vec<Entry>, Error>
Source§fn append_entries<'life0, 'async_trait>(
&'life0 self,
entries: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn append_entries<'life0, 'async_trait>(
&'life0 self,
entries: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Source§fn insert_batch<'life0, 'async_trait>(
&'life0 self,
logs: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn insert_batch<'life0, 'async_trait>(
&'life0 self,
logs: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Source§fn filter_out_conflicts_and_append<'life0, 'async_trait>(
&'life0 self,
prev_log_index: u64,
prev_log_term: u64,
new_entries: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<Option<LogId>, Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn filter_out_conflicts_and_append<'life0, 'async_trait>(
&'life0 self,
prev_log_index: u64,
prev_log_term: u64,
new_entries: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<Option<LogId>, Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Source§fn calculate_majority_matched_index(
&self,
current_term: u64,
commit_index: u64,
peer_matched_ids: Vec<u64>,
) -> Option<u64>
fn calculate_majority_matched_index( &self, current_term: u64, commit_index: u64, peer_matched_ids: Vec<u64>, ) -> Option<u64>
Source§fn purge_logs_up_to<'life0, 'async_trait>(
&'life0 self,
cutoff_index: LogId,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn purge_logs_up_to<'life0, 'async_trait>(
&'life0 self,
cutoff_index: LogId,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Source§fn flush<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn flush<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Source§fn reset<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn reset<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Source§fn load_hard_state(&self) -> Result<Option<HardState>, Error>
fn load_hard_state(&self) -> Result<Option<HardState>, Error>
Source§fn save_hard_state(&self, hard_state: &HardState) -> Result<(), Error>
fn save_hard_state(&self, hard_state: &HardState) -> Result<(), Error>
Source§fn close<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
fn close<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
'life0: 'async_trait,
BufferedRaftLog<T>: 'async_trait,
Auto Trait Implementations§
impl<T> !Freeze for BufferedRaftLog<T>
impl<T> !RefUnwindSafe for BufferedRaftLog<T>
impl<T> Send for BufferedRaftLog<T>
impl<T> Sync for BufferedRaftLog<T>
impl<T> Unpin for BufferedRaftLog<T>
impl<T> UnsafeUnpin for BufferedRaftLog<T>
impl<T> !UnwindSafe for BufferedRaftLog<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request