Skip to main content

aegis_replication/
state.rs

1//! Aegis Replication State Machine
2//!
3//! State machine abstraction for replicated state with pluggable backends.
4//! Supports both key-value operations and full database operations (SQL, documents, graph).
5//!
6//! @version 0.1.0
7//! @author AutomataNexus Development Team
8
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12
13// =============================================================================
14// State Machine Backend Trait
15// =============================================================================
16
17/// Trait for pluggable state machine backends.
18/// Implementations can delegate to in-memory storage, disk-based storage,
19/// or any other backend.
20pub trait StateMachineBackend: Send + Sync {
21    /// Apply a command to the state machine at the given log index.
22    fn apply(&self, command: &Command, index: u64) -> CommandResult;
23
24    /// Get a value by key.
25    fn get(&self, key: &str) -> Option<Vec<u8>>;
26
27    /// Get the last applied log index.
28    fn last_applied(&self) -> u64;
29
30    /// Get the current version/sequence number.
31    fn version(&self) -> u64;
32
33    /// Get the number of keys stored.
34    fn len(&self) -> usize;
35
36    /// Check if the state machine is empty.
37    fn is_empty(&self) -> bool {
38        self.len() == 0
39    }
40
41    /// Take a snapshot of the current state.
42    fn snapshot(&self) -> Snapshot;
43
44    /// Restore state from a snapshot.
45    fn restore(&self, snapshot: Snapshot);
46}
47
48// =============================================================================
49// Command
50// =============================================================================
51
52/// A command to be applied to the state machine.
53/// Supports both key-value and database operations.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Command {
56    pub command_type: CommandType,
57    pub key: String,
58    pub value: Option<Vec<u8>>,
59    pub metadata: HashMap<String, String>,
60}
61
62impl Command {
63    /// Create a get command.
64    pub fn get(key: impl Into<String>) -> Self {
65        Self {
66            command_type: CommandType::Get,
67            key: key.into(),
68            value: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create a set command.
74    pub fn set(key: impl Into<String>, value: Vec<u8>) -> Self {
75        Self {
76            command_type: CommandType::Set,
77            key: key.into(),
78            value: Some(value),
79            metadata: HashMap::new(),
80        }
81    }
82
83    /// Create a delete command.
84    pub fn delete(key: impl Into<String>) -> Self {
85        Self {
86            command_type: CommandType::Delete,
87            key: key.into(),
88            value: None,
89            metadata: HashMap::new(),
90        }
91    }
92
93    /// Create a SQL execution command.
94    pub fn sql(statement: impl Into<String>) -> Self {
95        Self {
96            command_type: CommandType::SqlExecute,
97            key: String::new(),
98            value: Some(statement.into().into_bytes()),
99            metadata: HashMap::new(),
100        }
101    }
102
103    /// Create a document insert command.
104    pub fn document_insert(collection: impl Into<String>, document: Vec<u8>) -> Self {
105        Self {
106            command_type: CommandType::DocumentInsert,
107            key: collection.into(),
108            value: Some(document),
109            metadata: HashMap::new(),
110        }
111    }
112
113    /// Create a document update command.
114    pub fn document_update(
115        collection: impl Into<String>,
116        doc_id: impl Into<String>,
117        document: Vec<u8>,
118    ) -> Self {
119        let mut metadata = HashMap::new();
120        metadata.insert("doc_id".to_string(), doc_id.into());
121        Self {
122            command_type: CommandType::DocumentUpdate,
123            key: collection.into(),
124            value: Some(document),
125            metadata,
126        }
127    }
128
129    /// Create a document delete command.
130    pub fn document_delete(collection: impl Into<String>, doc_id: impl Into<String>) -> Self {
131        let mut metadata = HashMap::new();
132        metadata.insert("doc_id".to_string(), doc_id.into());
133        Self {
134            command_type: CommandType::DocumentDelete,
135            key: collection.into(),
136            value: None,
137            metadata,
138        }
139    }
140
141    /// Create a graph node create command.
142    pub fn graph_create_node(label: impl Into<String>, properties: Vec<u8>) -> Self {
143        Self {
144            command_type: CommandType::GraphCreateNode,
145            key: label.into(),
146            value: Some(properties),
147            metadata: HashMap::new(),
148        }
149    }
150
151    /// Create a graph edge create command.
152    pub fn graph_create_edge(
153        edge_type: impl Into<String>,
154        from_node: impl Into<String>,
155        to_node: impl Into<String>,
156        properties: Vec<u8>,
157    ) -> Self {
158        let mut metadata = HashMap::new();
159        metadata.insert("from".to_string(), from_node.into());
160        metadata.insert("to".to_string(), to_node.into());
161        Self {
162            command_type: CommandType::GraphCreateEdge,
163            key: edge_type.into(),
164            value: Some(properties),
165            metadata,
166        }
167    }
168
169    /// Create a transaction begin command.
170    pub fn tx_begin(tx_id: u64) -> Self {
171        let mut metadata = HashMap::new();
172        metadata.insert("tx_id".to_string(), tx_id.to_string());
173        Self {
174            command_type: CommandType::TransactionBegin,
175            key: String::new(),
176            value: None,
177            metadata,
178        }
179    }
180
181    /// Create a transaction commit command.
182    pub fn tx_commit(tx_id: u64) -> Self {
183        let mut metadata = HashMap::new();
184        metadata.insert("tx_id".to_string(), tx_id.to_string());
185        Self {
186            command_type: CommandType::TransactionCommit,
187            key: String::new(),
188            value: None,
189            metadata,
190        }
191    }
192
193    /// Create a transaction rollback command.
194    pub fn tx_rollback(tx_id: u64) -> Self {
195        let mut metadata = HashMap::new();
196        metadata.insert("tx_id".to_string(), tx_id.to_string());
197        Self {
198            command_type: CommandType::TransactionRollback,
199            key: String::new(),
200            value: None,
201            metadata,
202        }
203    }
204
205    /// Serialize the command.
206    pub fn to_bytes(&self) -> Vec<u8> {
207        serde_json::to_vec(self).unwrap_or_default()
208    }
209
210    /// Deserialize a command.
211    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
212        serde_json::from_slice(bytes).ok()
213    }
214
215    /// Check if this command is a write operation (modifies state).
216    pub fn is_write(&self) -> bool {
217        !matches!(self.command_type, CommandType::Get)
218    }
219
220    /// Get the SQL statement if this is a SQL command.
221    pub fn sql_statement(&self) -> Option<&str> {
222        if self.command_type == CommandType::SqlExecute {
223            self.value
224                .as_ref()
225                .and_then(|v| std::str::from_utf8(v).ok())
226        } else {
227            None
228        }
229    }
230
231    /// Get the transaction ID if this is a transaction command.
232    pub fn transaction_id(&self) -> Option<u64> {
233        self.metadata.get("tx_id").and_then(|s| s.parse().ok())
234    }
235}
236
237// =============================================================================
238// Command Type
239// =============================================================================
240
/// Type of command - supports key-value, SQL, document, graph, and transaction operations.
///
/// The value is `Copy` and comparable, and is (de)serialized together with the
/// `Command` that carries it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CommandType {
    // Key-value operations
    /// Read the value stored under the command's key.
    Get,
    /// Store the command's payload under its key.
    Set,
    /// Remove the command's key.
    Delete,
    /// Conditionally replace a value; the in-memory backend reads the expected
    /// value from `metadata["expected"]`.
    CompareAndSwap,
    /// Add one to the value under the key; the in-memory backend parses the
    /// stored bytes as a decimal `i64`.
    Increment,

    // SQL operations
    /// Execute the SQL statement carried in the payload.
    SqlExecute,

    // Document operations
    /// Insert the payload document into the collection named by the key.
    DocumentInsert,
    /// Replace the document identified by `metadata["doc_id"]`.
    DocumentUpdate,
    /// Delete the document identified by `metadata["doc_id"]`.
    DocumentDelete,

    // Graph operations
    /// Create a node labelled by the key with the payload as properties.
    GraphCreateNode,
    /// Delete a node; the database backend passes the key as the node id.
    GraphDeleteNode,
    /// Create an edge between `metadata["from"]` and `metadata["to"]`.
    GraphCreateEdge,
    /// Delete an edge; the database backend passes the key as the edge id.
    GraphDeleteEdge,

    // Transaction control
    /// Begin the transaction named by `metadata["tx_id"]`.
    TransactionBegin,
    /// Commit the transaction named by `metadata["tx_id"]`.
    TransactionCommit,
    /// Roll back the transaction named by `metadata["tx_id"]`.
    TransactionRollback,

    // Generic custom operation
    /// Application-defined operation; both backends in this file answer it
    /// with an error result.
    Custom,
}
273
274// =============================================================================
275// Command Result
276// =============================================================================
277
278/// Result of applying a command.
279#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct CommandResult {
281    pub success: bool,
282    pub value: Option<Vec<u8>>,
283    pub error: Option<String>,
284    pub applied_index: u64,
285}
286
287impl CommandResult {
288    pub fn success(value: Option<Vec<u8>>, applied_index: u64) -> Self {
289        Self {
290            success: true,
291            value,
292            error: None,
293            applied_index,
294        }
295    }
296
297    pub fn error(message: impl Into<String>, applied_index: u64) -> Self {
298        Self {
299            success: false,
300            value: None,
301            error: Some(message.into()),
302            applied_index,
303        }
304    }
305}
306
307// =============================================================================
308// State Machine (In-Memory Implementation)
309// =============================================================================
310
311/// The replicated state machine with in-memory storage.
312/// For production use, consider using DatabaseStateMachine which persists to storage.
313pub struct StateMachine {
314    data: RwLock<HashMap<String, Vec<u8>>>,
315    last_applied: RwLock<u64>,
316    version: RwLock<u64>,
317}
318
319impl StateMachine {
320    /// Create a new in-memory state machine.
321    pub fn new() -> Self {
322        Self {
323            data: RwLock::new(HashMap::new()),
324            last_applied: RwLock::new(0),
325            version: RwLock::new(0),
326        }
327    }
328
329    fn apply_kv_command(
330        &self,
331        command: &Command,
332        index: u64,
333        data: &mut HashMap<String, Vec<u8>>,
334        version: &mut u64,
335    ) -> CommandResult {
336        match command.command_type {
337            CommandType::Get => {
338                let value = data.get(&command.key).cloned();
339                CommandResult::success(value, index)
340            }
341            CommandType::Set => {
342                if let Some(ref value) = command.value {
343                    data.insert(command.key.clone(), value.clone());
344                    *version += 1;
345                    CommandResult::success(None, index)
346                } else {
347                    CommandResult::error("No value provided", index)
348                }
349            }
350            CommandType::Delete => {
351                let old = data.remove(&command.key);
352                *version += 1;
353                CommandResult::success(old, index)
354            }
355            CommandType::CompareAndSwap => {
356                // Expected value is in metadata["expected"]
357                let expected = command
358                    .metadata
359                    .get("expected")
360                    .map(|s| s.as_bytes().to_vec());
361                let current = data.get(&command.key).cloned();
362
363                if current == expected {
364                    if let Some(ref new_value) = command.value {
365                        data.insert(command.key.clone(), new_value.clone());
366                        *version += 1;
367                        CommandResult::success(Some(b"true".to_vec()), index)
368                    } else {
369                        CommandResult::error("No new value provided", index)
370                    }
371                } else {
372                    CommandResult::success(Some(b"false".to_vec()), index)
373                }
374            }
375            CommandType::Increment => {
376                let current = data
377                    .get(&command.key)
378                    .and_then(|v| String::from_utf8(v.clone()).ok())
379                    .and_then(|s| s.parse::<i64>().ok())
380                    .unwrap_or(0);
381
382                let new_value = (current + 1).to_string().into_bytes();
383                data.insert(command.key.clone(), new_value.clone());
384                *version += 1;
385                CommandResult::success(Some(new_value), index)
386            }
387            _ => CommandResult::error(
388                format!(
389                    "Command type {:?} not supported by in-memory state machine",
390                    command.command_type
391                ),
392                index,
393            ),
394        }
395    }
396}
397
398impl StateMachineBackend for StateMachine {
399    fn apply(&self, command: &Command, index: u64) -> CommandResult {
400        let mut data = self.data.write().expect("state machine data lock poisoned");
401        let mut last_applied = self
402            .last_applied
403            .write()
404            .expect("state machine last_applied lock poisoned");
405        let mut version = self
406            .version
407            .write()
408            .expect("state machine version lock poisoned");
409
410        if index <= *last_applied {
411            return CommandResult::error("Already applied", *last_applied);
412        }
413
414        let result = self.apply_kv_command(command, index, &mut data, &mut version);
415        *last_applied = index;
416        result
417    }
418
419    fn get(&self, key: &str) -> Option<Vec<u8>> {
420        let data = self.data.read().expect("state machine data lock poisoned");
421        data.get(key).cloned()
422    }
423
424    fn last_applied(&self) -> u64 {
425        *self
426            .last_applied
427            .read()
428            .expect("state machine last_applied lock poisoned")
429    }
430
431    fn version(&self) -> u64 {
432        *self
433            .version
434            .read()
435            .expect("state machine version lock poisoned")
436    }
437
438    fn len(&self) -> usize {
439        let data = self.data.read().expect("state machine data lock poisoned");
440        data.len()
441    }
442
443    fn snapshot(&self) -> Snapshot {
444        let data = self.data.read().expect("state machine data lock poisoned");
445        let last_applied = *self
446            .last_applied
447            .read()
448            .expect("state machine last_applied lock poisoned");
449        let version = *self
450            .version
451            .read()
452            .expect("state machine version lock poisoned");
453
454        Snapshot {
455            data: data.clone(),
456            last_applied,
457            version,
458        }
459    }
460
461    fn restore(&self, snapshot: Snapshot) {
462        let mut data = self.data.write().expect("state machine data lock poisoned");
463        let mut last_applied = self
464            .last_applied
465            .write()
466            .expect("state machine last_applied lock poisoned");
467        let mut version = self
468            .version
469            .write()
470            .expect("state machine version lock poisoned");
471
472        *data = snapshot.data;
473        *last_applied = snapshot.last_applied;
474        *version = snapshot.version;
475    }
476}
477
478impl Default for StateMachine {
479    fn default() -> Self {
480        Self::new()
481    }
482}
483
484// =============================================================================
485// Database State Machine (Storage-backed)
486// =============================================================================
487
/// A state machine that delegates operations to the actual database storage.
/// This allows Raft to replicate real database operations.
///
/// SQL, document, graph and transaction commands are forwarded to `handler`;
/// `Get`/`Set`/`Delete` are served from `kv_cache` without calling the handler.
pub struct DatabaseStateMachine<F: DatabaseOperationHandler> {
    /// Storage-layer implementation that executes the non key-value commands
    /// and produces / consumes database snapshots.
    handler: F,
    /// Log index of the most recent entry that went through `apply`.
    last_applied: RwLock<u64>,
    /// Counter incremented after each successfully applied command.
    version: RwLock<u64>,
    /// Cached key-value data for snapshot/restore and simple KV operations
    kv_cache: RwLock<HashMap<String, Vec<u8>>>,
}
497
/// Trait for handling database operations.
/// Implement this to connect Raft replication to your storage layer.
///
/// Every method reports failure as a `String`; `DatabaseStateMachine` copies
/// that message into the error field of the resulting `CommandResult`.
pub trait DatabaseOperationHandler: Send + Sync {
    /// Execute a SQL statement.
    ///
    /// The returned bytes are handed back to the caller as the command's
    /// result value; their format is up to the implementation.
    fn execute_sql(&self, sql: &str) -> Result<Vec<u8>, String>;

