1use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::RwLock;
11
/// A single operation addressed to one key of the state machine.
///
/// Commands are serde-serializable; see `to_bytes` / `from_bytes` for the
/// JSON encoding used in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Command {
    /// Which operation to perform.
    pub command_type: CommandType,
    /// The key the operation targets.
    pub key: String,
    /// Payload bytes. The `set` constructor fills this in; `get` and `delete`
    /// leave it as `None`.
    pub value: Option<Vec<u8>>,
    /// Free-form string pairs attached to the command. The constructors in
    /// this file always start with an empty map.
    pub metadata: HashMap<String, String>,
}
24
25impl Command {
26 pub fn get(key: impl Into<String>) -> Self {
28 Self {
29 command_type: CommandType::Get,
30 key: key.into(),
31 value: None,
32 metadata: HashMap::new(),
33 }
34 }
35
36 pub fn set(key: impl Into<String>, value: Vec<u8>) -> Self {
38 Self {
39 command_type: CommandType::Set,
40 key: key.into(),
41 value: Some(value),
42 metadata: HashMap::new(),
43 }
44 }
45
46 pub fn delete(key: impl Into<String>) -> Self {
48 Self {
49 command_type: CommandType::Delete,
50 key: key.into(),
51 value: None,
52 metadata: HashMap::new(),
53 }
54 }
55
56 pub fn to_bytes(&self) -> Vec<u8> {
58 serde_json::to_vec(self).unwrap_or_default()
59 }
60
61 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
63 serde_json::from_slice(bytes).ok()
64 }
65}
66
/// The kind of operation a [`Command`] requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CommandType {
    /// Read the value stored under the key.
    Get,
    /// Store the command's payload under the key.
    Set,
    /// Remove the key.
    Delete,
    /// Compare-and-swap; `StateMachine::apply` currently answers this with a
    /// "Not implemented" error result.
    CompareAndSwap,
    /// Add one to the integer stored under the key.
    Increment,
    /// Application-defined command; `StateMachine::apply` answers this with a
    /// "Custom commands not handled" error result.
    Custom,
}
81
/// Outcome of applying a [`Command`] to the state machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResult {
    /// `true` when the command was carried out, `false` when it was rejected.
    pub success: bool,
    /// Bytes returned by the command, if any.
    pub value: Option<Vec<u8>>,
    /// Human-readable reason for a rejection; `None` on success.
    pub error: Option<String>,
    /// Log index reported together with this result.
    pub applied_index: u64,
}
94
95impl CommandResult {
96 pub fn success(value: Option<Vec<u8>>, applied_index: u64) -> Self {
97 Self {
98 success: true,
99 value,
100 error: None,
101 applied_index,
102 }
103 }
104
105 pub fn error(message: impl Into<String>, applied_index: u64) -> Self {
106 Self {
107 success: false,
108 value: None,
109 error: Some(message.into()),
110 applied_index,
111 }
112 }
113}
114
/// In-memory key/value store that applies [`Command`]s in log-index order.
///
/// Each piece of state sits behind its own `RwLock`, so all methods take
/// `&self`.
#[derive(Debug)]
pub struct StateMachine {
    /// Current key/value contents.
    data: RwLock<HashMap<String, Vec<u8>>>,
    /// Index of the most recent command accepted by `apply`.
    last_applied: RwLock<u64>,
    /// Counter bumped by the mutating commands in `apply`.
    version: RwLock<u64>,
}
125
126impl StateMachine {
127 pub fn new() -> Self {
129 Self {
130 data: RwLock::new(HashMap::new()),
131 last_applied: RwLock::new(0),
132 version: RwLock::new(0),
133 }
134 }
135
136 pub fn apply(&self, command: &Command, index: u64) -> CommandResult {
138 let mut data = self.data.write().unwrap();
139 let mut last_applied = self.last_applied.write().unwrap();
140 let mut version = self.version.write().unwrap();
141
142 if index <= *last_applied {
143 return CommandResult::error("Already applied", *last_applied);
144 }
145
146 let result = match command.command_type {
147 CommandType::Get => {
148 let value = data.get(&command.key).cloned();
149 CommandResult::success(value, index)
150 }
151 CommandType::Set => {
152 if let Some(ref value) = command.value {
153 data.insert(command.key.clone(), value.clone());
154 *version += 1;
155 CommandResult::success(None, index)
156 } else {
157 CommandResult::error("No value provided", index)
158 }
159 }
160 CommandType::Delete => {
161 let old = data.remove(&command.key);
162 *version += 1;
163 CommandResult::success(old, index)
164 }
165 CommandType::CompareAndSwap => {
166 CommandResult::error("Not implemented", index)
167 }
168 CommandType::Increment => {
169 let current = data
170 .get(&command.key)
171 .and_then(|v| String::from_utf8(v.clone()).ok())
172 .and_then(|s| s.parse::<i64>().ok())
173 .unwrap_or(0);
174
175 let new_value = (current + 1).to_string().into_bytes();
176 data.insert(command.key.clone(), new_value.clone());
177 *version += 1;
178 CommandResult::success(Some(new_value), index)
179 }
180 CommandType::Custom => {
181 CommandResult::error("Custom commands not handled", index)
182 }
183 };
184
185 *last_applied = index;
186 result
187 }
188
189 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
191 let data = self.data.read().unwrap();
192 data.get(key).cloned()
193 }
194
195 pub fn last_applied(&self) -> u64 {
197 *self.last_applied.read().unwrap()
198 }
199
200 pub fn version(&self) -> u64 {
202 *self.version.read().unwrap()
203 }
204
205 pub fn len(&self) -> usize {
207 let data = self.data.read().unwrap();
208 data.len()
209 }
210
211 pub fn is_empty(&self) -> bool {
213 self.len() == 0
214 }
215
216 pub fn snapshot(&self) -> Snapshot {
218 let data = self.data.read().unwrap();
219 let last_applied = *self.last_applied.read().unwrap();
220 let version = *self.version.read().unwrap();
221
222 Snapshot {
223 data: data.clone(),
224 last_applied,
225 version,
226 }
227 }
228
229 pub fn restore(&self, snapshot: Snapshot) {
231 let mut data = self.data.write().unwrap();
232 let mut last_applied = self.last_applied.write().unwrap();
233 let mut version = self.version.write().unwrap();
234
235 *data = snapshot.data;
236 *last_applied = snapshot.last_applied;
237 *version = snapshot.version;
238 }
239}
240
241impl Default for StateMachine {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
/// Point-in-time copy of a `StateMachine`'s state, as produced by
/// `StateMachine::snapshot` and consumed by `StateMachine::restore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// Copy of every key/value pair held at snapshot time.
    pub data: HashMap<String, Vec<u8>>,
    /// Last applied log index at snapshot time.
    pub last_applied: u64,
    /// Value of the mutation counter at snapshot time.
    pub version: u64,
}
258
259impl Snapshot {
260 pub fn to_bytes(&self) -> Vec<u8> {
262 serde_json::to_vec(self).unwrap_or_default()
263 }
264
265 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
267 serde_json::from_slice(bytes).ok()
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Set` command carrying a byte-string payload.
    fn put(key: &str, payload: &[u8]) -> Command {
        Command::set(key, payload.to_vec())
    }

    // A command keeps its type and key, and the key survives a byte round trip.
    #[test]
    fn test_command() {
        let original = put("key1", b"value1");
        assert_eq!(original.command_type, CommandType::Set);
        assert_eq!(original.key, "key1");

        let decoded = Command::from_bytes(&original.to_bytes()).unwrap();
        assert_eq!(decoded.key, "key1");
    }

    // A value written with Set is readable and advances last_applied.
    #[test]
    fn test_state_machine_set_get() {
        let machine = StateMachine::new();

        let outcome = machine.apply(&put("key1", b"value1"), 1);
        assert!(outcome.success);
        assert_eq!(machine.last_applied(), 1);

        assert_eq!(machine.get("key1").unwrap(), b"value1");
    }

    // Delete removes a key that was previously set.
    #[test]
    fn test_state_machine_delete() {
        let machine = StateMachine::new();

        machine.apply(&put("key1", b"value1"), 1);
        assert!(machine.get("key1").is_some());

        machine.apply(&Command::delete("key1"), 2);
        assert!(machine.get("key1").is_none());
    }

    // Increment turns a stored "0" into "1".
    #[test]
    fn test_state_machine_increment() {
        let machine = StateMachine::new();

        machine.apply(&put("counter", b"0"), 1);

        // Same key, no payload, empty metadata; only the type differs.
        let bump = Command {
            command_type: CommandType::Increment,
            ..Command::get("counter")
        };

        machine.apply(&bump, 2);
        assert_eq!(machine.get("counter").as_deref(), Some(&b"1"[..]));
    }

    // A snapshot restored into a fresh machine reproduces data and index.
    #[test]
    fn test_snapshot() {
        let source = StateMachine::new();

        source.apply(&put("key1", b"value1"), 1);
        source.apply(&put("key2", b"value2"), 2);

        let image = source.snapshot();
        assert_eq!(image.last_applied, 2);
        assert_eq!(image.data.len(), 2);

        let target = StateMachine::new();
        target.restore(image);

        assert_eq!(target.get("key1").unwrap(), b"value1");
        assert_eq!(target.get("key2").unwrap(), b"value2");
        assert_eq!(target.last_applied(), 2);
    }

    // Re-applying an already used index is rejected and changes nothing.
    #[test]
    fn test_duplicate_apply() {
        let machine = StateMachine::new();

        let first = machine.apply(&put("key1", b"v1"), 1);
        assert!(first.success);

        let replay = machine.apply(&put("key1", b"v2"), 1);
        assert!(!replay.success);

        assert_eq!(machine.get("key1").unwrap(), b"v1");
    }
}