d_engine_core/storage/state_machine.rs
1//! State machine trait for Raft consensus.
2//!
3//! See the [server customization guide](https://github.com/deventlab/d-engine/blob/master/d-engine/src/docs/server_guide/customize-state-machine.md) for details.
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use d_engine_proto::common::LogId;
8use d_engine_proto::server::storage::SnapshotMetadata;
9
10use crate::ApplyEntry;
11#[cfg(any(test, feature = "__test_support"))]
12use mockall::automock;
13
14use crate::Error;
15use crate::StorageError;
16
/// All `(key, value)` pairs returned by a prefix scan, plus the revision anchor.
///
/// `revision` equals `last_applied_index` at scan time — clients use it as the
/// filter threshold when draining a watch buffer: skip events where
/// `event.revision <= scan_result.revision` to avoid double-applying.
#[derive(Debug, Clone)]
pub struct ScanResult {
    /// `(key, value)` pairs produced by the scan.
    pub entries: Vec<(Bytes, Bytes)>,
    /// Revision anchor captured for this scan (`last_applied_index` at scan time).
    pub revision: u64,
}
27
/// Outcome reported for one log entry after it has been applied to the state machine.
///
/// `StateMachine::apply_chunk()` yields one of these per entry in the chunk, which lets
/// operations such as CAS report their execution result back to clients.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplyResult {
    /// Log index of the entry this result belongs to
    pub index: u64,

    /// Outcome flag for the operation
    ///
    /// Meaning per operation type:
    /// - CAS: `true` when the comparison matched and the value was updated, otherwise `false`
    /// - PUT/DELETE: always `true` (failures surface as `Err` from `apply_chunk`)
    pub succeeded: bool,
}

impl ApplyResult {
    /// Shared constructor behind [`ApplyResult::success`] and [`ApplyResult::failure`].
    fn with_outcome(index: u64, succeeded: bool) -> Self {
        ApplyResult { index, succeeded }
    }

    /// Builds a result marking the entry at `index` as succeeded
    pub fn success(index: u64) -> Self {
        Self::with_outcome(index, true)
    }