    /// Insert a document into a collection.
    ///
    /// The returned string is surfaced as the command's result value
    /// (presumably the new document's id — confirm with implementations).
    fn insert_document(&self, collection: &str, document: &[u8]) -> Result<String, String>;

    /// Update a document in a collection.
    fn update_document(
        &self,
        collection: &str,
        doc_id: &str,
        document: &[u8],
    ) -> Result<(), String>;

    /// Delete a document from a collection.
    fn delete_document(&self, collection: &str, doc_id: &str) -> Result<(), String>;

    /// Create a graph node.
    ///
    /// The returned string is surfaced as the command's result value
    /// (presumably the new node's id — confirm with implementations).
    fn create_node(&self, label: &str, properties: &[u8]) -> Result<String, String>;

    /// Delete a graph node.
    ///
    /// `DatabaseStateMachine` passes the command's `key` as `node_id`.
    fn delete_node(&self, node_id: &str) -> Result<(), String>;

    /// Create a graph edge.
    ///
    /// The returned string is surfaced as the command's result value
    /// (presumably the new edge's id — confirm with implementations).
    fn create_edge(
        &self,
        edge_type: &str,
        from_node: &str,
        to_node: &str,
        properties: &[u8],
    ) -> Result<String, String>;

    /// Delete a graph edge.
    ///
    /// `DatabaseStateMachine` passes the command's `key` as `edge_id`.
    fn delete_edge(&self, edge_id: &str) -> Result<(), String>;

