pub trait StateMachine:
Send
+ Sync
+ 'static {
Show 21 methods
// Required methods
fn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn stop(&self) -> Result<(), Error>;
fn is_running(&self) -> bool;
fn get(&self, key_buffer: &[u8]) -> Result<Option<Bytes>, Error>;
fn entry_term(&self, entry_id: u64) -> Option<u64>;
fn apply_chunk<'life0, 'async_trait>(
&'life0 self,
chunk: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn len(&self) -> usize;
fn update_last_applied(&self, last_applied: LogId);
fn last_applied(&self) -> LogId;
fn persist_last_applied(&self, last_applied: LogId) -> Result<(), Error>;
fn update_last_snapshot_metadata(
&self,
snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error>;
fn snapshot_metadata(&self) -> Option<SnapshotMetadata>;
fn persist_last_snapshot_metadata(
&self,
snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error>;
fn apply_snapshot_from_file<'life0, 'life1, 'async_trait>(
&'life0 self,
metadata: &'life1 SnapshotMetadata,
snapshot_path: PathBuf,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait;
fn generate_snapshot_data<'life0, 'async_trait>(
&'life0 self,
new_snapshot_dir: PathBuf,
last_included: LogId,
) -> Pin<Box<dyn Future<Output = Result<Bytes, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn save_hard_state(&self) -> Result<(), Error>;
fn flush(&self) -> Result<(), Error>;
fn flush_async<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
fn reset<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait;
// Provided methods
fn is_empty(&self) -> bool { ... }
fn lease_background_cleanup<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Bytes>, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait,
Self: 'async_trait { ... }
}Expand description
State machine trait for Raft consensus
§Thread Safety Requirements
CRITICAL: Implementations MUST be thread-safe.
- Read methods (
get(),len()) may be called concurrently - Write methods should use internal synchronization
- No assumptions about caller’s threading model
Required Methods§
Sourcefn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn start<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Starts the state machine service.
This method:
- Flips internal state flags
- Loads persisted data (e.g., TTL state from disk)
Called once during node startup. Performance is not critical.
Sourcefn stop(&self) -> Result<(), Error>
fn stop(&self) -> Result<(), Error>
Stops the state machine service gracefully. This is typically a sync operation for state management.
Sourcefn is_running(&self) -> bool
fn is_running(&self) -> bool
Checks if the state machine is currently running. Sync operation as it just checks an atomic boolean.
Sourcefn get(&self, key_buffer: &[u8]) -> Result<Option<Bytes>, Error>
fn get(&self, key_buffer: &[u8]) -> Result<Option<Bytes>, Error>
Retrieves a value by key from the state machine. Sync operation as it accesses in-memory data structures.
Sourcefn entry_term(&self, entry_id: u64) -> Option<u64>
fn entry_term(&self, entry_id: u64) -> Option<u64>
Returns the term of a specific log entry by its ID. Sync operation as it queries in-memory data.
Sourcefn apply_chunk<'life0, 'async_trait>(
&'life0 self,
chunk: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn apply_chunk<'life0, 'async_trait>(
&'life0 self,
chunk: Vec<Entry>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Applies a chunk of log entries to the state machine. Async operation as it may involve disk I/O for persistence.
Sourcefn len(&self) -> usize
fn len(&self) -> usize
Returns the number of entries in the state machine. NOTE: This may be expensive for some implementations. Sync operation but should be used cautiously.
Sourcefn update_last_applied(&self, last_applied: LogId)
fn update_last_applied(&self, last_applied: LogId)
Updates the last applied index in memory. Sync operation as it just updates atomic variables.
Sourcefn last_applied(&self) -> LogId
fn last_applied(&self) -> LogId
Gets the last applied log index and term. Sync operation as it reads from atomic variables.
Sourcefn persist_last_applied(&self, last_applied: LogId) -> Result<(), Error>
fn persist_last_applied(&self, last_applied: LogId) -> Result<(), Error>
Persists the last applied index to durable storage. Should be async as it involves disk I/O.
Sourcefn update_last_snapshot_metadata(
&self,
snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error>
fn update_last_snapshot_metadata( &self, snapshot_metadata: &SnapshotMetadata, ) -> Result<(), Error>
Updates snapshot metadata in memory. Sync operation as it updates in-memory structures.
Sourcefn snapshot_metadata(&self) -> Option<SnapshotMetadata>
fn snapshot_metadata(&self) -> Option<SnapshotMetadata>
Retrieves the current snapshot metadata. Sync operation as it reads from in-memory structures.
Sourcefn persist_last_snapshot_metadata(
&self,
snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error>
fn persist_last_snapshot_metadata( &self, snapshot_metadata: &SnapshotMetadata, ) -> Result<(), Error>
Persists snapshot metadata to durable storage. Should be async as it involves disk I/O.
Sourcefn apply_snapshot_from_file<'life0, 'life1, 'async_trait>(
&'life0 self,
metadata: &'life1 SnapshotMetadata,
snapshot_path: PathBuf,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn apply_snapshot_from_file<'life0, 'life1, 'async_trait>(
&'life0 self,
metadata: &'life1 SnapshotMetadata,
snapshot_path: PathBuf,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Applies a snapshot received from the Raft leader to the local state machine
§Critical Security and Integrity Measures
- Checksum Validation: Verifies snapshot integrity before application
- Version Validation: Ensures snapshot is newer than current state
- Atomic Application: Uses locking to prevent concurrent modifications
- File Validation: Confirms compressed format before decompression
§Workflow
- Validate snapshot metadata and version
- Verify compressed file format
- Decompress to temporary directory
- Validate checklsum
- Initialize new state machine database
- Atomically replace current database
- Update Raft metadata and indexes
Sourcefn generate_snapshot_data<'life0, 'async_trait>(
&'life0 self,
new_snapshot_dir: PathBuf,
last_included: LogId,
) -> Pin<Box<dyn Future<Output = Result<Bytes, Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn generate_snapshot_data<'life0, 'async_trait>(
&'life0 self,
new_snapshot_dir: PathBuf,
last_included: LogId,
) -> Pin<Box<dyn Future<Output = Result<Bytes, Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Generates a snapshot of the state machine’s current key-value entries
up to the specified last_included_index.
This function:
- Creates a new database at
temp_snapshot_path. - Copies all key-value entries from the current state machine’s database where the key
(interpreted as a log index) does not exceed
last_included_index. - Uses batch writes for efficiency, committing every 100 records.
- Will update last_included_index and last_included_term in memory
- Will persist last_included_index and last_included_term into current database and new
database specified by
temp_snapshot_path
§Arguments
new_snapshot_dir- Temporary path to store the snapshot data.last_included_index- Last log index included in the snapshot.last_included_term- Last log term included in the snapshot.
§Returns
- if success, checksum will be returned
Sourcefn save_hard_state(&self) -> Result<(), Error>
fn save_hard_state(&self) -> Result<(), Error>
Saves the hard state of the state machine. Sync operation as it typically just delegates to other persistence methods.
Sourcefn flush(&self) -> Result<(), Error>
fn flush(&self) -> Result<(), Error>
Flushes any pending writes to durable storage. Sync operation that may block but provides a synchronous interface.
Provided Methods§
Sourcefn is_empty(&self) -> bool
fn is_empty(&self) -> bool
Checks if the state machine is empty. Sync operation that typically delegates to len().
Sourcefn lease_background_cleanup<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Bytes>, Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn lease_background_cleanup<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Bytes>, Error>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Background lease cleanup hook.
Called periodically by the background cleanup task (if enabled). Returns keys that were cleaned up.
§Default Implementation
No-op, suitable for state machines without lease support.
§Called By
Framework calls this from background cleanup task spawned in NodeBuilder::build(). Only called when cleanup_strategy = “background”.
§Returns
- Vec of deleted keys (for logging/metrics)
§Example (d-engine built-in state machines)
async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
if let Some(ref lease) = self.lease {
let expired_keys = lease.get_expired_keys(SystemTime::now());
if !expired_keys.is_empty() {
self.delete_batch(&expired_keys).await?;
}
return Ok(expired_keys);
}
Ok(vec![])
}