    /// Builds a result marking the entry at `index` as not succeeded
    pub fn failure(index: u64) -> Self {
        Self::with_outcome(index, false)
    }
}
62
63/// State machine trait for Raft consensus
64///
65/// # Thread Safety Requirements
66///
67/// **CRITICAL**: Implementations MUST be thread-safe.
68///
69/// - Read methods (`get()`, `len()`) may be called concurrently
70/// - Write methods should use internal synchronization
71/// - No assumptions about caller's threading model
72#[cfg_attr(any(test, feature = "__test_support"), automock)]
73#[async_trait]
74pub trait StateMachine: Send + Sync + 'static {
75 /// Starts the state machine service.
76 ///
77 /// This method:
78 /// 1. Flips internal state flags
79 /// 2. Loads persisted data (e.g., TTL state from disk)
80 ///
81 /// Called once during node startup. Performance is not critical.
82 async fn start(&self) -> Result<(), Error>;
83
84 /// Stops the state machine service gracefully.
85 /// This is typically a sync operation for state management.
86 fn stop(&self) -> Result<(), Error>;
87
88 /// Checks if the state machine is currently running.
89 /// Sync operation as it just checks an atomic boolean.
90 fn is_running(&self) -> bool;
91
92 /// Retrieves a value by key from the state machine.
93 /// Sync operation as it accesses in-memory data structures.
94 fn get(
95 &self,
96 key_buffer: &[u8],
97 ) -> Result<Option<Bytes>, Error>;
98
99 /// Returns the term of a specific log entry by its ID.
100 /// Sync operation as it queries in-memory data.
101 fn entry_term(
102 &self,
103 entry_id: u64,
104 ) -> Option<u64>;
105
106 /// Applies a batch of decoded log entries to the state machine.
107 ///
108 /// Receives `&[ApplyEntry]` (already decoded by the framework) instead of raw
109 /// proto `Entry`. The framework decodes `bytes → Command` exactly once in
110 /// `DefaultStateMachineHandler`; implementors never touch proto or wire format.
111 ///
112 /// The returned `Vec` MUST have the same length as `chunk` and preserve order.
113 /// Pre-allocate with `Vec::with_capacity(chunk.len())` on the hot path.
114 async fn apply_chunk(
115 &self,
116 chunk: &[ApplyEntry],
117 ) -> Result<Vec<ApplyResult>, Error>;
118
119 /// Returns the number of entries in the state machine.
120 /// NOTE: This may be expensive for some implementations.
121 /// Sync operation but should be used cautiously.
122 fn len(&self) -> usize;
123
124 /// Checks if the state machine is empty.
125 /// Sync operation that typically delegates to len().
126 fn is_empty(&self) -> bool {
127 self.len() == 0
128 }
129
130 /// Updates the last applied index in memory.
131 /// Sync operation as it just updates atomic variables.
132 fn update_last_applied(
133 &self,
134 last_applied: LogId,
135 );
136
137 /// Gets the last applied log index and term.
138 /// Sync operation as it reads from atomic variables.
139 fn last_applied(&self) -> LogId;
140
141 /// Persists the last applied index to durable storage.
142 /// Should be async as it involves disk I/O.
143 fn persist_last_applied(
144 &self,
145 last_applied: LogId,
146 ) -> Result<(), Error>;
147
148 /// Updates snapshot metadata in memory.
149 /// Sync operation as it updates in-memory structures.
150 fn update_last_snapshot_metadata(
151 &self,
152 snapshot_metadata: &SnapshotMetadata,
153 ) -> Result<(), Error>;
154
155 /// Retrieves the current snapshot metadata.
156 /// Sync operation as it reads from in-memory structures.
157 fn snapshot_metadata(&self) -> Option<SnapshotMetadata>;
158
159 /// Persists snapshot metadata to durable storage.
160 /// Should be async as it involves disk I/O.
161 fn persist_last_snapshot_metadata(
162 &self,
163 snapshot_metadata: &SnapshotMetadata,
164 ) -> Result<(), Error>;
165
166 /// Applies a snapshot received from the Raft leader to the local state machine
167 ///
168 /// # Critical Security and Integrity Measures
169 /// 1. Checksum Validation: Verifies snapshot integrity before application
170 /// 2. Version Validation: Ensures snapshot is newer than current state
171 /// 3. Atomic Application: Uses locking to prevent concurrent modifications
172 /// 4. File Validation: Confirms compressed format before decompression
173 ///
174 /// # Workflow
175 /// 1. Validate snapshot metadata and version
176 /// 2. Verify compressed file format
177 /// 3. Decompress to temporary directory
178 /// 4. Validate checklsum
179 /// 5. Initialize new state machine database
180 /// 6. Atomically replace current database
181 /// 7. Update Raft metadata and indexes
182 async fn apply_snapshot_from_file(
183 &self,
184 metadata: &SnapshotMetadata,
185 snapshot_path: std::path::PathBuf,
186 ) -> Result<(), Error>;
187
188 /// Generates a snapshot of the state machine's current key-value entries
189 /// up to the specified `last_included_index`.
190 ///
191 /// This function:
192 /// 1. Creates a new database at `temp_snapshot_path`.
193 /// 2. Copies all key-value entries from the current state machine's database where the key
194 /// (interpreted as a log index) does not exceed `last_included_index`.
195 /// 3. Uses batch writes for efficiency, committing every 100 records.
196 /// 4. Will update last_included_index and last_included_term in memory
197 /// 5. Will persist last_included_index and last_included_term into current database and new
198 /// database specified by `temp_snapshot_path`
199 ///
200 /// # Arguments
201 /// * `new_snapshot_dir` - Temporary path to store the snapshot data.
202 /// * `last_included_index` - Last log index included in the snapshot.
203 /// * `last_included_term` - Last log term included in the snapshot.
204 ///
205 /// # Returns
206 /// * if success, checksum will be returned
207 async fn generate_snapshot_data(
208 &self,
209 new_snapshot_dir: std::path::PathBuf,
210 last_included: LogId,
211 ) -> Result<Bytes, Error>;
212
213 /// Saves the hard state of the state machine.
214 /// Sync operation as it typically just delegates to other persistence methods.
215 fn save_hard_state(&self) -> Result<(), Error>;
216
217 /// Flushes any pending writes to durable storage.
218 /// Sync operation that may block but provides a synchronous interface.
219 fn flush(&self) -> Result<(), Error>;
220
221 /// Flushes any pending writes to durable storage.
222 /// Async operation as it involves disk I/O.
223 async fn flush_async(&self) -> Result<(), Error>;
224
225 /// Resets the state machine to its initial state.
226 /// Async operation as it may involve cleaning up files and data.
227 async fn reset(&self) -> Result<(), Error>;
228
229 /// Returns all `(key, value)` pairs whose key starts with `prefix`, plus
230 /// the `last_applied_index` at the time of the scan.
231 ///
232 /// Clients use `revision` as the filter anchor when draining watch events
233 /// after reconnection: skip events where `event.revision <= revision`.
234 ///
235 /// # Default implementation
236 /// Returns an error. Implementations that support prefix scan must override.
237 fn scan_prefix(
238 &self,
239 prefix: &[u8],
240 ) -> Result<ScanResult, Error> {
241 let _ = prefix;
242 Err(Error::System(crate::SystemError::Storage(
243 StorageError::StateMachineError(
244 "scan_prefix not supported by this state machine".into(),
245 ),
246 )))
247 }
248
249 /// Background lease cleanup hook.
250 ///
251 /// Called periodically by the background cleanup task (if enabled).
252 /// Returns keys that were cleaned up.
253 ///
254 /// # Default Implementation
255 /// No-op, suitable for state machines without lease support.
256 ///
257 /// # Called By
258 /// Framework calls this from background cleanup task spawned in NodeBuilder::build().
259 /// Only called when cleanup_strategy = "background".
260 ///
261 /// # Returns
262 /// - Vec of deleted keys (for logging/metrics)
263 ///
264 /// # Example (d-engine built-in state machines)
265 /// ```ignore
266 /// async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
267 /// if let Some(ref lease) = self.lease {
268 /// let expired_keys = lease.get_expired_keys(SystemTime::now());
269 /// if !expired_keys.is_empty() {
270 /// self.delete_batch(&expired_keys).await?;
271 /// }
272 /// return Ok(expired_keys);
273 /// }
274 /// Ok(vec![])
275 /// }
276 /// ```
277 async fn lease_background_cleanup(&self) -> Result<Vec<bytes::Bytes>, Error> {
278 Ok(vec![])
279 }
280}