praborrow_lease/
state_machine.rs1use serde::{Serialize, de::DeserializeOwned};
6use std::fmt::Debug;
7
8pub trait StateMachine: Send + Sync {
57 type Command: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static;
59
60 type Output: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static;
62
63 type SnapshotData: Clone + Send + Sync + Serialize + DeserializeOwned + 'static;
65
66 fn apply(&mut self, command: Self::Command) -> Self::Output;
71
72 fn snapshot(&self) -> Self::SnapshotData;
77
78 fn restore(&mut self, snapshot: Self::SnapshotData);
82
83 fn name(&self) -> &str {
85 std::any::type_name::<Self>()
86 }
87}
88
89#[derive(Debug, Default, Clone)]
91pub struct NoOpStateMachine;
92
93impl StateMachine for NoOpStateMachine {
94 type Command = Vec<u8>;
95 type Output = ();
96 type SnapshotData = ();
97
98 fn apply(&mut self, _command: Self::Command) -> Self::Output {}
99
100 fn snapshot(&self) -> Self::SnapshotData {}
101
102 fn restore(&mut self, _snapshot: Self::SnapshotData) {
103 }
105
106 fn name(&self) -> &str {
107 "NoOpStateMachine"
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct KeyValueStateMachine {
114 data: std::collections::HashMap<String, Vec<u8>>,
115}
116
117impl Default for KeyValueStateMachine {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl KeyValueStateMachine {
124 pub fn new() -> Self {
125 Self {
126 data: std::collections::HashMap::new(),
127 }
128 }
129
130 pub fn get(&self, key: &str) -> Option<&Vec<u8>> {
131 self.data.get(key)
132 }
133
134 pub fn len(&self) -> usize {
135 self.data.len()
136 }
137
138 pub fn is_empty(&self) -> bool {
139 self.data.is_empty()
140 }
141}
142
143#[derive(Debug, Clone, Serialize, serde::Deserialize)]
145pub enum KvCommand {
146 Set { key: String, value: Vec<u8> },
148 Delete { key: String },
150}
151
152#[derive(Debug, Clone, Serialize, serde::Deserialize)]
154pub enum KvOutput {
155 Value(Option<Vec<u8>>),
157}
158
159impl StateMachine for KeyValueStateMachine {
160 type Command = KvCommand;
161 type Output = KvOutput;
162 type SnapshotData = std::collections::HashMap<String, Vec<u8>>;
163
164 fn apply(&mut self, command: Self::Command) -> Self::Output {
165 match command {
166 KvCommand::Set { key, value } => {
167 let old = self.data.insert(key, value);
168 KvOutput::Value(old)
169 }
170 KvCommand::Delete { key } => {
171 let old = self.data.remove(&key);
172 KvOutput::Value(old)
173 }
174 }
175 }
176
177 fn snapshot(&self) -> Self::SnapshotData {
178 self.data.clone()
179 }
180
181 fn restore(&mut self, snapshot: Self::SnapshotData) {
182 self.data = snapshot;
183 }
184
185 fn name(&self) -> &str {
186 "KeyValueStateMachine"
187 }
188}
189
190use crate::engine::ConsensusError;
195use crate::raft::{LogIndex, RaftStorage};
196
197pub struct ReplicatedStateMachine<SM: StateMachine, S: RaftStorage<SM::Command>>
199where
200 SM::Command: Send + Sync,
201{
202 state_machine: SM,
204 storage: S,
206 last_applied: LogIndex,
208}
209
210impl<SM, S> ReplicatedStateMachine<SM, S>
211where
212 SM: StateMachine,
213 SM::Command: Send + Sync,
214 S: RaftStorage<SM::Command>,
215{
216 pub fn new(state_machine: SM, storage: S) -> Self {
218 Self {
219 state_machine,
220 storage,
221 last_applied: 0,
222 }
223 }
224
225 pub fn state_machine(&self) -> &SM {
227 &self.state_machine
228 }
229
230 pub fn last_applied(&self) -> LogIndex {
232 self.last_applied
233 }
234
235 pub async fn apply_committed(
239 &mut self,
240 commit_index: LogIndex,
241 ) -> Result<Vec<(LogIndex, SM::Output)>, ConsensusError> {
242 let mut outputs = Vec::new();
243
244 while self.last_applied < commit_index {
245 let next_index = self.last_applied + 1;
246
247 if let Some(entry) = self.storage.get_log_entry(next_index).await? {
248 match entry.command {
249 crate::raft::LogCommand::App(cmd) => {
250 let output = self.state_machine.apply(cmd);
251 outputs.push((next_index, output));
252 }
253 crate::raft::LogCommand::Config(_) | crate::raft::LogCommand::NoOp => {
254 }
256 }
257 self.last_applied = next_index;
258
259 tracing::debug!(
260 index = next_index,
261 sm = self.state_machine.name(),
262 "Applied log entry"
263 );
264 } else {
265 break;
267 }
268 }
269
270 Ok(outputs)
271 }
272
273 pub fn create_snapshot(&self) -> SM::SnapshotData {
275 self.state_machine.snapshot()
276 }
277
278 pub fn restore_snapshot(&mut self, snapshot: SM::SnapshotData, last_included_index: LogIndex) {
280 self.state_machine.restore(snapshot);
281 self.last_applied = last_included_index;
282
283 tracing::info!(
284 index = last_included_index,
285 sm = self.state_machine.name(),
286 "Restored from snapshot"
287 );
288 }
289}
290
291#[cfg(test)]
296mod tests {
297 use super::*;
298
299 #[test]
300 fn test_noop_state_machine() {
301 let mut sm = NoOpStateMachine;
302 sm.apply(vec![1, 2, 3]);
303
304 sm.snapshot();
305 sm.restore(());
306 }
307
308 #[test]
309 fn test_kv_state_machine_set() {
310 let mut sm = KeyValueStateMachine::new();
311
312 let output = sm.apply(KvCommand::Set {
313 key: "foo".to_string(),
314 value: b"bar".to_vec(),
315 });
316
317 assert!(matches!(output, KvOutput::Value(None)));
318 assert_eq!(sm.get("foo"), Some(&b"bar".to_vec()));
319 }
320
321 #[test]
322 fn test_kv_state_machine_overwrite() {
323 let mut sm = KeyValueStateMachine::new();
324
325 sm.apply(KvCommand::Set {
326 key: "foo".to_string(),
327 value: b"bar".to_vec(),
328 });
329
330 let output = sm.apply(KvCommand::Set {
331 key: "foo".to_string(),
332 value: b"baz".to_vec(),
333 });
334
335 assert!(matches!(output, KvOutput::Value(Some(_))));
336 assert_eq!(sm.get("foo"), Some(&b"baz".to_vec()));
337 }
338
339 #[test]
340 fn test_kv_state_machine_delete() {
341 let mut sm = KeyValueStateMachine::new();
342
343 sm.apply(KvCommand::Set {
344 key: "foo".to_string(),
345 value: b"bar".to_vec(),
346 });
347
348 let output = sm.apply(KvCommand::Delete {
349 key: "foo".to_string(),
350 });
351
352 assert!(matches!(output, KvOutput::Value(Some(_))));
353 assert!(sm.get("foo").is_none());
354 }
355
356 #[test]
357 fn test_kv_state_machine_snapshot() {
358 let mut sm = KeyValueStateMachine::new();
359
360 sm.apply(KvCommand::Set {
361 key: "a".to_string(),
362 value: b"1".to_vec(),
363 });
364 sm.apply(KvCommand::Set {
365 key: "b".to_string(),
366 value: b"2".to_vec(),
367 });
368
369 let snapshot = sm.snapshot();
370 assert_eq!(snapshot.len(), 2);
371
372 sm.apply(KvCommand::Delete {
374 key: "a".to_string(),
375 });
376 assert_eq!(sm.len(), 1);
377
378 sm.restore(snapshot);
380 assert_eq!(sm.len(), 2);
381 assert_eq!(sm.get("a"), Some(&b"1".to_vec()));
382 }
383}