// d_engine_core/storage/state_machine.rs

//! State machine trait for Raft consensus.
//!
//! See the [server customization guide](https://github.com/deventlab/d-engine/blob/master/d-engine/src/docs/server_guide/customize-state-machine.md) for details.
4
5use bytes::Bytes;
6use d_engine_proto::common::Entry;
7use d_engine_proto::common::LogId;
8use d_engine_proto::server::storage::SnapshotMetadata;
9#[cfg(any(test, feature = "__test_support"))]
10use mockall::automock;
11use tonic::async_trait;
12
13use crate::Error;
14
15/// State machine trait for Raft consensus
16///
17/// # Thread Safety Requirements
18///
19/// **CRITICAL**: Implementations MUST be thread-safe.
20///
21/// - Read methods (`get()`, `len()`) may be called concurrently
22/// - Write methods should use internal synchronization
23/// - No assumptions about caller's threading model
24#[cfg_attr(any(test, feature = "__test_support"), automock)]
25#[async_trait]
26pub trait StateMachine: Send + Sync + 'static {
27    /// Starts the state machine service.
28    ///
29    /// This method:
30    /// 1. Flips internal state flags
31    /// 2. Loads persisted data (e.g., TTL state from disk)
32    ///
33    /// Called once during node startup. Performance is not critical.
34    async fn start(&self) -> Result<(), Error>;
35
36    /// Stops the state machine service gracefully.
37    /// This is typically a sync operation for state management.
38    fn stop(&self) -> Result<(), Error>;
39
40    /// Checks if the state machine is currently running.
41    /// Sync operation as it just checks an atomic boolean.
42    fn is_running(&self) -> bool;
43
44    /// Retrieves a value by key from the state machine.
45    /// Sync operation as it accesses in-memory data structures.
46    fn get(
47        &self,
48        key_buffer: &[u8],
49    ) -> Result<Option<Bytes>, Error>;
50
51    /// Returns the term of a specific log entry by its ID.
52    /// Sync operation as it queries in-memory data.
53    fn entry_term(
54        &self,
55        entry_id: u64,
56    ) -> Option<u64>;
57
58    /// Applies a chunk of log entries to the state machine.
59    /// Async operation as it may involve disk I/O for persistence.
60    async fn apply_chunk(
61        &self,
62        chunk: Vec<Entry>,
63    ) -> Result<(), Error>;
64
65    /// Returns the number of entries in the state machine.
66    /// NOTE: This may be expensive for some implementations.
67    /// Sync operation but should be used cautiously.
68    fn len(&self) -> usize;
69
70    /// Checks if the state machine is empty.
71    /// Sync operation that typically delegates to len().
72    fn is_empty(&self) -> bool {
73        self.len() == 0
74    }
75
76    /// Updates the last applied index in memory.
77    /// Sync operation as it just updates atomic variables.
78    fn update_last_applied(
79        &self,
80        last_applied: LogId,
81    );
82
83    /// Gets the last applied log index and term.
84    /// Sync operation as it reads from atomic variables.
85    fn last_applied(&self) -> LogId;
86
87    /// Persists the last applied index to durable storage.
88    /// Should be async as it involves disk I/O.
89    fn persist_last_applied(
90        &self,
91        last_applied: LogId,
92    ) -> Result<(), Error>;
93
94    /// Updates snapshot metadata in memory.
95    /// Sync operation as it updates in-memory structures.
96    fn update_last_snapshot_metadata(
97        &self,
98        snapshot_metadata: &SnapshotMetadata,
99    ) -> Result<(), Error>;
100
101    /// Retrieves the current snapshot metadata.
102    /// Sync operation as it reads from in-memory structures.
103    fn snapshot_metadata(&self) -> Option<SnapshotMetadata>;
104
105    /// Persists snapshot metadata to durable storage.
106    /// Should be async as it involves disk I/O.
107    fn persist_last_snapshot_metadata(
108        &self,
109        snapshot_metadata: &SnapshotMetadata,
110    ) -> Result<(), Error>;
111
112    /// Applies a snapshot received from the Raft leader to the local state machine
113    ///
114    /// # Critical Security and Integrity Measures
115    /// 1. Checksum Validation: Verifies snapshot integrity before application
116    /// 2. Version Validation: Ensures snapshot is newer than current state
117    /// 3. Atomic Application: Uses locking to prevent concurrent modifications
118    /// 4. File Validation: Confirms compressed format before decompression
119    ///
120    /// # Workflow
121    /// 1. Validate snapshot metadata and version
122    /// 2. Verify compressed file format
123    /// 3. Decompress to temporary directory
124    /// 4. Validate checklsum
125    /// 5. Initialize new state machine database
126    /// 6. Atomically replace current database
127    /// 7. Update Raft metadata and indexes
128    async fn apply_snapshot_from_file(
129        &self,
130        metadata: &SnapshotMetadata,
131        snapshot_path: std::path::PathBuf,
132    ) -> Result<(), Error>;
133
134    /// Generates a snapshot of the state machine's current key-value entries
135    /// up to the specified `last_included_index`.
136    ///
137    /// This function:
138    /// 1. Creates a new database at `temp_snapshot_path`.
139    /// 2. Copies all key-value entries from the current state machine's database where the key
140    ///    (interpreted as a log index) does not exceed `last_included_index`.
141    /// 3. Uses batch writes for efficiency, committing every 100 records.
142    /// 4. Will update last_included_index and last_included_term in memory
143    /// 5. Will persist last_included_index and last_included_term into current database and new
144    ///    database specified by `temp_snapshot_path`
145    ///
146    /// # Arguments
147    /// * `new_snapshot_dir` - Temporary path to store the snapshot data.
148    /// * `last_included_index` - Last log index included in the snapshot.
149    /// * `last_included_term` - Last log term included in the snapshot.
150    ///
151    /// # Returns
152    /// * if success, checksum will be returned
153    async fn generate_snapshot_data(
154        &self,
155        new_snapshot_dir: std::path::PathBuf,
156        last_included: LogId,
157    ) -> Result<Bytes, Error>;
158
159    /// Saves the hard state of the state machine.
160    /// Sync operation as it typically just delegates to other persistence methods.
161    fn save_hard_state(&self) -> Result<(), Error>;
162
163    /// Flushes any pending writes to durable storage.
164    /// Sync operation that may block but provides a synchronous interface.
165    fn flush(&self) -> Result<(), Error>;
166
167    /// Flushes any pending writes to durable storage.
168    /// Async operation as it involves disk I/O.
169    async fn flush_async(&self) -> Result<(), Error>;
170
171    /// Resets the state machine to its initial state.
172    /// Async operation as it may involve cleaning up files and data.
173    async fn reset(&self) -> Result<(), Error>;
174
175    /// Background lease cleanup hook.
176    ///
177    /// Called periodically by the background cleanup task (if enabled).
178    /// Returns keys that were cleaned up.
179    ///
180    /// # Default Implementation
181    /// No-op, suitable for state machines without lease support.
182    ///
183    /// # Called By
184    /// Framework calls this from background cleanup task spawned in NodeBuilder::build().
185    /// Only called when cleanup_strategy = "background".
186    ///
187    /// # Returns
188    /// - Vec of deleted keys (for logging/metrics)
189    ///
190    /// # Example (d-engine built-in state machines)
191    /// ```ignore
192    /// async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
193    ///     if let Some(ref lease) = self.lease {
194    ///         let expired_keys = lease.get_expired_keys(SystemTime::now());
195    ///         if !expired_keys.is_empty() {
196    ///             self.delete_batch(&expired_keys).await?;
197    ///         }
198    ///         return Ok(expired_keys);
199    ///     }
200    ///     Ok(vec![])
201    /// }
202    /// ```
203    async fn lease_background_cleanup(&self) -> Result<Vec<bytes::Bytes>, Error> {
204        Ok(vec![])
205    }
206}