Skip to main content

d_engine_core/storage/
state_machine.rs

//! State machine trait for Raft consensus.
//!
//! See the [server customization guide](https://github.com/deventlab/d-engine/blob/master/d-engine/src/docs/server_guide/customize-state-machine.md) for details.
4
5use bytes::Bytes;
6use d_engine_proto::common::Entry;
7use d_engine_proto::common::LogId;
8use d_engine_proto::server::storage::SnapshotMetadata;
9#[cfg(any(test, feature = "__test_support"))]
10use mockall::automock;
11use tonic::async_trait;
12
13use crate::Error;
14
/// Result of applying a single log entry to the state machine.
///
/// Returned by `StateMachine::apply_chunk()` for each entry in the chunk.
/// Enables operations like CAS to communicate their execution result back to clients.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplyResult {
    /// Log index of the applied entry.
    pub index: u64,

    /// Whether the operation succeeded.
    ///
    /// Semantics by operation type:
    /// - CAS: `true` if compare succeeded and value was updated, `false` otherwise
    /// - PUT/DELETE: always `true` (failures return `Err` from `apply_chunk`)
    pub succeeded: bool,
}

impl ApplyResult {
    /// Creates a successful result (`succeeded == true`) for the given log index.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn success(index: u64) -> Self {
        Self {
            index,
            succeeded: true,
        }
    }

    /// Creates a failed result (`succeeded == false`) for the given log index.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn failure(index: u64) -> Self {
        Self {
            index,
            succeeded: false,
        }
    }
}
49
50/// State machine trait for Raft consensus
51///
52/// # Thread Safety Requirements
53///
54/// **CRITICAL**: Implementations MUST be thread-safe.
55///
56/// - Read methods (`get()`, `len()`) may be called concurrently
57/// - Write methods should use internal synchronization
58/// - No assumptions about caller's threading model
59#[cfg_attr(any(test, feature = "__test_support"), automock)]
60#[async_trait]
61pub trait StateMachine: Send + Sync + 'static {
62    /// Starts the state machine service.
63    ///
64    /// This method:
65    /// 1. Flips internal state flags
66    /// 2. Loads persisted data (e.g., TTL state from disk)
67    ///
68    /// Called once during node startup. Performance is not critical.
69    async fn start(&self) -> Result<(), Error>;
70
71    /// Stops the state machine service gracefully.
72    /// This is typically a sync operation for state management.
73    fn stop(&self) -> Result<(), Error>;
74
75    /// Checks if the state machine is currently running.
76    /// Sync operation as it just checks an atomic boolean.
77    fn is_running(&self) -> bool;
78
79    /// Retrieves a value by key from the state machine.
80    /// Sync operation as it accesses in-memory data structures.
81    fn get(
82        &self,
83        key_buffer: &[u8],
84    ) -> Result<Option<Bytes>, Error>;
85
86    /// Returns the term of a specific log entry by its ID.
87    /// Sync operation as it queries in-memory data.
88    fn entry_term(
89        &self,
90        entry_id: u64,
91    ) -> Option<u64>;
92
93    /// Applies a chunk of log entries to the state machine.
94    ///
95    /// Returns a result for each entry in the same order as the input chunk.
96    /// The returned vector MUST have the same length as the input chunk.
97    ///
98    /// # Performance Note
99    /// This is a hot path - implementations should minimize allocations.
100    /// Pre-allocate the result vector with `Vec::with_capacity(chunk.len())`.
101    async fn apply_chunk(
102        &self,
103        chunk: Vec<Entry>,
104    ) -> Result<Vec<ApplyResult>, Error>;
105
106    /// Returns the number of entries in the state machine.
107    /// NOTE: This may be expensive for some implementations.
108    /// Sync operation but should be used cautiously.
109    fn len(&self) -> usize;
110
111    /// Checks if the state machine is empty.
112    /// Sync operation that typically delegates to len().
113    fn is_empty(&self) -> bool {
114        self.len() == 0
115    }
116
117    /// Updates the last applied index in memory.
118    /// Sync operation as it just updates atomic variables.
119    fn update_last_applied(
120        &self,
121        last_applied: LogId,
122    );
123
124    /// Gets the last applied log index and term.
125    /// Sync operation as it reads from atomic variables.
126    fn last_applied(&self) -> LogId;
127
128    /// Persists the last applied index to durable storage.
129    /// Should be async as it involves disk I/O.
130    fn persist_last_applied(
131        &self,
132        last_applied: LogId,
133    ) -> Result<(), Error>;
134
135    /// Updates snapshot metadata in memory.
136    /// Sync operation as it updates in-memory structures.
137    fn update_last_snapshot_metadata(
138        &self,
139        snapshot_metadata: &SnapshotMetadata,
140    ) -> Result<(), Error>;
141
142    /// Retrieves the current snapshot metadata.
143    /// Sync operation as it reads from in-memory structures.
144    fn snapshot_metadata(&self) -> Option<SnapshotMetadata>;
145
146    /// Persists snapshot metadata to durable storage.
147    /// Should be async as it involves disk I/O.
148    fn persist_last_snapshot_metadata(
149        &self,
150        snapshot_metadata: &SnapshotMetadata,
151    ) -> Result<(), Error>;
152
153    /// Applies a snapshot received from the Raft leader to the local state machine
154    ///
155    /// # Critical Security and Integrity Measures
156    /// 1. Checksum Validation: Verifies snapshot integrity before application
157    /// 2. Version Validation: Ensures snapshot is newer than current state
158    /// 3. Atomic Application: Uses locking to prevent concurrent modifications
159    /// 4. File Validation: Confirms compressed format before decompression
160    ///
161    /// # Workflow
162    /// 1. Validate snapshot metadata and version
163    /// 2. Verify compressed file format
164    /// 3. Decompress to temporary directory
165    /// 4. Validate checklsum
166    /// 5. Initialize new state machine database
167    /// 6. Atomically replace current database
168    /// 7. Update Raft metadata and indexes
169    async fn apply_snapshot_from_file(
170        &self,
171        metadata: &SnapshotMetadata,
172        snapshot_path: std::path::PathBuf,
173    ) -> Result<(), Error>;
174
175    /// Generates a snapshot of the state machine's current key-value entries
176    /// up to the specified `last_included_index`.
177    ///
178    /// This function:
179    /// 1. Creates a new database at `temp_snapshot_path`.
180    /// 2. Copies all key-value entries from the current state machine's database where the key
181    ///    (interpreted as a log index) does not exceed `last_included_index`.
182    /// 3. Uses batch writes for efficiency, committing every 100 records.
183    /// 4. Will update last_included_index and last_included_term in memory
184    /// 5. Will persist last_included_index and last_included_term into current database and new
185    ///    database specified by `temp_snapshot_path`
186    ///
187    /// # Arguments
188    /// * `new_snapshot_dir` - Temporary path to store the snapshot data.
189    /// * `last_included_index` - Last log index included in the snapshot.
190    /// * `last_included_term` - Last log term included in the snapshot.
191    ///
192    /// # Returns
193    /// * if success, checksum will be returned
194    async fn generate_snapshot_data(
195        &self,
196        new_snapshot_dir: std::path::PathBuf,
197        last_included: LogId,
198    ) -> Result<Bytes, Error>;
199
200    /// Saves the hard state of the state machine.
201    /// Sync operation as it typically just delegates to other persistence methods.
202    fn save_hard_state(&self) -> Result<(), Error>;
203
204    /// Flushes any pending writes to durable storage.
205    /// Sync operation that may block but provides a synchronous interface.
206    fn flush(&self) -> Result<(), Error>;
207
208    /// Flushes any pending writes to durable storage.
209    /// Async operation as it involves disk I/O.
210    async fn flush_async(&self) -> Result<(), Error>;
211
212    /// Resets the state machine to its initial state.
213    /// Async operation as it may involve cleaning up files and data.
214    async fn reset(&self) -> Result<(), Error>;
215
216    /// Background lease cleanup hook.
217    ///
218    /// Called periodically by the background cleanup task (if enabled).
219    /// Returns keys that were cleaned up.
220    ///
221    /// # Default Implementation
222    /// No-op, suitable for state machines without lease support.
223    ///
224    /// # Called By
225    /// Framework calls this from background cleanup task spawned in NodeBuilder::build().
226    /// Only called when cleanup_strategy = "background".
227    ///
228    /// # Returns
229    /// - Vec of deleted keys (for logging/metrics)
230    ///
231    /// # Example (d-engine built-in state machines)
232    /// ```ignore
233    /// async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
234    ///     if let Some(ref lease) = self.lease {
235    ///         let expired_keys = lease.get_expired_keys(SystemTime::now());
236    ///         if !expired_keys.is_empty() {
237    ///             self.delete_batch(&expired_keys).await?;
238    ///         }
239    ///         return Ok(expired_keys);
240    ///     }
241    ///     Ok(vec![])
242    /// }
243    /// ```
244    async fn lease_background_cleanup(&self) -> Result<Vec<bytes::Bytes>, Error> {
245        Ok(vec![])
246    }
247}