praborrow_lease/
state_machine.rs

1//! State Machine Abstraction
2//!
3//! Provides the interface for user-defined state machines that can be replicated via Raft.
4
5use serde::{Serialize, de::DeserializeOwned};
6use std::fmt::Debug;
7
8/// Abstract state machine that can be replicated via Raft.
9///
10/// The state machine receives commands from the Raft log and applies them
11/// to produce outputs. It also supports snapshotting for log compaction.
12///
13/// # Type Parameters
14///
15/// - `Command`: The command type that modifies state
16/// - `Output`: The result of applying a command
17/// - `SnapshotData`: The serialized snapshot format
18///
19/// # Example
20///
21/// ```ignore
22/// use praborrow_lease::state_machine::StateMachine;
23///
24/// struct KeyValueStore {
25///     data: HashMap<String, String>,
26/// }
27///
28/// impl StateMachine for KeyValueStore {
29///     type Command = KvCommand;
30///     type Output = Option<String>;
31///     type SnapshotData = HashMap<String, String>;
32///
33///     fn apply(&mut self, command: Self::Command) -> Self::Output {
34///         match command {
35///             KvCommand::Set { key, value } => {
36///                 self.data.insert(key, value)
37///             }
38///             KvCommand::Get { key } => {
39///                 self.data.get(&key).cloned()
40///             }
41///             KvCommand::Delete { key } => {
42///                 self.data.remove(&key)
43///             }
44///         }
45///     }
46///
47///     fn snapshot(&self) -> Self::SnapshotData {
48///         self.data.clone()
49///     }
50///
51///     fn restore(&mut self, snapshot: Self::SnapshotData) {
52///         self.data = snapshot;
53///     }
54/// }
55/// ```
56pub trait StateMachine: Send + Sync {
57    /// The command type that modifies the state machine.
58    type Command: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static;
59
60    /// The output produced when applying a command.
61    type Output: Clone + Send + Sync + Serialize + DeserializeOwned + Debug + 'static;
62
63    /// The snapshot data format.
64    type SnapshotData: Clone + Send + Sync + Serialize + DeserializeOwned + 'static;
65
66    /// Applies a command to the state machine, returning the result.
67    ///
68    /// This method must be deterministic - the same command applied to the
69    /// same state must always produce the same result.
70    fn apply(&mut self, command: Self::Command) -> Self::Output;
71
72    /// Creates a snapshot of the current state.
73    ///
74    /// This is used for log compaction. The snapshot must capture
75    /// the complete state as of the last applied command.
76    fn snapshot(&self) -> Self::SnapshotData;
77
78    /// Restores the state machine from a snapshot.
79    ///
80    /// Called when installing a snapshot from the leader.
81    fn restore(&mut self, snapshot: Self::SnapshotData);
82
83    /// Returns the name of this state machine (for logging/metrics).
84    fn name(&self) -> &str {
85        std::any::type_name::<Self>()
86    }
87}
88
89/// A simple no-op state machine for testing.
90#[derive(Debug, Default, Clone)]
91pub struct NoOpStateMachine;
92
93impl StateMachine for NoOpStateMachine {
94    type Command = Vec<u8>;
95    type Output = ();
96    type SnapshotData = ();
97
98    fn apply(&mut self, _command: Self::Command) -> Self::Output {}
99
100    fn snapshot(&self) -> Self::SnapshotData {}
101
102    fn restore(&mut self, _snapshot: Self::SnapshotData) {
103        // No state to restore
104    }
105
106    fn name(&self) -> &str {
107        "NoOpStateMachine"
108    }
109}
110
/// In-memory key-value state machine mapping string keys to byte values.
///
/// Intended for tests and examples.
#[derive(Clone, Debug, Default)]
pub struct KeyValueStateMachine {
    /// Current contents of the store.
    data: std::collections::HashMap<String, Vec<u8>>,
}

impl KeyValueStateMachine {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the value stored under `key`, or `None` when the key is absent.
    pub fn get(&self, key: &str) -> Option<&Vec<u8>> {
        let Self { data } = self;
        data.get(key)
    }

