d_engine/storage/
state_machine.rs

1#![doc = include_str!("../docs/server_guide/customize-state-machine.md")]
2
3#[cfg(test)]
4use mockall::automock;
5use tonic::async_trait;
6
7use crate::proto::common::Entry;
8use crate::proto::common::LogId;
9use crate::proto::storage::SnapshotMetadata;
10use crate::ConvertError;
11use crate::Error;
12
13#[cfg_attr(test, automock)]
14#[async_trait]
15pub trait StateMachine: Send + Sync + 'static {
16    /// Starts the state machine service.
17    /// This is typically a sync operation as it just flips internal state flags.
18    fn start(&self) -> Result<(), Error>;
19
20    /// Stops the state machine service gracefully.
21    /// This is typically a sync operation for state management.
22    fn stop(&self) -> Result<(), Error>;
23
24    /// Checks if the state machine is currently running.
25    /// Sync operation as it just checks an atomic boolean.
26    fn is_running(&self) -> bool;
27
28    /// Retrieves a value by key from the state machine.
29    /// Sync operation as it accesses in-memory data structures.
30    fn get(
31        &self,
32        key_buffer: &[u8],
33    ) -> Result<Option<Vec<u8>>, Error>;
34
35    /// Returns the term of a specific log entry by its ID.
36    /// Sync operation as it queries in-memory data.
37    fn entry_term(
38        &self,
39        entry_id: u64,
40    ) -> Option<u64>;
41
42    /// Applies a chunk of log entries to the state machine.
43    /// Async operation as it may involve disk I/O for persistence.
44    async fn apply_chunk(
45        &self,
46        chunk: Vec<Entry>,
47    ) -> Result<(), Error>;
48
49    /// Returns the number of entries in the state machine.
50    /// NOTE: This may be expensive for some implementations.
51    /// Sync operation but should be used cautiously.
52    fn len(&self) -> usize;
53
54    /// Checks if the state machine is empty.
55    /// Sync operation that typically delegates to len().
56    fn is_empty(&self) -> bool {
57        self.len() == 0
58    }
59
60    /// Updates the last applied index in memory.
61    /// Sync operation as it just updates atomic variables.
62    fn update_last_applied(
63        &self,
64        last_applied: LogId,
65    );
66
67    /// Gets the last applied log index and term.
68    /// Sync operation as it reads from atomic variables.
69    fn last_applied(&self) -> LogId;
70
71    /// Persists the last applied index to durable storage.
72    /// Should be async as it involves disk I/O.
73    fn persist_last_applied(
74        &self,
75        last_applied: LogId,
76    ) -> Result<(), Error>;
77
78    /// Updates snapshot metadata in memory.
79    /// Sync operation as it updates in-memory structures.
80    fn update_last_snapshot_metadata(
81        &self,
82        snapshot_metadata: &SnapshotMetadata,
83    ) -> Result<(), Error>;
84
85    /// Retrieves the current snapshot metadata.
86    /// Sync operation as it reads from in-memory structures.
87    fn snapshot_metadata(&self) -> Option<SnapshotMetadata>;
88
89    /// Persists snapshot metadata to durable storage.
90    /// Should be async as it involves disk I/O.
91    fn persist_last_snapshot_metadata(
92        &self,
93        snapshot_metadata: &SnapshotMetadata,
94    ) -> Result<(), Error>;
95
96    /// Applies a snapshot received from the Raft leader to the local state machine
97    ///
98    /// # Critical Security and Integrity Measures
99    /// 1. Checksum Validation: Verifies snapshot integrity before application
100    /// 2. Version Validation: Ensures snapshot is newer than current state
101    /// 3. Atomic Application: Uses locking to prevent concurrent modifications
102    /// 4. File Validation: Confirms compressed format before decompression
103    ///
104    /// # Workflow
105    /// 1. Validate snapshot metadata and version
106    /// 2. Verify compressed file format
107    /// 3. Decompress to temporary directory
108    /// 4. Validate checklsum
109    /// 5. Initialize new state machine database
110    /// 6. Atomically replace current database
111    /// 7. Update Raft metadata and indexes
112    async fn apply_snapshot_from_file(
113        &self,
114        metadata: &SnapshotMetadata,
115        snapshot_path: std::path::PathBuf,
116    ) -> Result<(), Error>;
117
118    /// Generates a snapshot of the state machine's current key-value entries
119    /// up to the specified `last_included_index`.
120    ///
121    /// This function:
122    /// 1. Creates a new database at `temp_snapshot_path`.
123    /// 2. Copies all key-value entries from the current state machine's database where the key
124    ///    (interpreted as a log index) does not exceed `last_included_index`.
125    /// 3. Uses batch writes for efficiency, committing every 100 records.
126    /// 4. Will update last_included_index and last_included_term in memory
127    /// 5. Will persist last_included_index and last_included_term into current database and new
128    ///    database specified by `temp_snapshot_path`
129    ///
130    /// # Arguments
131    /// * `new_snapshot_dir` - Temporary path to store the snapshot data.
132    /// * `last_included_index` - Last log index included in the snapshot.
133    /// * `last_included_term` - Last log term included in the snapshot.
134    ///
135    /// # Returns
136    /// * if success, checksum will be returned
137    async fn generate_snapshot_data(
138        &self,
139        new_snapshot_dir: std::path::PathBuf,
140        last_included: LogId,
141    ) -> Result<[u8; 32], Error>;
142
143    /// Saves the hard state of the state machine.
144    /// Sync operation as it typically just delegates to other persistence methods.
145    fn save_hard_state(&self) -> Result<(), Error>;
146
147    /// Flushes any pending writes to durable storage.
148    /// Sync operation that may block but provides a synchronous interface.
149    fn flush(&self) -> Result<(), Error>;
150
151    /// Flushes any pending writes to durable storage.
152    /// Async operation as it involves disk I/O.
153    async fn flush_async(&self) -> Result<(), Error>;
154
155    /// Resets the state machine to its initial state.
156    /// Async operation as it may involve cleaning up files and data.
157    async fn reset(&self) -> Result<(), Error>;
158}
159
160impl SnapshotMetadata {
161    pub fn checksum_array(&self) -> Result<[u8; 32], Error> {
162        if self.checksum.len() == 32 {
163            let mut array = [0u8; 32];
164            array.copy_from_slice(&self.checksum);
165            Ok(array)
166        } else {
167            Err(ConvertError::ConversionFailure("Invalid checksum length".to_string()).into())
168        }
169    }
170
171    pub fn set_checksum_array(
172        &mut self,
173        array: [u8; 32],
174    ) {
175        self.checksum = array.to_vec();
176    }
177}