d_engine/storage/state_machine.rs
1#![doc = include_str!("../docs/server_guide/customize-state-machine.md")]
2
3#[cfg(test)]
4use mockall::automock;
5use tonic::async_trait;
6
7use crate::proto::common::Entry;
8use crate::proto::common::LogId;
9use crate::proto::storage::SnapshotMetadata;
10use crate::ConvertError;
11use crate::Error;
12
13#[cfg_attr(test, automock)]
14#[async_trait]
15pub trait StateMachine: Send + Sync + 'static {
16 /// Starts the state machine service.
17 /// This is typically a sync operation as it just flips internal state flags.
18 fn start(&self) -> Result<(), Error>;
19
20 /// Stops the state machine service gracefully.
21 /// This is typically a sync operation for state management.
22 fn stop(&self) -> Result<(), Error>;
23
24 /// Checks if the state machine is currently running.
25 /// Sync operation as it just checks an atomic boolean.
26 fn is_running(&self) -> bool;
27
28 /// Retrieves a value by key from the state machine.
29 /// Sync operation as it accesses in-memory data structures.
30 fn get(
31 &self,
32 key_buffer: &[u8],
33 ) -> Result<Option<Vec<u8>>, Error>;
34
35 /// Returns the term of a specific log entry by its ID.
36 /// Sync operation as it queries in-memory data.
37 fn entry_term(
38 &self,
39 entry_id: u64,
40 ) -> Option<u64>;
41
42 /// Applies a chunk of log entries to the state machine.
43 /// Async operation as it may involve disk I/O for persistence.
44 async fn apply_chunk(
45 &self,
46 chunk: Vec<Entry>,
47 ) -> Result<(), Error>;
48
49 /// Returns the number of entries in the state machine.
50 /// NOTE: This may be expensive for some implementations.
51 /// Sync operation but should be used cautiously.
52 fn len(&self) -> usize;
53
54 /// Checks if the state machine is empty.
55 /// Sync operation that typically delegates to len().
56 fn is_empty(&self) -> bool {
57 self.len() == 0
58 }
59
60 /// Updates the last applied index in memory.
61 /// Sync operation as it just updates atomic variables.
62 fn update_last_applied(
63 &self,
64 last_applied: LogId,
65 );
66
67 /// Gets the last applied log index and term.
68 /// Sync operation as it reads from atomic variables.
69 fn last_applied(&self) -> LogId;
70
71 /// Persists the last applied index to durable storage.
72 /// Should be async as it involves disk I/O.
73 fn persist_last_applied(
74 &self,
75 last_applied: LogId,
76 ) -> Result<(), Error>;
77
78 /// Updates snapshot metadata in memory.
79 /// Sync operation as it updates in-memory structures.
80 fn update_last_snapshot_metadata(
81 &self,
82 snapshot_metadata: &SnapshotMetadata,
83 ) -> Result<(), Error>;
84
85 /// Retrieves the current snapshot metadata.
86 /// Sync operation as it reads from in-memory structures.
87 fn snapshot_metadata(&self) -> Option<SnapshotMetadata>;
88
89 /// Persists snapshot metadata to durable storage.
90 /// Should be async as it involves disk I/O.
91 fn persist_last_snapshot_metadata(
92 &self,
93 snapshot_metadata: &SnapshotMetadata,
94 ) -> Result<(), Error>;
95
96 /// Applies a snapshot received from the Raft leader to the local state machine
97 ///
98 /// # Critical Security and Integrity Measures
99 /// 1. Checksum Validation: Verifies snapshot integrity before application
100 /// 2. Version Validation: Ensures snapshot is newer than current state
101 /// 3. Atomic Application: Uses locking to prevent concurrent modifications
102 /// 4. File Validation: Confirms compressed format before decompression
103 ///
104 /// # Workflow
105 /// 1. Validate snapshot metadata and version
106 /// 2. Verify compressed file format
107 /// 3. Decompress to temporary directory
108 /// 4. Validate checklsum
109 /// 5. Initialize new state machine database
110 /// 6. Atomically replace current database
111 /// 7. Update Raft metadata and indexes
112 async fn apply_snapshot_from_file(
113 &self,
114 metadata: &SnapshotMetadata,
115 snapshot_path: std::path::PathBuf,
116 ) -> Result<(), Error>;
117
118 /// Generates a snapshot of the state machine's current key-value entries
119 /// up to the specified `last_included_index`.
120 ///
121 /// This function:
122 /// 1. Creates a new database at `temp_snapshot_path`.
123 /// 2. Copies all key-value entries from the current state machine's database where the key
124 /// (interpreted as a log index) does not exceed `last_included_index`.
125 /// 3. Uses batch writes for efficiency, committing every 100 records.
126 /// 4. Will update last_included_index and last_included_term in memory
127 /// 5. Will persist last_included_index and last_included_term into current database and new
128 /// database specified by `temp_snapshot_path`
129 ///
130 /// # Arguments
131 /// * `new_snapshot_dir` - Temporary path to store the snapshot data.
132 /// * `last_included_index` - Last log index included in the snapshot.
133 /// * `last_included_term` - Last log term included in the snapshot.
134 ///
135 /// # Returns
136 /// * if success, checksum will be returned
137 async fn generate_snapshot_data(
138 &self,
139 new_snapshot_dir: std::path::PathBuf,
140 last_included: LogId,
141 ) -> Result<[u8; 32], Error>;
142
143 /// Saves the hard state of the state machine.
144 /// Sync operation as it typically just delegates to other persistence methods.
145 fn save_hard_state(&self) -> Result<(), Error>;
146
147 /// Flushes any pending writes to durable storage.
148 /// Sync operation that may block but provides a synchronous interface.
149 fn flush(&self) -> Result<(), Error>;
150
151 /// Flushes any pending writes to durable storage.
152 /// Async operation as it involves disk I/O.
153 async fn flush_async(&self) -> Result<(), Error>;
154
155 /// Resets the state machine to its initial state.
156 /// Async operation as it may involve cleaning up files and data.
157 async fn reset(&self) -> Result<(), Error>;
158}
159
160impl SnapshotMetadata {
161 pub fn checksum_array(&self) -> Result<[u8; 32], Error> {
162 if self.checksum.len() == 32 {
163 let mut array = [0u8; 32];
164 array.copy_from_slice(&self.checksum);
165 Ok(array)
166 } else {
167 Err(ConvertError::ConversionFailure("Invalid checksum length".to_string()).into())
168 }
169 }
170
171 pub fn set_checksum_array(
172 &mut self,
173 array: [u8; 32],
174 ) {
175 self.checksum = array.to_vec();
176 }
177}