    /// Returns the number of keys currently stored.
    pub fn len(&self) -> usize {
        let Self { data } = self;
        data.len()
    }

    /// Returns `true` when the store holds no keys.
    pub fn is_empty(&self) -> bool {
        let Self { data } = self;
        data.is_empty()
    }
}
142
143/// Commands for the key-value state machine.
144#[derive(Debug, Clone, Serialize, serde::Deserialize)]
145pub enum KvCommand {
146    /// Set a key to a value
147    Set { key: String, value: Vec<u8> },
148    /// Delete a key
149    Delete { key: String },
150}
151
152/// Output for key-value commands.
153#[derive(Debug, Clone, Serialize, serde::Deserialize)]
154pub enum KvOutput {
155    /// Previous value (if any) for Set/Delete
156    Value(Option<Vec<u8>>),
157}
158
159impl StateMachine for KeyValueStateMachine {
160    type Command = KvCommand;
161    type Output = KvOutput;
162    type SnapshotData = std::collections::HashMap<String, Vec<u8>>;
163
164    fn apply(&mut self, command: Self::Command) -> Self::Output {
165        match command {
166            KvCommand::Set { key, value } => {
167                let old = self.data.insert(key, value);
168                KvOutput::Value(old)
169            }
170            KvCommand::Delete { key } => {
171                let old = self.data.remove(&key);
172                KvOutput::Value(old)
173            }
174        }
175    }
176
177    fn snapshot(&self) -> Self::SnapshotData {
178        self.data.clone()
179    }
180
181    fn restore(&mut self, snapshot: Self::SnapshotData) {
182        self.data = snapshot;
183    }
184
185    fn name(&self) -> &str {
186        "KeyValueStateMachine"
187    }
188}
189
190// ============================================================================
191// REPLICATED STATE MACHINE
192// ============================================================================
193
194use crate::engine::ConsensusError;
195use crate::raft::{LogIndex, RaftStorage};
196
197/// A replicated state machine that applies committed log entries.
198pub struct ReplicatedStateMachine<SM: StateMachine, S: RaftStorage<SM::Command>>
199where
200    SM::Command: Send + Sync,
201{
202    /// The underlying state machine
203    state_machine: SM,
204    /// Storage for accessing committed entries
205    storage: S,
206    /// Last applied log index
207    last_applied: LogIndex,
208}
209
210impl<SM, S> ReplicatedStateMachine<SM, S>
211where
212    SM: StateMachine,
213    SM::Command: Send + Sync,
214    S: RaftStorage<SM::Command>,
215{
216    /// Creates a new replicated state machine.
217    pub fn new(state_machine: SM, storage: S) -> Self {
218        Self {
219            state_machine,
220            storage,
221            last_applied: 0,
222        }
223    }
224
225    /// Returns a reference to the underlying state machine.
226    pub fn state_machine(&self) -> &SM {
227        &self.state_machine
228    }
229
230    /// Returns the last applied index.
231    pub fn last_applied(&self) -> LogIndex {
232        self.last_applied
233    }
234
235    /// Applies all committed entries up to commit_index.
236    ///
237    /// Returns the outputs for each applied command.
238    pub async fn apply_committed(
239        &mut self,
240        commit_index: LogIndex,
241    ) -> Result<Vec<(LogIndex, SM::Output)>, ConsensusError> {
242        let mut outputs = Vec::new();
243
244        while self.last_applied < commit_index {
245            let next_index = self.last_applied + 1;
246
247            if let Some(entry) = self.storage.get_log_entry(next_index).await? {
248                match entry.command {
249                    crate::raft::LogCommand::App(cmd) => {
250                        let output = self.state_machine.apply(cmd);
251                        outputs.push((next_index, output));
252                    }
253                    crate::raft::LogCommand::Config(_) | crate::raft::LogCommand::NoOp => {
254                        // Configuration changes and NoOps are handled by the consensus engine
255                    }
256                }
257                self.last_applied = next_index;
258
259                tracing::debug!(
260                    index = next_index,
261                    sm = self.state_machine.name(),
262                    "Applied log entry"
263                );
264            } else {
265                // Entry not found - might be compacted
266                break;
267            }
268        }
269
270        Ok(outputs)
271    }
272
273    /// Creates a snapshot of the current state.
274    pub fn create_snapshot(&self) -> SM::SnapshotData {
275        self.state_machine.snapshot()
276    }
277
278    /// Restores from a snapshot.
279    pub fn restore_snapshot(&mut self, snapshot: SM::SnapshotData, last_included_index: LogIndex) {
280        self.state_machine.restore(snapshot);
281        self.last_applied = last_included_index;
282
283        tracing::info!(
284            index = last_included_index,
285            sm = self.state_machine.name(),
286            "Restored from snapshot"
287        );
288    }
289}
290
291// ============================================================================
292// TESTS
293// ============================================================================
294
#[cfg(test)]
mod tests {
    use super::*;

