aegis_replication/
state.rs

1//! Aegis Replication State Machine
2//!
3//! State machine abstraction for replicated state.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::RwLock;
11
12// =============================================================================
13// Command
14// =============================================================================
15
16/// A command to be applied to the state machine.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Command {
19    pub command_type: CommandType,
20    pub key: String,
21    pub value: Option<Vec<u8>>,
22    pub metadata: HashMap<String, String>,
23}
24
25impl Command {
26    /// Create a get command.
27    pub fn get(key: impl Into<String>) -> Self {
28        Self {
29            command_type: CommandType::Get,
30            key: key.into(),
31            value: None,
32            metadata: HashMap::new(),
33        }
34    }
35
36    /// Create a set command.
37    pub fn set(key: impl Into<String>, value: Vec<u8>) -> Self {
38        Self {
39            command_type: CommandType::Set,
40            key: key.into(),
41            value: Some(value),
42            metadata: HashMap::new(),
43        }
44    }
45
46    /// Create a delete command.
47    pub fn delete(key: impl Into<String>) -> Self {
48        Self {
49            command_type: CommandType::Delete,
50            key: key.into(),
51            value: None,
52            metadata: HashMap::new(),
53        }
54    }
55
56    /// Serialize the command.
57    pub fn to_bytes(&self) -> Vec<u8> {
58        serde_json::to_vec(self).unwrap_or_default()
59    }
60
61    /// Deserialize a command.
62    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
63        serde_json::from_slice(bytes).ok()
64    }
65}
66
67// =============================================================================
68// Command Type
69// =============================================================================
70
71/// Type of command.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
73pub enum CommandType {
74    Get,
75    Set,
76    Delete,
77    CompareAndSwap,
78    Increment,
79    Custom,
80}
81
82// =============================================================================
83// Command Result
84// =============================================================================
85
86/// Result of applying a command.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct CommandResult {
89    pub success: bool,
90    pub value: Option<Vec<u8>>,
91    pub error: Option<String>,
92    pub applied_index: u64,
93}
94
95impl CommandResult {
96    pub fn success(value: Option<Vec<u8>>, applied_index: u64) -> Self {
97        Self {
98            success: true,
99            value,
100            error: None,
101            applied_index,
102        }
103    }
104
105    pub fn error(message: impl Into<String>, applied_index: u64) -> Self {
106        Self {
107            success: false,
108            value: None,
109            error: Some(message.into()),
110            applied_index,
111        }
112    }
113}
114
115// =============================================================================
116// State Machine
117// =============================================================================
118
119/// The replicated state machine.
120pub struct StateMachine {
121    data: RwLock<HashMap<String, Vec<u8>>>,
122    last_applied: RwLock<u64>,
123    version: RwLock<u64>,
124}
125
126impl StateMachine {
127    /// Create a new state machine.
128    pub fn new() -> Self {
129        Self {
130            data: RwLock::new(HashMap::new()),
131            last_applied: RwLock::new(0),
132            version: RwLock::new(0),
133        }
134    }
135
136    /// Apply a command to the state machine.
137    pub fn apply(&self, command: &Command, index: u64) -> CommandResult {
138        let mut data = self.data.write().unwrap();
139        let mut last_applied = self.last_applied.write().unwrap();
140        let mut version = self.version.write().unwrap();
141
142        if index <= *last_applied {
143            return CommandResult::error("Already applied", *last_applied);
144        }
145
146        let result = match command.command_type {
147            CommandType::Get => {
148                let value = data.get(&command.key).cloned();
149                CommandResult::success(value, index)
150            }
151            CommandType::Set => {
152                if let Some(ref value) = command.value {
153                    data.insert(command.key.clone(), value.clone());
154                    *version += 1;
155                    CommandResult::success(None, index)
156                } else {
157                    CommandResult::error("No value provided", index)
158                }
159            }
160            CommandType::Delete => {
161                let old = data.remove(&command.key);
162                *version += 1;
163                CommandResult::success(old, index)
164            }
165            CommandType::CompareAndSwap => {
166                CommandResult::error("Not implemented", index)
167            }
168            CommandType::Increment => {
169                let current = data
170                    .get(&command.key)
171                    .and_then(|v| String::from_utf8(v.clone()).ok())
172                    .and_then(|s| s.parse::<i64>().ok())
173                    .unwrap_or(0);
174
175                let new_value = (current + 1).to_string().into_bytes();
176                data.insert(command.key.clone(), new_value.clone());
177                *version += 1;
178                CommandResult::success(Some(new_value), index)
179            }
180            CommandType::Custom => {
181                CommandResult::error("Custom commands not handled", index)
182            }
183        };
184
185        *last_applied = index;
186        result
187    }
188
189    /// Get a value from the state machine.
190    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
191        let data = self.data.read().unwrap();
192        data.get(key).cloned()
193    }
194
195    /// Get the last applied index.
196    pub fn last_applied(&self) -> u64 {
197        *self.last_applied.read().unwrap()
198    }
199
200    /// Get the current version.
201    pub fn version(&self) -> u64 {
202        *self.version.read().unwrap()
203    }
204
205    /// Get the number of keys.
206    pub fn len(&self) -> usize {
207        let data = self.data.read().unwrap();
208        data.len()
209    }
210
211    /// Check if the state machine is empty.
212    pub fn is_empty(&self) -> bool {
213        self.len() == 0
214    }
215
216    /// Take a snapshot of the state.
217    pub fn snapshot(&self) -> Snapshot {
218        let data = self.data.read().unwrap();
219        let last_applied = *self.last_applied.read().unwrap();
220        let version = *self.version.read().unwrap();
221
222        Snapshot {
223            data: data.clone(),
224            last_applied,
225            version,
226        }
227    }
228
229    /// Restore from a snapshot.
230    pub fn restore(&self, snapshot: Snapshot) {
231        let mut data = self.data.write().unwrap();
232        let mut last_applied = self.last_applied.write().unwrap();
233        let mut version = self.version.write().unwrap();
234
235        *data = snapshot.data;
236        *last_applied = snapshot.last_applied;
237        *version = snapshot.version;
238    }
239}
240
241impl Default for StateMachine {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247// =============================================================================
248// Snapshot
249// =============================================================================
250
251/// A snapshot of the state machine.
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct Snapshot {
254    pub data: HashMap<String, Vec<u8>>,
255    pub last_applied: u64,
256    pub version: u64,
257}
258
259impl Snapshot {
260    /// Serialize the snapshot.
261    pub fn to_bytes(&self) -> Vec<u8> {
262        serde_json::to_vec(self).unwrap_or_default()
263    }
264
265    /// Deserialize a snapshot.
266    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
267        serde_json::from_slice(bytes).ok()
268    }
269}
270
271// =============================================================================
272// Tests
273// =============================================================================
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278
279    #[test]
280    fn test_command() {
281        let cmd = Command::set("key1", b"value1".to_vec());
282        assert_eq!(cmd.command_type, CommandType::Set);
283        assert_eq!(cmd.key, "key1");
284
285        let bytes = cmd.to_bytes();
286        let restored = Command::from_bytes(&bytes).unwrap();
287        assert_eq!(restored.key, "key1");
288    }
289
290    #[test]
291    fn test_state_machine_set_get() {
292        let sm = StateMachine::new();
293
294        let cmd = Command::set("key1", b"value1".to_vec());
295        let result = sm.apply(&cmd, 1);
296        assert!(result.success);
297        assert_eq!(sm.last_applied(), 1);
298
299        let value = sm.get("key1").unwrap();
300        assert_eq!(value, b"value1");
301    }
302
303    #[test]
304    fn test_state_machine_delete() {
305        let sm = StateMachine::new();
306
307        sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
308        assert!(sm.get("key1").is_some());
309
310        sm.apply(&Command::delete("key1"), 2);
311        assert!(sm.get("key1").is_none());
312    }
313
314    #[test]
315    fn test_state_machine_increment() {
316        let sm = StateMachine::new();
317
318        sm.apply(&Command::set("counter", b"0".to_vec()), 1);
319
320        let cmd = Command {
321            command_type: CommandType::Increment,
322            key: "counter".to_string(),
323            value: None,
324            metadata: HashMap::new(),
325        };
326
327        sm.apply(&cmd, 2);
328        let value = sm.get("counter").unwrap();
329        assert_eq!(String::from_utf8(value).unwrap(), "1");
330    }
331
332    #[test]
333    fn test_snapshot() {
334        let sm = StateMachine::new();
335
336        sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
337        sm.apply(&Command::set("key2", b"value2".to_vec()), 2);
338
339        let snapshot = sm.snapshot();
340        assert_eq!(snapshot.last_applied, 2);
341        assert_eq!(snapshot.data.len(), 2);
342
343        let new_sm = StateMachine::new();
344        new_sm.restore(snapshot);
345
346        assert_eq!(new_sm.get("key1").unwrap(), b"value1");
347        assert_eq!(new_sm.get("key2").unwrap(), b"value2");
348        assert_eq!(new_sm.last_applied(), 2);
349    }
350
351    #[test]
352    fn test_duplicate_apply() {
353        let sm = StateMachine::new();
354
355        let result = sm.apply(&Command::set("key1", b"v1".to_vec()), 1);
356        assert!(result.success);
357
358        let result = sm.apply(&Command::set("key1", b"v2".to_vec()), 1);
359        assert!(!result.success);
360
361        assert_eq!(sm.get("key1").unwrap(), b"v1");
362    }
363}