StateMachineHandler

Trait StateMachineHandler 

Source
pub trait StateMachineHandler<T>:
    Send
    + Sync
    + 'static
where T: TypeConfig,
{
Show 15 methods // Required methods fn last_applied(&self) -> u64; fn update_pending(&self, new_commit: u64); fn wait_applied<'life0, 'async_trait>( &'life0 self, target_index: u64, timeout: Duration, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn apply_chunk<'life0, 'async_trait>( &'life0 self, chunk: Vec<Entry>, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn read_from_state_machine( &self, keys: Vec<Bytes>, ) -> Option<Vec<ClientResult>>; fn apply_snapshot_stream_from_leader<'life0, 'life1, 'async_trait>( &'life0 self, current_term: u64, stream: Box<Streaming<SnapshotChunk>>, ack_tx: Sender<SnapshotAck>, config: &'life1 SnapshotConfig, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn should_snapshot(&self, new_commit_data: NewCommitData) -> bool; fn create_snapshot<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(SnapshotMetadata, PathBuf)>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn cleanup_snapshot<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, before_version: u64, snapshot_dir: &'life1 Path, snapshot_dir_prefix: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn validate_purge_request<'life0, 'life1, 'async_trait>( &'life0 self, current_term: u64, leader_id: Option<u32>, req: &'life1 PurgeLogRequest, ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn handle_purge_request<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, current_term: u64, leader_id: Option<u32>, last_purged: Option<LogId>, req: &'life1 PurgeLogRequest, raft_log: &'life2 Arc<ROF<T>>, ) -> Pin<Box<dyn Future<Output = Result<PurgeLogResponse>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn get_latest_snapshot_metadata(&self) -> Option<SnapshotMetadata>; fn load_snapshot_data<'life0, 'async_trait>( &'life0 self, metadata: SnapshotMetadata, ) -> Pin<Box<dyn Future<Output = Result<BoxStream<'static, Result<SnapshotChunk>>>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn load_snapshot_chunk<'life0, 'life1, 'async_trait>( &'life0 self, metadata: &'life1 SnapshotMetadata, seq: u32, ) -> Pin<Box<dyn Future<Output = Result<SnapshotChunk>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn pending_range(&self) -> Option<RangeInclusive<u64>>;
}

Required Methods§

Source

fn last_applied(&self) -> u64

Source

fn update_pending(&self, new_commit: u64)

Updates the highest known committed log index that hasn’t been applied yet

Source

fn wait_applied<'life0, 'async_trait>( &'life0 self, target_index: u64, timeout: Duration, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Waits until the state machine has applied entries up to the target index. Returns error if timeout is reached before the target is applied.

This is used to ensure linearizable reads: after leader confirms a log entry is committed (via quorum), we must wait for the state machine to apply it before reading to guarantee the read reflects all committed writes.

Source

fn apply_chunk<'life0, 'async_trait>( &'life0 self, chunk: Vec<Entry>, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Applies a batch of committed log entries to the state machine

Source

fn read_from_state_machine(&self, keys: Vec<Bytes>) -> Option<Vec<ClientResult>>

Reads values from the state machine for given keys Returns None if any key doesn’t exist

Source

fn apply_snapshot_stream_from_leader<'life0, 'life1, 'async_trait>( &'life0 self, current_term: u64, stream: Box<Streaming<SnapshotChunk>>, ack_tx: Sender<SnapshotAck>, config: &'life1 SnapshotConfig, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Receives and installs a snapshot stream pushed by the leader Used when leader proactively sends snapshot updates to followers

Source

fn should_snapshot(&self, new_commit_data: NewCommitData) -> bool

Determines if a snapshot should be created based on new commit data

Source

fn create_snapshot<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(SnapshotMetadata, PathBuf)>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Asynchronously creates a state machine snapshot with the following steps:

  1. Acquires a write lock to ensure exclusive access during snapshot creation
  2. Prepares temporary and final snapshot file paths using:
    • Last applied index/term from state machine
  3. Generates snapshot data to temporary file using state machine implementation
  4. Atomically renames temporary file to final snapshot file to ensure consistency
  5. Cleans up old snapshots based on last_included_index, retaining only the latest snapshot files as specified by cleanup_retain_count.

Returns new Snapshot metadata and final snapshot path to indicate the new snapshot file has been successfully created

Source

fn cleanup_snapshot<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, before_version: u64, snapshot_dir: &'life1 Path, snapshot_dir_prefix: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Cleans up old snapshots before specified version

Source

fn validate_purge_request<'life0, 'life1, 'async_trait>( &'life0 self, current_term: u64, leader_id: Option<u32>, req: &'life1 PurgeLogRequest, ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Validates if a log purge request from leader is authorized

Source

fn handle_purge_request<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, current_term: u64, leader_id: Option<u32>, last_purged: Option<LogId>, req: &'life1 PurgeLogRequest, raft_log: &'life2 Arc<ROF<T>>, ) -> Pin<Box<dyn Future<Output = Result<PurgeLogResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Processes log purge requests (for non-leader nodes)

Source

fn get_latest_snapshot_metadata(&self) -> Option<SnapshotMetadata>

Retrieves metadata of the latest valid snapshot

Source

fn load_snapshot_data<'life0, 'async_trait>( &'life0 self, metadata: SnapshotMetadata, ) -> Pin<Box<dyn Future<Output = Result<BoxStream<'static, Result<SnapshotChunk>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Loads snapshot data as a stream of chunks

Source

fn load_snapshot_chunk<'life0, 'life1, 'async_trait>( &'life0 self, metadata: &'life1 SnapshotMetadata, seq: u32, ) -> Pin<Box<dyn Future<Output = Result<SnapshotChunk>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Loads a specific chunk of snapshot data by sequence number

Source

fn pending_range(&self) -> Option<RangeInclusive<u64>>

Implementors§

Source§

impl<T> StateMachineHandler<T> for DefaultStateMachineHandler<T>
where T: TypeConfig,