Skip to main content

aegis_replication/
state.rs

1//! Aegis Replication State Machine
2//!
3//! State machine abstraction for replicated state with pluggable backends.
4//! Supports both key-value operations and full database operations (SQL, documents, graph).
5//!
6//! @version 0.1.0
7//! @author AutomataNexus Development Team
8
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12
13// =============================================================================
14// State Machine Backend Trait
15// =============================================================================
16
17/// Trait for pluggable state machine backends.
18/// Implementations can delegate to in-memory storage, disk-based storage,
19/// or any other backend.
20pub trait StateMachineBackend: Send + Sync {
21    /// Apply a command to the state machine at the given log index.
22    fn apply(&self, command: &Command, index: u64) -> CommandResult;
23
24    /// Get a value by key.
25    fn get(&self, key: &str) -> Option<Vec<u8>>;
26
27    /// Get the last applied log index.
28    fn last_applied(&self) -> u64;
29
30    /// Get the current version/sequence number.
31    fn version(&self) -> u64;
32
33    /// Get the number of keys stored.
34    fn len(&self) -> usize;
35
36    /// Check if the state machine is empty.
37    fn is_empty(&self) -> bool {
38        self.len() == 0
39    }
40
41    /// Take a snapshot of the current state.
42    fn snapshot(&self) -> Snapshot;
43
44    /// Restore state from a snapshot.
45    fn restore(&self, snapshot: Snapshot);
46}
47
48// =============================================================================
49// Command
50// =============================================================================
51
52/// A command to be applied to the state machine.
53/// Supports both key-value and database operations.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Command {
56    pub command_type: CommandType,
57    pub key: String,
58    pub value: Option<Vec<u8>>,
59    pub metadata: HashMap<String, String>,
60}
61
62impl Command {
63    /// Create a get command.
64    pub fn get(key: impl Into<String>) -> Self {
65        Self {
66            command_type: CommandType::Get,
67            key: key.into(),
68            value: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create a set command.
74    pub fn set(key: impl Into<String>, value: Vec<u8>) -> Self {
75        Self {
76            command_type: CommandType::Set,
77            key: key.into(),
78            value: Some(value),
79            metadata: HashMap::new(),
80        }
81    }
82
83    /// Create a delete command.
84    pub fn delete(key: impl Into<String>) -> Self {
85        Self {
86            command_type: CommandType::Delete,
87            key: key.into(),
88            value: None,
89            metadata: HashMap::new(),
90        }
91    }
92
93    /// Create a SQL execution command.
94    pub fn sql(statement: impl Into<String>) -> Self {
95        Self {
96            command_type: CommandType::SqlExecute,
97            key: String::new(),
98            value: Some(statement.into().into_bytes()),
99            metadata: HashMap::new(),
100        }
101    }
102
103    /// Create a document insert command.
104    pub fn document_insert(collection: impl Into<String>, document: Vec<u8>) -> Self {
105        Self {
106            command_type: CommandType::DocumentInsert,
107            key: collection.into(),
108            value: Some(document),
109            metadata: HashMap::new(),
110        }
111    }
112
113    /// Create a document update command.
114    pub fn document_update(collection: impl Into<String>, doc_id: impl Into<String>, document: Vec<u8>) -> Self {
115        let mut metadata = HashMap::new();
116        metadata.insert("doc_id".to_string(), doc_id.into());
117        Self {
118            command_type: CommandType::DocumentUpdate,
119            key: collection.into(),
120            value: Some(document),
121            metadata,
122        }
123    }
124
125    /// Create a document delete command.
126    pub fn document_delete(collection: impl Into<String>, doc_id: impl Into<String>) -> Self {
127        let mut metadata = HashMap::new();
128        metadata.insert("doc_id".to_string(), doc_id.into());
129        Self {
130            command_type: CommandType::DocumentDelete,
131            key: collection.into(),
132            value: None,
133            metadata,
134        }
135    }
136
137    /// Create a graph node create command.
138    pub fn graph_create_node(label: impl Into<String>, properties: Vec<u8>) -> Self {
139        Self {
140            command_type: CommandType::GraphCreateNode,
141            key: label.into(),
142            value: Some(properties),
143            metadata: HashMap::new(),
144        }
145    }
146
147    /// Create a graph edge create command.
148    pub fn graph_create_edge(edge_type: impl Into<String>, from_node: impl Into<String>, to_node: impl Into<String>, properties: Vec<u8>) -> Self {
149        let mut metadata = HashMap::new();
150        metadata.insert("from".to_string(), from_node.into());
151        metadata.insert("to".to_string(), to_node.into());
152        Self {
153            command_type: CommandType::GraphCreateEdge,
154            key: edge_type.into(),
155            value: Some(properties),
156            metadata,
157        }
158    }
159
160    /// Create a transaction begin command.
161    pub fn tx_begin(tx_id: u64) -> Self {
162        let mut metadata = HashMap::new();
163        metadata.insert("tx_id".to_string(), tx_id.to_string());
164        Self {
165            command_type: CommandType::TransactionBegin,
166            key: String::new(),
167            value: None,
168            metadata,
169        }
170    }
171
172    /// Create a transaction commit command.
173    pub fn tx_commit(tx_id: u64) -> Self {
174        let mut metadata = HashMap::new();
175        metadata.insert("tx_id".to_string(), tx_id.to_string());
176        Self {
177            command_type: CommandType::TransactionCommit,
178            key: String::new(),
179            value: None,
180            metadata,
181        }
182    }
183
184    /// Create a transaction rollback command.
185    pub fn tx_rollback(tx_id: u64) -> Self {
186        let mut metadata = HashMap::new();
187        metadata.insert("tx_id".to_string(), tx_id.to_string());
188        Self {
189            command_type: CommandType::TransactionRollback,
190            key: String::new(),
191            value: None,
192            metadata,
193        }
194    }
195
196    /// Serialize the command.
197    pub fn to_bytes(&self) -> Vec<u8> {
198        serde_json::to_vec(self).unwrap_or_default()
199    }
200
201    /// Deserialize a command.
202    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
203        serde_json::from_slice(bytes).ok()
204    }
205
206    /// Check if this command is a write operation (modifies state).
207    pub fn is_write(&self) -> bool {
208        !matches!(self.command_type, CommandType::Get)
209    }
210
211    /// Get the SQL statement if this is a SQL command.
212    pub fn sql_statement(&self) -> Option<&str> {
213        if self.command_type == CommandType::SqlExecute {
214            self.value.as_ref().and_then(|v| std::str::from_utf8(v).ok())
215        } else {
216            None
217        }
218    }
219
220    /// Get the transaction ID if this is a transaction command.
221    pub fn transaction_id(&self) -> Option<u64> {
222        self.metadata.get("tx_id").and_then(|s| s.parse().ok())
223    }
224}
225
226// =============================================================================
227// Command Type
228// =============================================================================
229
230/// Type of command - supports key-value, SQL, document, graph, and transaction operations.
231#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
232pub enum CommandType {
233    // Key-value operations
234    Get,
235    Set,
236    Delete,
237    CompareAndSwap,
238    Increment,
239
240    // SQL operations
241    SqlExecute,
242
243    // Document operations
244    DocumentInsert,
245    DocumentUpdate,
246    DocumentDelete,
247
248    // Graph operations
249    GraphCreateNode,
250    GraphDeleteNode,
251    GraphCreateEdge,
252    GraphDeleteEdge,
253
254    // Transaction control
255    TransactionBegin,
256    TransactionCommit,
257    TransactionRollback,
258
259    // Generic custom operation
260    Custom,
261}
262
263// =============================================================================
264// Command Result
265// =============================================================================
266
267/// Result of applying a command.
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct CommandResult {
270    pub success: bool,
271    pub value: Option<Vec<u8>>,
272    pub error: Option<String>,
273    pub applied_index: u64,
274}
275
276impl CommandResult {
277    pub fn success(value: Option<Vec<u8>>, applied_index: u64) -> Self {
278        Self {
279            success: true,
280            value,
281            error: None,
282            applied_index,
283        }
284    }
285
286    pub fn error(message: impl Into<String>, applied_index: u64) -> Self {
287        Self {
288            success: false,
289            value: None,
290            error: Some(message.into()),
291            applied_index,
292        }
293    }
294}
295
296// =============================================================================
297// State Machine (In-Memory Implementation)
298// =============================================================================
299
300/// The replicated state machine with in-memory storage.
301/// For production use, consider using DatabaseStateMachine which persists to storage.
302pub struct StateMachine {
303    data: RwLock<HashMap<String, Vec<u8>>>,
304    last_applied: RwLock<u64>,
305    version: RwLock<u64>,
306}
307
308impl StateMachine {
309    /// Create a new in-memory state machine.
310    pub fn new() -> Self {
311        Self {
312            data: RwLock::new(HashMap::new()),
313            last_applied: RwLock::new(0),
314            version: RwLock::new(0),
315        }
316    }
317
318    fn apply_kv_command(&self, command: &Command, index: u64, data: &mut HashMap<String, Vec<u8>>, version: &mut u64) -> CommandResult {
319        match command.command_type {
320            CommandType::Get => {
321                let value = data.get(&command.key).cloned();
322                CommandResult::success(value, index)
323            }
324            CommandType::Set => {
325                if let Some(ref value) = command.value {
326                    data.insert(command.key.clone(), value.clone());
327                    *version += 1;
328                    CommandResult::success(None, index)
329                } else {
330                    CommandResult::error("No value provided", index)
331                }
332            }
333            CommandType::Delete => {
334                let old = data.remove(&command.key);
335                *version += 1;
336                CommandResult::success(old, index)
337            }
338            CommandType::CompareAndSwap => {
339                // Expected value is in metadata["expected"]
340                let expected = command.metadata.get("expected").map(|s| s.as_bytes().to_vec());
341                let current = data.get(&command.key).cloned();
342
343                if current == expected {
344                    if let Some(ref new_value) = command.value {
345                        data.insert(command.key.clone(), new_value.clone());
346                        *version += 1;
347                        CommandResult::success(Some(b"true".to_vec()), index)
348                    } else {
349                        CommandResult::error("No new value provided", index)
350                    }
351                } else {
352                    CommandResult::success(Some(b"false".to_vec()), index)
353                }
354            }
355            CommandType::Increment => {
356                let current = data
357                    .get(&command.key)
358                    .and_then(|v| String::from_utf8(v.clone()).ok())
359                    .and_then(|s| s.parse::<i64>().ok())
360                    .unwrap_or(0);
361
362                let new_value = (current + 1).to_string().into_bytes();
363                data.insert(command.key.clone(), new_value.clone());
364                *version += 1;
365                CommandResult::success(Some(new_value), index)
366            }
367            _ => CommandResult::error(
368                format!("Command type {:?} not supported by in-memory state machine", command.command_type),
369                index
370            ),
371        }
372    }
373}
374
375impl StateMachineBackend for StateMachine {
376    fn apply(&self, command: &Command, index: u64) -> CommandResult {
377        let mut data = self.data.write().expect("state machine data lock poisoned");
378        let mut last_applied = self.last_applied.write().expect("state machine last_applied lock poisoned");
379        let mut version = self.version.write().expect("state machine version lock poisoned");
380
381        if index <= *last_applied {
382            return CommandResult::error("Already applied", *last_applied);
383        }
384
385        let result = self.apply_kv_command(command, index, &mut data, &mut version);
386        *last_applied = index;
387        result
388    }
389
390    fn get(&self, key: &str) -> Option<Vec<u8>> {
391        let data = self.data.read().expect("state machine data lock poisoned");
392        data.get(key).cloned()
393    }
394
395    fn last_applied(&self) -> u64 {
396        *self.last_applied.read().expect("state machine last_applied lock poisoned")
397    }
398
399    fn version(&self) -> u64 {
400        *self.version.read().expect("state machine version lock poisoned")
401    }
402
403    fn len(&self) -> usize {
404        let data = self.data.read().expect("state machine data lock poisoned");
405        data.len()
406    }
407
408    fn snapshot(&self) -> Snapshot {
409        let data = self.data.read().expect("state machine data lock poisoned");
410        let last_applied = *self.last_applied.read().expect("state machine last_applied lock poisoned");
411        let version = *self.version.read().expect("state machine version lock poisoned");
412
413        Snapshot {
414            data: data.clone(),
415            last_applied,
416            version,
417        }
418    }
419
420    fn restore(&self, snapshot: Snapshot) {
421        let mut data = self.data.write().expect("state machine data lock poisoned");
422        let mut last_applied = self.last_applied.write().expect("state machine last_applied lock poisoned");
423        let mut version = self.version.write().expect("state machine version lock poisoned");
424
425        *data = snapshot.data;
426        *last_applied = snapshot.last_applied;
427        *version = snapshot.version;
428    }
429}
430
431impl Default for StateMachine {
432    fn default() -> Self {
433        Self::new()
434    }
435}
436
437// =============================================================================
438// Database State Machine (Storage-backed)
439// =============================================================================
440
441/// A state machine that delegates operations to the actual database storage.
442/// This allows Raft to replicate real database operations.
443pub struct DatabaseStateMachine<F: DatabaseOperationHandler> {
444    handler: F,
445    last_applied: RwLock<u64>,
446    version: RwLock<u64>,
447    /// Cached key-value data for snapshot/restore and simple KV operations
448    kv_cache: RwLock<HashMap<String, Vec<u8>>>,
449}
450
451/// Trait for handling database operations.
452/// Implement this to connect Raft replication to your storage layer.
453pub trait DatabaseOperationHandler: Send + Sync {
454    /// Execute a SQL statement.
455    fn execute_sql(&self, sql: &str) -> Result<Vec<u8>, String>;
456
457    /// Insert a document into a collection.
458    fn insert_document(&self, collection: &str, document: &[u8]) -> Result<String, String>;
459
460    /// Update a document in a collection.
461    fn update_document(&self, collection: &str, doc_id: &str, document: &[u8]) -> Result<(), String>;
462
463    /// Delete a document from a collection.
464    fn delete_document(&self, collection: &str, doc_id: &str) -> Result<(), String>;
465
466    /// Create a graph node.
467    fn create_node(&self, label: &str, properties: &[u8]) -> Result<String, String>;
468
469    /// Delete a graph node.
470    fn delete_node(&self, node_id: &str) -> Result<(), String>;
471
472    /// Create a graph edge.
473    fn create_edge(&self, edge_type: &str, from_node: &str, to_node: &str, properties: &[u8]) -> Result<String, String>;
474
475    /// Delete a graph edge.
476    fn delete_edge(&self, edge_id: &str) -> Result<(), String>;
477
478    /// Begin a transaction.
479    fn begin_transaction(&self, tx_id: u64) -> Result<(), String>;
480
481    /// Commit a transaction.
482    fn commit_transaction(&self, tx_id: u64) -> Result<(), String>;
483
484    /// Rollback a transaction.
485    fn rollback_transaction(&self, tx_id: u64) -> Result<(), String>;
486
487    /// Create a full snapshot of the database state.
488    fn create_snapshot(&self) -> Result<Vec<u8>, String>;
489
490    /// Restore from a snapshot.
491    fn restore_snapshot(&self, data: &[u8]) -> Result<(), String>;
492}
493
494impl<F: DatabaseOperationHandler> DatabaseStateMachine<F> {
495    /// Create a new database state machine with the given operation handler.
496    pub fn new(handler: F) -> Self {
497        Self {
498            handler,
499            last_applied: RwLock::new(0),
500            version: RwLock::new(0),
501            kv_cache: RwLock::new(HashMap::new()),
502        }
503    }
504
505    fn apply_database_command(&self, command: &Command, index: u64) -> CommandResult {
506        match command.command_type {
507            // Key-value operations use the cache
508            CommandType::Get => {
509                let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
510                let value = cache.get(&command.key).cloned();
511                CommandResult::success(value, index)
512            }
513            CommandType::Set => {
514                if let Some(ref value) = command.value {
515                    let mut cache = self.kv_cache.write().expect("database state machine kv_cache lock poisoned");
516                    cache.insert(command.key.clone(), value.clone());
517                    CommandResult::success(None, index)
518                } else {
519                    CommandResult::error("No value provided", index)
520                }
521            }
522            CommandType::Delete => {
523                let mut cache = self.kv_cache.write().expect("database state machine kv_cache lock poisoned");
524                let old = cache.remove(&command.key);
525                CommandResult::success(old, index)
526            }
527
528            // SQL operations
529            CommandType::SqlExecute => {
530                if let Some(sql) = command.sql_statement() {
531                    match self.handler.execute_sql(sql) {
532                        Ok(result) => CommandResult::success(Some(result), index),
533                        Err(e) => CommandResult::error(e, index),
534                    }
535                } else {
536                    CommandResult::error("No SQL statement provided", index)
537                }
538            }
539
540            // Document operations
541            CommandType::DocumentInsert => {
542                if let Some(ref doc) = command.value {
543                    match self.handler.insert_document(&command.key, doc) {
544                        Ok(doc_id) => CommandResult::success(Some(doc_id.into_bytes()), index),
545                        Err(e) => CommandResult::error(e, index),
546                    }
547                } else {
548                    CommandResult::error("No document provided", index)
549                }
550            }
551            CommandType::DocumentUpdate => {
552                if let (Some(doc_id), Some(ref doc)) = (command.metadata.get("doc_id"), &command.value) {
553                    match self.handler.update_document(&command.key, doc_id, doc) {
554                        Ok(()) => CommandResult::success(None, index),
555                        Err(e) => CommandResult::error(e, index),
556                    }
557                } else {
558                    CommandResult::error("Missing doc_id or document", index)
559                }
560            }
561            CommandType::DocumentDelete => {
562                if let Some(doc_id) = command.metadata.get("doc_id") {
563                    match self.handler.delete_document(&command.key, doc_id) {
564                        Ok(()) => CommandResult::success(None, index),
565                        Err(e) => CommandResult::error(e, index),
566                    }
567                } else {
568                    CommandResult::error("Missing doc_id", index)
569                }
570            }
571
572            // Graph operations
573            CommandType::GraphCreateNode => {
574                if let Some(ref props) = command.value {
575                    match self.handler.create_node(&command.key, props) {
576                        Ok(node_id) => CommandResult::success(Some(node_id.into_bytes()), index),
577                        Err(e) => CommandResult::error(e, index),
578                    }
579                } else {
580                    CommandResult::error("No properties provided", index)
581                }
582            }
583            CommandType::GraphDeleteNode => {
584                match self.handler.delete_node(&command.key) {
585                    Ok(()) => CommandResult::success(None, index),
586                    Err(e) => CommandResult::error(e, index),
587                }
588            }
589            CommandType::GraphCreateEdge => {
590                if let (Some(from), Some(to), Some(ref props)) = (
591                    command.metadata.get("from"),
592                    command.metadata.get("to"),
593                    &command.value,
594                ) {
595                    match self.handler.create_edge(&command.key, from, to, props) {
596                        Ok(edge_id) => CommandResult::success(Some(edge_id.into_bytes()), index),
597                        Err(e) => CommandResult::error(e, index),
598                    }
599                } else {
600                    CommandResult::error("Missing from, to, or properties", index)
601                }
602            }
603            CommandType::GraphDeleteEdge => {
604                match self.handler.delete_edge(&command.key) {
605                    Ok(()) => CommandResult::success(None, index),
606                    Err(e) => CommandResult::error(e, index),
607                }
608            }
609
610            // Transaction operations
611            CommandType::TransactionBegin => {
612                if let Some(tx_id) = command.transaction_id() {
613                    match self.handler.begin_transaction(tx_id) {
614                        Ok(()) => CommandResult::success(None, index),
615                        Err(e) => CommandResult::error(e, index),
616                    }
617                } else {
618                    CommandResult::error("Missing transaction ID", index)
619                }
620            }
621            CommandType::TransactionCommit => {
622                if let Some(tx_id) = command.transaction_id() {
623                    match self.handler.commit_transaction(tx_id) {
624                        Ok(()) => CommandResult::success(None, index),
625                        Err(e) => CommandResult::error(e, index),
626                    }
627                } else {
628                    CommandResult::error("Missing transaction ID", index)
629                }
630            }
631            CommandType::TransactionRollback => {
632                if let Some(tx_id) = command.transaction_id() {
633                    match self.handler.rollback_transaction(tx_id) {
634                        Ok(()) => CommandResult::success(None, index),
635                        Err(e) => CommandResult::error(e, index),
636                    }
637                } else {
638                    CommandResult::error("Missing transaction ID", index)
639                }
640            }
641
642            // Other commands
643            CommandType::CompareAndSwap | CommandType::Increment => {
644                CommandResult::error("Use key-value state machine for these operations", index)
645            }
646            CommandType::Custom => {
647                CommandResult::error("Custom commands not handled", index)
648            }
649        }
650    }
651}
652
653impl<F: DatabaseOperationHandler> StateMachineBackend for DatabaseStateMachine<F> {
654    fn apply(&self, command: &Command, index: u64) -> CommandResult {
655        let mut last_applied = self.last_applied.write().expect("database state machine last_applied lock poisoned");
656        let mut version = self.version.write().expect("database state machine version lock poisoned");
657
658        if index <= *last_applied {
659            return CommandResult::error("Already applied", *last_applied);
660        }
661
662        let result = self.apply_database_command(command, index);
663
664        if result.success {
665            *version += 1;
666        }
667        *last_applied = index;
668
669        result
670    }
671
672    fn get(&self, key: &str) -> Option<Vec<u8>> {
673        let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
674        cache.get(key).cloned()
675    }
676
677    fn last_applied(&self) -> u64 {
678        *self.last_applied.read().expect("database state machine last_applied lock poisoned")
679    }
680
681    fn version(&self) -> u64 {
682        *self.version.read().expect("database state machine version lock poisoned")
683    }
684
685    fn len(&self) -> usize {
686        let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
687        cache.len()
688    }
689
690    fn snapshot(&self) -> Snapshot {
691        // For database state machine, snapshot includes both KV cache and database state
692        let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
693        let last_applied = *self.last_applied.read().expect("database state machine last_applied lock poisoned");
694        let version = *self.version.read().expect("database state machine version lock poisoned");
695
696        // Try to include database snapshot in the data
697        let mut data = cache.clone();
698        if let Ok(db_snapshot) = self.handler.create_snapshot() {
699            data.insert("__db_snapshot__".to_string(), db_snapshot);
700        }
701
702        Snapshot {
703            data,
704            last_applied,
705            version,
706        }
707    }
708
709    fn restore(&self, snapshot: Snapshot) {
710        let mut cache = self.kv_cache.write().expect("database state machine kv_cache lock poisoned");
711        let mut last_applied = self.last_applied.write().expect("database state machine last_applied lock poisoned");
712        let mut version = self.version.write().expect("database state machine version lock poisoned");
713
714        // Restore database state if present
715        if let Some(db_snapshot) = snapshot.data.get("__db_snapshot__") {
716            let _ = self.handler.restore_snapshot(db_snapshot);
717        }
718
719        // Restore KV cache (excluding the special key)
720        *cache = snapshot.data.into_iter()
721            .filter(|(k, _)| k != "__db_snapshot__")
722            .collect();
723        *last_applied = snapshot.last_applied;
724        *version = snapshot.version;
725    }
726}
727
728// =============================================================================
729// No-op Database Handler (for testing)
730// =============================================================================
731
732/// A no-op handler that logs operations but doesn't persist anything.
733/// Useful for testing Raft replication without a real database.
734#[derive(Default)]
735pub struct NoOpDatabaseHandler;
736
737impl DatabaseOperationHandler for NoOpDatabaseHandler {
738    fn execute_sql(&self, _sql: &str) -> Result<Vec<u8>, String> {
739        Ok(b"OK".to_vec())
740    }
741
742    fn insert_document(&self, _collection: &str, _document: &[u8]) -> Result<String, String> {
743        Ok("doc-001".to_string())
744    }
745
746    fn update_document(&self, _collection: &str, _doc_id: &str, _document: &[u8]) -> Result<(), String> {
747        Ok(())
748    }
749
750    fn delete_document(&self, _collection: &str, _doc_id: &str) -> Result<(), String> {
751        Ok(())
752    }
753
754    fn create_node(&self, _label: &str, _properties: &[u8]) -> Result<String, String> {
755        Ok("node-001".to_string())
756    }
757
758    fn delete_node(&self, _node_id: &str) -> Result<(), String> {
759        Ok(())
760    }
761
762    fn create_edge(&self, _edge_type: &str, _from_node: &str, _to_node: &str, _properties: &[u8]) -> Result<String, String> {
763        Ok("edge-001".to_string())
764    }
765
766    fn delete_edge(&self, _edge_id: &str) -> Result<(), String> {
767        Ok(())
768    }
769
770    fn begin_transaction(&self, _tx_id: u64) -> Result<(), String> {
771        Ok(())
772    }
773
774    fn commit_transaction(&self, _tx_id: u64) -> Result<(), String> {
775        Ok(())
776    }
777
778    fn rollback_transaction(&self, _tx_id: u64) -> Result<(), String> {
779        Ok(())
780    }
781
782    fn create_snapshot(&self) -> Result<Vec<u8>, String> {
783        Ok(Vec::new())
784    }
785
786    fn restore_snapshot(&self, _data: &[u8]) -> Result<(), String> {
787        Ok(())
788    }
789}
790
791// =============================================================================
792// Snapshot
793// =============================================================================
794
795/// A snapshot of the state machine.
796#[derive(Debug, Clone, Serialize, Deserialize)]
797pub struct Snapshot {
798    pub data: HashMap<String, Vec<u8>>,
799    pub last_applied: u64,
800    pub version: u64,
801}
802
803impl Snapshot {
804    /// Serialize the snapshot.
805    pub fn to_bytes(&self) -> Vec<u8> {
806        serde_json::to_vec(self).unwrap_or_default()
807    }
808
809    /// Deserialize a snapshot.
810    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
811        serde_json::from_slice(bytes).ok()
812    }
813}
814
815// =============================================================================
816// Tests
817// =============================================================================
818
819#[cfg(test)]
820mod tests {
821    use super::*;
822
823    #[test]
824    fn test_command() {
825        let cmd = Command::set("key1", b"value1".to_vec());
826        assert_eq!(cmd.command_type, CommandType::Set);
827        assert_eq!(cmd.key, "key1");
828
829        let bytes = cmd.to_bytes();
830        let restored = Command::from_bytes(&bytes).unwrap();
831        assert_eq!(restored.key, "key1");
832    }
833
834    #[test]
835    fn test_state_machine_set_get() {
836        let sm = StateMachine::new();
837
838        let cmd = Command::set("key1", b"value1".to_vec());
839        let result = sm.apply(&cmd, 1);
840        assert!(result.success);
841        assert_eq!(sm.last_applied(), 1);
842
843        let value = sm.get("key1").unwrap();
844        assert_eq!(value, b"value1");
845    }
846
847    #[test]
848    fn test_state_machine_delete() {
849        let sm = StateMachine::new();
850
851        sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
852        assert!(sm.get("key1").is_some());
853
854        sm.apply(&Command::delete("key1"), 2);
855        assert!(sm.get("key1").is_none());
856    }
857
858    #[test]
859    fn test_state_machine_increment() {
860        let sm = StateMachine::new();
861
862        sm.apply(&Command::set("counter", b"0".to_vec()), 1);
863
864        let cmd = Command {
865            command_type: CommandType::Increment,
866            key: "counter".to_string(),
867            value: None,
868            metadata: HashMap::new(),
869        };
870
871        sm.apply(&cmd, 2);
872        let value = sm.get("counter").unwrap();
873        assert_eq!(String::from_utf8(value).unwrap(), "1");
874    }
875
876    #[test]
877    fn test_snapshot() {
878        let sm = StateMachine::new();
879
880        sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
881        sm.apply(&Command::set("key2", b"value2".to_vec()), 2);
882
883        let snapshot = sm.snapshot();
884        assert_eq!(snapshot.last_applied, 2);
885        assert_eq!(snapshot.data.len(), 2);
886
887        let new_sm = StateMachine::new();
888        new_sm.restore(snapshot);
889
890        assert_eq!(new_sm.get("key1").unwrap(), b"value1");
891        assert_eq!(new_sm.get("key2").unwrap(), b"value2");
892        assert_eq!(new_sm.last_applied(), 2);
893    }
894
895    #[test]
896    fn test_duplicate_apply() {
897        let sm = StateMachine::new();
898
899        let result = sm.apply(&Command::set("key1", b"v1".to_vec()), 1);
900        assert!(result.success);
901
902        let result = sm.apply(&Command::set("key1", b"v2".to_vec()), 1);
903        assert!(!result.success);
904
905        assert_eq!(sm.get("key1").unwrap(), b"v1");
906    }
907
908    #[test]
909    fn test_sql_command() {
910        let cmd = Command::sql("INSERT INTO users (name) VALUES ('test')");
911        assert_eq!(cmd.command_type, CommandType::SqlExecute);
912        assert_eq!(cmd.sql_statement(), Some("INSERT INTO users (name) VALUES ('test')"));
913        assert!(cmd.is_write());
914    }
915
916    #[test]
917    fn test_document_commands() {
918        let insert = Command::document_insert("users", b"{\"name\": \"test\"}".to_vec());
919        assert_eq!(insert.command_type, CommandType::DocumentInsert);
920        assert_eq!(insert.key, "users");
921
922        let update = Command::document_update("users", "doc123", b"{\"name\": \"updated\"}".to_vec());
923        assert_eq!(update.command_type, CommandType::DocumentUpdate);
924        assert_eq!(update.metadata.get("doc_id"), Some(&"doc123".to_string()));
925
926        let delete = Command::document_delete("users", "doc123");
927        assert_eq!(delete.command_type, CommandType::DocumentDelete);
928    }
929
930    #[test]
931    fn test_graph_commands() {
932        let node = Command::graph_create_node("Person", b"{\"name\": \"Alice\"}".to_vec());
933        assert_eq!(node.command_type, CommandType::GraphCreateNode);
934        assert_eq!(node.key, "Person");
935
936        let edge = Command::graph_create_edge("KNOWS", "node1", "node2", b"{}".to_vec());
937        assert_eq!(edge.command_type, CommandType::GraphCreateEdge);
938        assert_eq!(edge.metadata.get("from"), Some(&"node1".to_string()));
939        assert_eq!(edge.metadata.get("to"), Some(&"node2".to_string()));
940    }
941
942    #[test]
943    fn test_transaction_commands() {
944        let begin = Command::tx_begin(123);
945        assert_eq!(begin.command_type, CommandType::TransactionBegin);
946        assert_eq!(begin.transaction_id(), Some(123));
947
948        let commit = Command::tx_commit(123);
949        assert_eq!(commit.command_type, CommandType::TransactionCommit);
950
951        let rollback = Command::tx_rollback(123);
952        assert_eq!(rollback.command_type, CommandType::TransactionRollback);
953    }
954
955    #[test]
956    fn test_database_state_machine() {
957        let handler = NoOpDatabaseHandler;
958        let sm = DatabaseStateMachine::new(handler);
959
960        // SQL command
961        let sql_cmd = Command::sql("INSERT INTO test VALUES (1)");
962        let result = sm.apply(&sql_cmd, 1);
963        assert!(result.success);
964        assert_eq!(result.value, Some(b"OK".to_vec()));
965
966        // Document insert
967        let doc_cmd = Command::document_insert("users", b"{}".to_vec());
968        let result = sm.apply(&doc_cmd, 2);
969        assert!(result.success);
970        assert_eq!(result.value, Some(b"doc-001".to_vec()));
971
972        // Transaction
973        let tx_begin = Command::tx_begin(1);
974        let result = sm.apply(&tx_begin, 3);
975        assert!(result.success);
976
977        let tx_commit = Command::tx_commit(1);
978        let result = sm.apply(&tx_commit, 4);
979        assert!(result.success);
980
981        assert_eq!(sm.last_applied(), 4);
982    }
983
984    #[test]
985    fn test_database_state_machine_snapshot() {
986        let handler = NoOpDatabaseHandler;
987        let sm = DatabaseStateMachine::new(handler);
988
989        // Add some KV data
990        sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
991        sm.apply(&Command::set("key2", b"value2".to_vec()), 2);
992
993        let snapshot = sm.snapshot();
994        assert_eq!(snapshot.last_applied, 2);
995
996        // Restore to new state machine
997        let handler2 = NoOpDatabaseHandler;
998        let sm2 = DatabaseStateMachine::new(handler2);
999        sm2.restore(snapshot);
1000
1001        assert_eq!(sm2.get("key1"), Some(b"value1".to_vec()));
1002        assert_eq!(sm2.get("key2"), Some(b"value2".to_vec()));
1003        assert_eq!(sm2.last_applied(), 2);
1004    }
1005
1006    #[test]
1007    fn test_compare_and_swap() {
1008        let sm = StateMachine::new();
1009
1010        // Set initial value
1011        sm.apply(&Command::set("key1", b"old".to_vec()), 1);
1012
1013        // CAS with correct expected value
1014        let mut cmd = Command {
1015            command_type: CommandType::CompareAndSwap,
1016            key: "key1".to_string(),
1017            value: Some(b"new".to_vec()),
1018            metadata: HashMap::new(),
1019        };
1020        cmd.metadata.insert("expected".to_string(), "old".to_string());
1021
1022        let result = sm.apply(&cmd, 2);
1023        assert!(result.success);
1024        assert_eq!(result.value, Some(b"true".to_vec()));
1025        assert_eq!(sm.get("key1"), Some(b"new".to_vec()));
1026
1027        // CAS with wrong expected value
1028        cmd.metadata.insert("expected".to_string(), "wrong".to_string());
1029        cmd.value = Some(b"newer".to_vec());
1030        let result = sm.apply(&cmd, 3);
1031        assert!(result.success);
1032        assert_eq!(result.value, Some(b"false".to_vec()));
1033        assert_eq!(sm.get("key1"), Some(b"new".to_vec())); // Unchanged
1034    }
1035}