    /// Begin a transaction.
    fn begin_transaction(&self, tx_id: u64) -> Result<(), String>;

    /// Commit a transaction.
    fn commit_transaction(&self, tx_id: u64) -> Result<(), String>;

    /// Rollback a transaction.
    fn rollback_transaction(&self, tx_id: u64) -> Result<(), String>;

    /// Create a full snapshot of the database state.
    ///
    /// `DatabaseStateMachine` stores the bytes untouched under the
    /// `__db_snapshot__` key of the `Snapshot` it produces.
    fn create_snapshot(&self) -> Result<Vec<u8>, String>;

    /// Restore from a snapshot.
    ///
    /// Receives the bytes found under the `__db_snapshot__` key of the
    /// `Snapshot` being restored.
    fn restore_snapshot(&self, data: &[u8]) -> Result<(), String>;
}
551
552impl<F: DatabaseOperationHandler> DatabaseStateMachine<F> {
553    /// Create a new database state machine with the given operation handler.
554    pub fn new(handler: F) -> Self {
555        Self {
556            handler,
557            last_applied: RwLock::new(0),
558            version: RwLock::new(0),
559            kv_cache: RwLock::new(HashMap::new()),
560        }
561    }
562
563    fn apply_database_command(&self, command: &Command, index: u64) -> CommandResult {
564        match command.command_type {
565            // Key-value operations use the cache
566            CommandType::Get => {
567                let cache = self
568                    .kv_cache
569                    .read()
570                    .expect("database state machine kv_cache lock poisoned");
571                let value = cache.get(&command.key).cloned();
572                CommandResult::success(value, index)
573            }
574            CommandType::Set => {
575                if let Some(ref value) = command.value {
576                    let mut cache = self
577                        .kv_cache
578                        .write()
579                        .expect("database state machine kv_cache lock poisoned");
580                    cache.insert(command.key.clone(), value.clone());
581                    CommandResult::success(None, index)
582                } else {
583                    CommandResult::error("No value provided", index)
584                }
585            }
586            CommandType::Delete => {
587                let mut cache = self
588                    .kv_cache
589                    .write()
590                    .expect("database state machine kv_cache lock poisoned");
591                let old = cache.remove(&command.key);
592                CommandResult::success(old, index)
593            }
594
595            // SQL operations
596            CommandType::SqlExecute => {
597                if let Some(sql) = command.sql_statement() {
598                    match self.handler.execute_sql(sql) {
599                        Ok(result) => CommandResult::success(Some(result), index),
600                        Err(e) => CommandResult::error(e, index),
601                    }
602                } else {
603                    CommandResult::error("No SQL statement provided", index)
604                }
605            }
606
607            // Document operations
608            CommandType::DocumentInsert => {
609                if let Some(ref doc) = command.value {
610                    match self.handler.insert_document(&command.key, doc) {
611                        Ok(doc_id) => CommandResult::success(Some(doc_id.into_bytes()), index),
612                        Err(e) => CommandResult::error(e, index),
613                    }
614                } else {
615                    CommandResult::error("No document provided", index)
616                }
617            }
618            CommandType::DocumentUpdate => {
619                if let (Some(doc_id), Some(ref doc)) =
620                    (command.metadata.get("doc_id"), &command.value)
621                {
622                    match self.handler.update_document(&command.key, doc_id, doc) {
623                        Ok(()) => CommandResult::success(None, index),
624                        Err(e) => CommandResult::error(e, index),
625                    }
626                } else {
627                    CommandResult::error("Missing doc_id or document", index)
628                }
629            }
630            CommandType::DocumentDelete => {
631                if let Some(doc_id) = command.metadata.get("doc_id") {
632                    match self.handler.delete_document(&command.key, doc_id) {
633                        Ok(()) => CommandResult::success(None, index),
634                        Err(e) => CommandResult::error(e, index),
635                    }
636                } else {
637                    CommandResult::error("Missing doc_id", index)
638                }
639            }
640
641            // Graph operations
642            CommandType::GraphCreateNode => {
643                if let Some(ref props) = command.value {
644                    match self.handler.create_node(&command.key, props) {
645                        Ok(node_id) => CommandResult::success(Some(node_id.into_bytes()), index),
646                        Err(e) => CommandResult::error(e, index),
647                    }
648                } else {
649                    CommandResult::error("No properties provided", index)
650                }
651            }
652            CommandType::GraphDeleteNode => match self.handler.delete_node(&command.key) {
653                Ok(()) => CommandResult::success(None, index),
654                Err(e) => CommandResult::error(e, index),
655            },
656            CommandType::GraphCreateEdge => {
657                if let (Some(from), Some(to), Some(ref props)) = (
658                    command.metadata.get("from"),
659                    command.metadata.get("to"),
660                    &command.value,
661                ) {
662                    match self.handler.create_edge(&command.key, from, to, props) {
663                        Ok(edge_id) => CommandResult::success(Some(edge_id.into_bytes()), index),
664                        Err(e) => CommandResult::error(e, index),
665                    }
666                } else {
667                    CommandResult::error("Missing from, to, or properties", index)
668                }
669            }
670            CommandType::GraphDeleteEdge => match self.handler.delete_edge(&command.key) {
671                Ok(()) => CommandResult::success(None, index),
672                Err(e) => CommandResult::error(e, index),
673            },
674
675            // Transaction operations
676            CommandType::TransactionBegin => {
677                if let Some(tx_id) = command.transaction_id() {
678                    match self.handler.begin_transaction(tx_id) {
679                        Ok(()) => CommandResult::success(None, index),
680                        Err(e) => CommandResult::error(e, index),
681                    }
682                } else {
683                    CommandResult::error("Missing transaction ID", index)
684                }
685            }
686            CommandType::TransactionCommit => {
687                if let Some(tx_id) = command.transaction_id() {
688                    match self.handler.commit_transaction(tx_id) {
689                        Ok(()) => CommandResult::success(None, index),
690                        Err(e) => CommandResult::error(e, index),
691                    }
692                } else {
693                    CommandResult::error("Missing transaction ID", index)
694                }
695            }
696            CommandType::TransactionRollback => {
697                if let Some(tx_id) = command.transaction_id() {
698                    match self.handler.rollback_transaction(tx_id) {
699                        Ok(()) => CommandResult::success(None, index),
700                        Err(e) => CommandResult::error(e, index),
701                    }
702                } else {
703                    CommandResult::error("Missing transaction ID", index)
704                }
705            }
706
707            // Other commands
708            CommandType::CompareAndSwap | CommandType::Increment => {
709                CommandResult::error("Use key-value state machine for these operations", index)
710            }
711            CommandType::Custom => CommandResult::error("Custom commands not handled", index),
712        }
713    }
714}
715
716impl<F: DatabaseOperationHandler> StateMachineBackend for DatabaseStateMachine<F> {
717    fn apply(&self, command: &Command, index: u64) -> CommandResult {
718        let mut last_applied = self
719            .last_applied
720            .write()
721            .expect("database state machine last_applied lock poisoned");
722        let mut version = self
723            .version
724            .write()
725            .expect("database state machine version lock poisoned");
726
727        if index <= *last_applied {
728            return CommandResult::error("Already applied", *last_applied);
729        }
730
731        let result = self.apply_database_command(command, index);
732
733        if result.success {
734            *version += 1;
735        }
736        *last_applied = index;
737
738        result
739    }
740
741    fn get(&self, key: &str) -> Option<Vec<u8>> {
742        let cache = self
743            .kv_cache
744            .read()
745            .expect("database state machine kv_cache lock poisoned");
746        cache.get(key).cloned()
747    }
748
749    fn last_applied(&self) -> u64 {
750        *self
751            .last_applied
752            .read()
753            .expect("database state machine last_applied lock poisoned")
754    }
755
756    fn version(&self) -> u64 {
757        *self
758            .version
759            .read()
760            .expect("database state machine version lock poisoned")
761    }
762
763    fn len(&self) -> usize {
764        let cache = self
765            .kv_cache
766            .read()
767            .expect("database state machine kv_cache lock poisoned");
768        cache.len()
769    }
770
771    fn snapshot(&self) -> Snapshot {
772        // For database state machine, snapshot includes both KV cache and database state
773        let cache = self
774            .kv_cache
775            .read()
776            .expect("database state machine kv_cache lock poisoned");
777        let last_applied = *self
778            .last_applied
779            .read()
780            .expect("database state machine last_applied lock poisoned");
781        let version = *self
782            .version
783            .read()
784            .expect("database state machine version lock poisoned");
785
786        // Try to include database snapshot in the data
787        let mut data = cache.clone();
788        if let Ok(db_snapshot) = self.handler.create_snapshot() {
789            data.insert("__db_snapshot__".to_string(), db_snapshot);
790        }
791
792        Snapshot {
793            data,
794            last_applied,
795            version,
796        }
797    }
798
799    fn restore(&self, snapshot: Snapshot) {
800        let mut cache = self
801            .kv_cache
802            .write()
803            .expect("database state machine kv_cache lock poisoned");
804        let mut last_applied = self
805            .last_applied
806            .write()
807            .expect("database state machine last_applied lock poisoned");
808        let mut version = self
809            .version
810            .write()
811            .expect("database state machine version lock poisoned");
812
813        // Restore database state if present
814        if let Some(db_snapshot) = snapshot.data.get("__db_snapshot__") {
815            let _ = self.handler.restore_snapshot(db_snapshot);
816        }
817
818        // Restore KV cache (excluding the special key)
819        *cache = snapshot
820            .data
821            .into_iter()
822            .filter(|(k, _)| k != "__db_snapshot__")
823            .collect();
824        *last_applied = snapshot.last_applied;
825        *version = snapshot.version;
826    }
827}
828
829// =============================================================================
830// No-op Database Handler (for testing)
831// =============================================================================
832
833/// A no-op handler that logs operations but doesn't persist anything.
834/// Useful for testing Raft replication without a real database.
835#[derive(Default)]
836pub struct NoOpDatabaseHandler;
837
838impl DatabaseOperationHandler for NoOpDatabaseHandler {
839    fn execute_sql(&self, _sql: &str) -> Result<Vec<u8>, String> {
840        Ok(b"OK".to_vec())
841    }
842
843    fn insert_document(&self, _collection: &str, _document: &[u8]) -> Result<String, String> {
844        Ok("doc-001".to_string())
845    }
846
847    fn update_document(
848        &self,
849        _collection: &str,
850        _doc_id: &str,
851        _document: &[u8],
852    ) -> Result<(), String> {
853        Ok(())
854    }
855
856    fn delete_document(&self, _collection: &str, _doc_id: &str) -> Result<(), String> {
857        Ok(())
858    }
859
860    fn create_node(&self, _label: &str, _properties: &[u8]) -> Result<String, String> {
861        Ok("node-001".to_string())
862    }
863
864    fn delete_node(&self, _node_id: &str) -> Result<(), String> {
865        Ok(())
866    }
867
868    fn create_edge(
869        &self,
870        _edge_type: &str,
871        _from_node: &str,
872        _to_node: &str,
873        _properties: &[u8],
874    ) -> Result<String, String> {
875        Ok("edge-001".to_string())
876    }
877
878    fn delete_edge(&self, _edge_id: &str) -> Result<(), String> {
879        Ok(())
880    }
881
882    fn begin_transaction(&self, _tx_id: u64) -> Result<(), String> {
883        Ok(())
884    }
885
886    fn commit_transaction(&self, _tx_id: u64) -> Result<(), String> {
887        Ok(())
888    }
889
890    fn rollback_transaction(&self, _tx_id: u64) -> Result<(), String> {
891        Ok(())
892    }
893
894    fn create_snapshot(&self) -> Result<Vec<u8>, String> {
895        Ok(Vec::new())
896    }
897
898    fn restore_snapshot(&self, _data: &[u8]) -> Result<(), String> {
899        Ok(())
900    }
901}
902
903// =============================================================================
904// Snapshot
905// =============================================================================
906
/// A snapshot of the state machine.
///
/// Bundles the key-value contents together with the bookkeeping counters so
/// that a state machine can be rebuilt from it via `restore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// Key-value entries captured from the state machine.
    pub data: HashMap<String, Vec<u8>>,
    /// Last applied log index at the time the snapshot was taken.
    pub last_applied: u64,
    /// Version/sequence number of the state machine at snapshot time.
    pub version: u64,
}
914
915impl Snapshot {
916    /// Serialize the snapshot.
917    pub fn to_bytes(&self) -> Vec<u8> {
918        serde_json::to_vec(self).unwrap_or_default()
919    }
920
921    /// Deserialize a snapshot.
922    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
923        serde_json::from_slice(bytes).ok()
924    }
925}
926
927// =============================================================================
928// Tests
929// =============================================================================
930
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Set` command keeps its type and key, and the key survives a byte round-trip.
    #[test]
    fn test_command() {
        let original = Command::set("key1", b"value1".to_vec());
        assert_eq!(original.command_type, CommandType::Set);
        assert_eq!(original.key, "key1");

        let decoded = Command::from_bytes(&original.to_bytes()).unwrap();
        assert_eq!(decoded.key, "key1");
    }

    /// Applying a `Set` stores the value and advances `last_applied`.
    #[test]
    fn test_state_machine_set_get() {
        let machine = StateMachine::new();

        let outcome = machine.apply(&Command::set("key1", b"value1".to_vec()), 1);
        assert!(outcome.success);
        assert_eq!(machine.last_applied(), 1);

        assert_eq!(machine.get("key1"), Some(b"value1".to_vec()));
    }

    /// A `Delete` removes a previously stored key.
    #[test]
    fn test_state_machine_delete() {
        let machine = StateMachine::new();

        machine.apply(&Command::set("key1", b"value1".to_vec()), 1);
        assert_ne!(machine.get("key1"), None);

        machine.apply(&Command::delete("key1"), 2);
        assert_eq!(machine.get("key1"), None);
    }

    /// `Increment` on a counter holding "0" leaves it holding "1".
    #[test]
    fn test_state_machine_increment() {
        let machine = StateMachine::new();

        machine.apply(&Command::set("counter", b"0".to_vec()), 1);

        let increment = Command {
            command_type: CommandType::Increment,
            key: String::from("counter"),
            value: None,
            metadata: HashMap::new(),
        };
        machine.apply(&increment, 2);

        let raw = machine.get("counter").unwrap();
        let text = String::from_utf8(raw).unwrap();
        assert_eq!(text, "1");
    }

    /// A snapshot taken from one state machine can be restored into a fresh one.
    #[test]
    fn test_snapshot() {
        let source = StateMachine::new();

        // Apply the entries at log indices 1 and 2, in order.
        let entries: [(&str, &[u8]); 2] = [("key1", b"value1"), ("key2", b"value2")];
        for (index, (key, value)) in (1u64..).zip(entries) {
            source.apply(&Command::set(key, value.to_vec()), index);
        }

        let snapshot = source.snapshot();
        assert_eq!(snapshot.last_applied, 2);
        assert_eq!(snapshot.data.len(), 2);

        let replica = StateMachine::new();
        replica.restore(snapshot);

        assert_eq!(replica.get("key1"), Some(b"value1".to_vec()));
        assert_eq!(replica.get("key2"), Some(b"value2".to_vec()));
        assert_eq!(replica.last_applied(), 2);
    }

    /// Re-applying at an already-applied index is rejected and leaves the value untouched.
    #[test]
    fn test_duplicate_apply() {
        let machine = StateMachine::new();

        let first = machine.apply(&Command::set("key1", b"v1".to_vec()), 1);
        assert!(first.success);

        let replayed = machine.apply(&Command::set("key1", b"v2".to_vec()), 1);
        assert!(!replayed.success);

        assert_eq!(machine.get("key1"), Some(b"v1".to_vec()));
    }

    /// `Command::sql` produces a write command carrying the statement text.
    #[test]
    fn test_sql_command() {
        let statement = "INSERT INTO users (name) VALUES ('test')";

        let cmd = Command::sql(statement);
        assert_eq!(cmd.command_type, CommandType::SqlExecute);
        assert_eq!(cmd.sql_statement(), Some(statement));
        assert!(cmd.is_write());
    }

    /// Document constructors set the command type, collection key and `doc_id` metadata.
    #[test]
    fn test_document_commands() {
        let insert = Command::document_insert("users", b"{\"name\": \"test\"}".to_vec());
        assert_eq!(insert.command_type, CommandType::DocumentInsert);
        assert_eq!(insert.key, "users");

        let update =
            Command::document_update("users", "doc123", b"{\"name\": \"updated\"}".to_vec());
        assert_eq!(update.command_type, CommandType::DocumentUpdate);
        assert_eq!(
            update.metadata.get("doc_id").map(String::as_str),
            Some("doc123")
        );

        let delete = Command::document_delete("users", "doc123");
        assert_eq!(delete.command_type, CommandType::DocumentDelete);
    }

    /// Graph constructors set the command type, label key and edge endpoint metadata.
    #[test]
    fn test_graph_commands() {
        let node = Command::graph_create_node("Person", b"{\"name\": \"Alice\"}".to_vec());
        assert_eq!(node.command_type, CommandType::GraphCreateNode);
        assert_eq!(node.key, "Person");

        let edge = Command::graph_create_edge("KNOWS", "node1", "node2", b"{}".to_vec());
        assert_eq!(edge.command_type, CommandType::GraphCreateEdge);
        assert_eq!(edge.metadata.get("from").map(String::as_str), Some("node1"));
        assert_eq!(edge.metadata.get("to").map(String::as_str), Some("node2"));
    }

    /// Transaction constructors set the matching command type and expose the id.
    #[test]
    fn test_transaction_commands() {
        let tx_id = 123;

        let begin = Command::tx_begin(tx_id);
        assert_eq!(begin.command_type, CommandType::TransactionBegin);
        assert_eq!(begin.transaction_id(), Some(tx_id));

        let commit = Command::tx_commit(tx_id);
        assert_eq!(commit.command_type, CommandType::TransactionCommit);

        let rollback = Command::tx_rollback(tx_id);
        assert_eq!(rollback.command_type, CommandType::TransactionRollback);
    }

    /// The database state machine forwards SQL, document and transaction commands
    /// to its handler and tracks the last applied index.
    #[test]
    fn test_database_state_machine() {
        let machine = DatabaseStateMachine::new(NoOpDatabaseHandler);

        // SQL command
        let sql_result = machine.apply(&Command::sql("INSERT INTO test VALUES (1)"), 1);
        assert!(sql_result.success);
        assert_eq!(sql_result.value.as_deref(), Some(&b"OK"[..]));

        // Document insert
        let doc_result = machine.apply(&Command::document_insert("users", b"{}".to_vec()), 2);
        assert!(doc_result.success);
        assert_eq!(doc_result.value.as_deref(), Some(&b"doc-001"[..]));

        // Transaction: begin, then commit
        let begin_result = machine.apply(&Command::tx_begin(1), 3);
        assert!(begin_result.success);

        let commit_result = machine.apply(&Command::tx_commit(1), 4);
        assert!(commit_result.success);

        assert_eq!(machine.last_applied(), 4);
    }

    /// Key-value data in a database state machine survives snapshot and restore.
    #[test]
    fn test_database_state_machine_snapshot() {
        let source = DatabaseStateMachine::new(NoOpDatabaseHandler);

        // Add some KV data
        source.apply(&Command::set("key1", b"value1".to_vec()), 1);
        source.apply(&Command::set("key2", b"value2".to_vec()), 2);

        let snapshot = source.snapshot();
        assert_eq!(snapshot.last_applied, 2);

        // Restore to new state machine
        let replica = DatabaseStateMachine::new(NoOpDatabaseHandler);
        replica.restore(snapshot);

        assert_eq!(replica.get("key1").as_deref(), Some(&b"value1"[..]));
        assert_eq!(replica.get("key2").as_deref(), Some(&b"value2"[..]));
        assert_eq!(replica.last_applied(), 2);
    }

    /// Compare-and-swap replaces the value only when the expected value matches.
    #[test]
    fn test_compare_and_swap() {
        let machine = StateMachine::new();

        // Builds a CAS command on "key1" with the given expectation and replacement.
        let cas = |expected: &str, replacement: &[u8]| Command {
            command_type: CommandType::CompareAndSwap,
            key: "key1".to_string(),
            value: Some(replacement.to_vec()),
            metadata: std::iter::once(("expected".to_string(), expected.to_string())).collect(),
        };

        // Set initial value
        machine.apply(&Command::set("key1", b"old".to_vec()), 1);

        // CAS with correct expected value
        let swapped = machine.apply(&cas("old", b"new"), 2);
        assert!(swapped.success);
        assert_eq!(swapped.value, Some(b"true".to_vec()));
        assert_eq!(machine.get("key1"), Some(b"new".to_vec()));

        // CAS with wrong expected value
        let refused = machine.apply(&cas("wrong", b"newer"), 3);
        assert!(refused.success);
        assert_eq!(refused.value, Some(b"false".to_vec()));
        assert_eq!(machine.get("key1"), Some(b"new".to_vec())); // Unchanged
    }
}