    /// Applies a `Set` command and returns the previous value it reports.
    fn set(sm: &mut KeyValueStateMachine, key: &str, value: &[u8]) -> Option<Vec<u8>> {
        let KvOutput::Value(previous) = sm.apply(KvCommand::Set {
            key: key.to_string(),
            value: value.to_vec(),
        });
        previous
    }

    /// Applies a `Delete` command and returns the previous value it reports.
    fn delete(sm: &mut KeyValueStateMachine, key: &str) -> Option<Vec<u8>> {
        let KvOutput::Value(previous) = sm.apply(KvCommand::Delete {
            key: key.to_string(),
        });
        previous
    }

    #[test]
    fn test_noop_state_machine() {
        let mut sm = NoOpStateMachine;
        sm.apply(vec![1, 2, 3]);

        sm.snapshot();
        sm.restore(());

        assert_eq!(sm.name(), "NoOpStateMachine");
    }

    #[test]
    fn test_kv_state_machine_default_is_empty() {
        let sm = KeyValueStateMachine::default();

        assert!(sm.is_empty());
        assert_eq!(sm.len(), 0);
        assert_eq!(sm.name(), "KeyValueStateMachine");
    }

    #[test]
    fn test_kv_state_machine_set() {
        let mut sm = KeyValueStateMachine::new();

        // A fresh key has no previous value.
        assert_eq!(set(&mut sm, "foo", b"bar"), None);
        assert_eq!(sm.get("foo"), Some(&b"bar".to_vec()));
        assert_eq!(sm.len(), 1);
    }

    #[test]
    fn test_kv_state_machine_overwrite() {
        let mut sm = KeyValueStateMachine::new();

        set(&mut sm, "foo", b"bar");

        // Overwriting must report exactly the value that was replaced.
        assert_eq!(set(&mut sm, "foo", b"baz"), Some(b"bar".to_vec()));
        assert_eq!(sm.get("foo"), Some(&b"baz".to_vec()));
        assert_eq!(sm.len(), 1);
    }

    #[test]
    fn test_kv_state_machine_delete() {
        let mut sm = KeyValueStateMachine::new();

        set(&mut sm, "foo", b"bar");

        // Deleting must report exactly the value that was removed.
        assert_eq!(delete(&mut sm, "foo"), Some(b"bar".to_vec()));
        assert!(sm.get("foo").is_none());
        assert!(sm.is_empty());
    }

    #[test]
    fn test_kv_state_machine_delete_missing_key() {
        let mut sm = KeyValueStateMachine::new();

        assert_eq!(delete(&mut sm, "missing"), None);
        assert!(sm.is_empty());
    }

    #[test]
    fn test_kv_state_machine_snapshot() {
        let mut sm = KeyValueStateMachine::new();

        set(&mut sm, "a", b"1");
        set(&mut sm, "b", b"2");

        let snapshot = sm.snapshot();
        assert_eq!(snapshot.len(), 2);

        // Modify state
        delete(&mut sm, "a");
        assert_eq!(sm.len(), 1);
        // The snapshot is an independent copy and must not see later changes.
        assert_eq!(snapshot.len(), 2);

        // Restore from snapshot
        sm.restore(snapshot);
        assert_eq!(sm.len(), 2);
        assert_eq!(sm.get("a"), Some(&b"1".to_vec()));
        assert_eq!(sm.get("b"), Some(&b"2".to_vec()));
    }
}