1use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12
13pub trait StateMachineBackend: Send + Sync {
21 fn apply(&self, command: &Command, index: u64) -> CommandResult;
23
24 fn get(&self, key: &str) -> Option<Vec<u8>>;
26
27 fn last_applied(&self) -> u64;
29
30 fn version(&self) -> u64;
32
33 fn len(&self) -> usize;
35
36 fn is_empty(&self) -> bool {
38 self.len() == 0
39 }
40
41 fn snapshot(&self) -> Snapshot;
43
44 fn restore(&self, snapshot: Snapshot);
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Command {
56 pub command_type: CommandType,
57 pub key: String,
58 pub value: Option<Vec<u8>>,
59 pub metadata: HashMap<String, String>,
60}
61
62impl Command {
63 pub fn get(key: impl Into<String>) -> Self {
65 Self {
66 command_type: CommandType::Get,
67 key: key.into(),
68 value: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn set(key: impl Into<String>, value: Vec<u8>) -> Self {
75 Self {
76 command_type: CommandType::Set,
77 key: key.into(),
78 value: Some(value),
79 metadata: HashMap::new(),
80 }
81 }
82
83 pub fn delete(key: impl Into<String>) -> Self {
85 Self {
86 command_type: CommandType::Delete,
87 key: key.into(),
88 value: None,
89 metadata: HashMap::new(),
90 }
91 }
92
93 pub fn sql(statement: impl Into<String>) -> Self {
95 Self {
96 command_type: CommandType::SqlExecute,
97 key: String::new(),
98 value: Some(statement.into().into_bytes()),
99 metadata: HashMap::new(),
100 }
101 }
102
103 pub fn document_insert(collection: impl Into<String>, document: Vec<u8>) -> Self {
105 Self {
106 command_type: CommandType::DocumentInsert,
107 key: collection.into(),
108 value: Some(document),
109 metadata: HashMap::new(),
110 }
111 }
112
113 pub fn document_update(
115 collection: impl Into<String>,
116 doc_id: impl Into<String>,
117 document: Vec<u8>,
118 ) -> Self {
119 let mut metadata = HashMap::new();
120 metadata.insert("doc_id".to_string(), doc_id.into());
121 Self {
122 command_type: CommandType::DocumentUpdate,
123 key: collection.into(),
124 value: Some(document),
125 metadata,
126 }
127 }
128
129 pub fn document_delete(collection: impl Into<String>, doc_id: impl Into<String>) -> Self {
131 let mut metadata = HashMap::new();
132 metadata.insert("doc_id".to_string(), doc_id.into());
133 Self {
134 command_type: CommandType::DocumentDelete,
135 key: collection.into(),
136 value: None,
137 metadata,
138 }
139 }
140
141 pub fn graph_create_node(label: impl Into<String>, properties: Vec<u8>) -> Self {
143 Self {
144 command_type: CommandType::GraphCreateNode,
145 key: label.into(),
146 value: Some(properties),
147 metadata: HashMap::new(),
148 }
149 }
150
151 pub fn graph_create_edge(
153 edge_type: impl Into<String>,
154 from_node: impl Into<String>,
155 to_node: impl Into<String>,
156 properties: Vec<u8>,
157 ) -> Self {
158 let mut metadata = HashMap::new();
159 metadata.insert("from".to_string(), from_node.into());
160 metadata.insert("to".to_string(), to_node.into());
161 Self {
162 command_type: CommandType::GraphCreateEdge,
163 key: edge_type.into(),
164 value: Some(properties),
165 metadata,
166 }
167 }
168
169 pub fn tx_begin(tx_id: u64) -> Self {
171 let mut metadata = HashMap::new();
172 metadata.insert("tx_id".to_string(), tx_id.to_string());
173 Self {
174 command_type: CommandType::TransactionBegin,
175 key: String::new(),
176 value: None,
177 metadata,
178 }
179 }
180
181 pub fn tx_commit(tx_id: u64) -> Self {
183 let mut metadata = HashMap::new();
184 metadata.insert("tx_id".to_string(), tx_id.to_string());
185 Self {
186 command_type: CommandType::TransactionCommit,
187 key: String::new(),
188 value: None,
189 metadata,
190 }
191 }
192
193 pub fn tx_rollback(tx_id: u64) -> Self {
195 let mut metadata = HashMap::new();
196 metadata.insert("tx_id".to_string(), tx_id.to_string());
197 Self {
198 command_type: CommandType::TransactionRollback,
199 key: String::new(),
200 value: None,
201 metadata,
202 }
203 }
204
205 pub fn to_bytes(&self) -> Vec<u8> {
207 serde_json::to_vec(self).unwrap_or_default()
208 }
209
210 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
212 serde_json::from_slice(bytes).ok()
213 }
214
215 pub fn is_write(&self) -> bool {
217 !matches!(self.command_type, CommandType::Get)
218 }
219
220 pub fn sql_statement(&self) -> Option<&str> {
222 if self.command_type == CommandType::SqlExecute {
223 self.value
224 .as_ref()
225 .and_then(|v| std::str::from_utf8(v).ok())
226 } else {
227 None
228 }
229 }
230
231 pub fn transaction_id(&self) -> Option<u64> {
233 self.metadata.get("tx_id").and_then(|s| s.parse().ok())
234 }
235}
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
243pub enum CommandType {
244 Get,
246 Set,
247 Delete,
248 CompareAndSwap,
249 Increment,
250
251 SqlExecute,
253
254 DocumentInsert,
256 DocumentUpdate,
257 DocumentDelete,
258
259 GraphCreateNode,
261 GraphDeleteNode,
262 GraphCreateEdge,
263 GraphDeleteEdge,
264
265 TransactionBegin,
267 TransactionCommit,
268 TransactionRollback,
269
270 Custom,
272}
273
274#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct CommandResult {
281 pub success: bool,
282 pub value: Option<Vec<u8>>,
283 pub error: Option<String>,
284 pub applied_index: u64,
285}
286
287impl CommandResult {
288 pub fn success(value: Option<Vec<u8>>, applied_index: u64) -> Self {
289 Self {
290 success: true,
291 value,
292 error: None,
293 applied_index,
294 }
295 }
296
297 pub fn error(message: impl Into<String>, applied_index: u64) -> Self {
298 Self {
299 success: false,
300 value: None,
301 error: Some(message.into()),
302 applied_index,
303 }
304 }
305}
306
307pub struct StateMachine {
314 data: RwLock<HashMap<String, Vec<u8>>>,
315 last_applied: RwLock<u64>,
316 version: RwLock<u64>,
317}
318
319impl StateMachine {
320 pub fn new() -> Self {
322 Self {
323 data: RwLock::new(HashMap::new()),
324 last_applied: RwLock::new(0),
325 version: RwLock::new(0),
326 }
327 }
328
329 fn apply_kv_command(
330 &self,
331 command: &Command,
332 index: u64,
333 data: &mut HashMap<String, Vec<u8>>,
334 version: &mut u64,
335 ) -> CommandResult {
336 match command.command_type {
337 CommandType::Get => {
338 let value = data.get(&command.key).cloned();
339 CommandResult::success(value, index)
340 }
341 CommandType::Set => {
342 if let Some(ref value) = command.value {
343 data.insert(command.key.clone(), value.clone());
344 *version += 1;
345 CommandResult::success(None, index)
346 } else {
347 CommandResult::error("No value provided", index)
348 }
349 }
350 CommandType::Delete => {
351 let old = data.remove(&command.key);
352 *version += 1;
353 CommandResult::success(old, index)
354 }
355 CommandType::CompareAndSwap => {
356 let expected = command
358 .metadata
359 .get("expected")
360 .map(|s| s.as_bytes().to_vec());
361 let current = data.get(&command.key).cloned();
362
363 if current == expected {
364 if let Some(ref new_value) = command.value {
365 data.insert(command.key.clone(), new_value.clone());
366 *version += 1;
367 CommandResult::success(Some(b"true".to_vec()), index)
368 } else {
369 CommandResult::error("No new value provided", index)
370 }
371 } else {
372 CommandResult::success(Some(b"false".to_vec()), index)
373 }
374 }
375 CommandType::Increment => {
376 let current = data
377 .get(&command.key)
378 .and_then(|v| String::from_utf8(v.clone()).ok())
379 .and_then(|s| s.parse::<i64>().ok())
380 .unwrap_or(0);
381
382 let new_value = (current + 1).to_string().into_bytes();
383 data.insert(command.key.clone(), new_value.clone());
384 *version += 1;
385 CommandResult::success(Some(new_value), index)
386 }
387 _ => CommandResult::error(
388 format!(
389 "Command type {:?} not supported by in-memory state machine",
390 command.command_type
391 ),
392 index,
393 ),
394 }
395 }
396}
397
398impl StateMachineBackend for StateMachine {
399 fn apply(&self, command: &Command, index: u64) -> CommandResult {
400 let mut data = self.data.write().expect("state machine data lock poisoned");
401 let mut last_applied = self
402 .last_applied
403 .write()
404 .expect("state machine last_applied lock poisoned");
405 let mut version = self
406 .version
407 .write()
408 .expect("state machine version lock poisoned");
409
410 if index <= *last_applied {
411 return CommandResult::error("Already applied", *last_applied);
412 }
413
414 let result = self.apply_kv_command(command, index, &mut data, &mut version);
415 *last_applied = index;
416 result
417 }
418
419 fn get(&self, key: &str) -> Option<Vec<u8>> {
420 let data = self.data.read().expect("state machine data lock poisoned");
421 data.get(key).cloned()
422 }
423
424 fn last_applied(&self) -> u64 {
425 *self
426 .last_applied
427 .read()
428 .expect("state machine last_applied lock poisoned")
429 }
430
431 fn version(&self) -> u64 {
432 *self
433 .version
434 .read()
435 .expect("state machine version lock poisoned")
436 }
437
438 fn len(&self) -> usize {
439 let data = self.data.read().expect("state machine data lock poisoned");
440 data.len()
441 }
442
443 fn snapshot(&self) -> Snapshot {
444 let data = self.data.read().expect("state machine data lock poisoned");
445 let last_applied = *self
446 .last_applied
447 .read()
448 .expect("state machine last_applied lock poisoned");
449 let version = *self
450 .version
451 .read()
452 .expect("state machine version lock poisoned");
453
454 Snapshot {
455 data: data.clone(),
456 last_applied,
457 version,
458 }
459 }
460
461 fn restore(&self, snapshot: Snapshot) {
462 let mut data = self.data.write().expect("state machine data lock poisoned");
463 let mut last_applied = self
464 .last_applied
465 .write()
466 .expect("state machine last_applied lock poisoned");
467 let mut version = self
468 .version
469 .write()
470 .expect("state machine version lock poisoned");
471
472 *data = snapshot.data;
473 *last_applied = snapshot.last_applied;
474 *version = snapshot.version;
475 }
476}
477
478impl Default for StateMachine {
479 fn default() -> Self {
480 Self::new()
481 }
482}
483
484pub struct DatabaseStateMachine<F: DatabaseOperationHandler> {
491 handler: F,
492 last_applied: RwLock<u64>,
493 version: RwLock<u64>,
494 kv_cache: RwLock<HashMap<String, Vec<u8>>>,
496}
497
498pub trait DatabaseOperationHandler: Send + Sync {
501 fn execute_sql(&self, sql: &str) -> Result<Vec<u8>, String>;
503
504 fn insert_document(&self, collection: &str, document: &[u8]) -> Result<String, String>;
506
507 fn update_document(
509 &self,
510 collection: &str,
511 doc_id: &str,
512 document: &[u8],
513 ) -> Result<(), String>;
514
515 fn delete_document(&self, collection: &str, doc_id: &str) -> Result<(), String>;
517
518 fn create_node(&self, label: &str, properties: &[u8]) -> Result<String, String>;
520
521 fn delete_node(&self, node_id: &str) -> Result<(), String>;
523
524 fn create_edge(
526 &self,
527 edge_type: &str,
528 from_node: &str,
529 to_node: &str,
530 properties: &[u8],
531 ) -> Result<String, String>;
532
533 fn delete_edge(&self, edge_id: &str) -> Result<(), String>;
535
536 fn begin_transaction(&self, tx_id: u64) -> Result<(), String>;
538
539 fn commit_transaction(&self, tx_id: u64) -> Result<(), String>;
541
542 fn rollback_transaction(&self, tx_id: u64) -> Result<(), String>;
544
545 fn create_snapshot(&self) -> Result<Vec<u8>, String>;
547
548 fn restore_snapshot(&self, data: &[u8]) -> Result<(), String>;
550}
551
552impl<F: DatabaseOperationHandler> DatabaseStateMachine<F> {
553 pub fn new(handler: F) -> Self {
555 Self {
556 handler,
557 last_applied: RwLock::new(0),
558 version: RwLock::new(0),
559 kv_cache: RwLock::new(HashMap::new()),
560 }
561 }
562
563 fn apply_database_command(&self, command: &Command, index: u64) -> CommandResult {
564 match command.command_type {
565 CommandType::Get => {
567 let cache = self
568 .kv_cache
569 .read()
570 .expect("database state machine kv_cache lock poisoned");
571 let value = cache.get(&command.key).cloned();
572 CommandResult::success(value, index)
573 }
574 CommandType::Set => {
575 if let Some(ref value) = command.value {
576 let mut cache = self
577 .kv_cache
578 .write()
579 .expect("database state machine kv_cache lock poisoned");
580 cache.insert(command.key.clone(), value.clone());
581 CommandResult::success(None, index)
582 } else {
583 CommandResult::error("No value provided", index)
584 }
585 }
586 CommandType::Delete => {
587 let mut cache = self
588 .kv_cache
589 .write()
590 .expect("database state machine kv_cache lock poisoned");
591 let old = cache.remove(&command.key);
592 CommandResult::success(old, index)
593 }
594
595 CommandType::SqlExecute => {
597 if let Some(sql) = command.sql_statement() {
598 match self.handler.execute_sql(sql) {
599 Ok(result) => CommandResult::success(Some(result), index),
600 Err(e) => CommandResult::error(e, index),
601 }
602 } else {
603 CommandResult::error("No SQL statement provided", index)
604 }
605 }
606
607 CommandType::DocumentInsert => {
609 if let Some(ref doc) = command.value {
610 match self.handler.insert_document(&command.key, doc) {
611 Ok(doc_id) => CommandResult::success(Some(doc_id.into_bytes()), index),
612 Err(e) => CommandResult::error(e, index),
613 }
614 } else {
615 CommandResult::error("No document provided", index)
616 }
617 }
618 CommandType::DocumentUpdate => {
619 if let (Some(doc_id), Some(ref doc)) =
620 (command.metadata.get("doc_id"), &command.value)
621 {
622 match self.handler.update_document(&command.key, doc_id, doc) {
623 Ok(()) => CommandResult::success(None, index),
624 Err(e) => CommandResult::error(e, index),
625 }
626 } else {
627 CommandResult::error("Missing doc_id or document", index)
628 }
629 }
630 CommandType::DocumentDelete => {
631 if let Some(doc_id) = command.metadata.get("doc_id") {
632 match self.handler.delete_document(&command.key, doc_id) {
633 Ok(()) => CommandResult::success(None, index),
634 Err(e) => CommandResult::error(e, index),
635 }
636 } else {
637 CommandResult::error("Missing doc_id", index)
638 }
639 }
640
641 CommandType::GraphCreateNode => {
643 if let Some(ref props) = command.value {
644 match self.handler.create_node(&command.key, props) {
645 Ok(node_id) => CommandResult::success(Some(node_id.into_bytes()), index),
646 Err(e) => CommandResult::error(e, index),
647 }
648 } else {
649 CommandResult::error("No properties provided", index)
650 }
651 }
652 CommandType::GraphDeleteNode => match self.handler.delete_node(&command.key) {
653 Ok(()) => CommandResult::success(None, index),
654 Err(e) => CommandResult::error(e, index),
655 },
656 CommandType::GraphCreateEdge => {
657 if let (Some(from), Some(to), Some(ref props)) = (
658 command.metadata.get("from"),
659 command.metadata.get("to"),
660 &command.value,
661 ) {
662 match self.handler.create_edge(&command.key, from, to, props) {
663 Ok(edge_id) => CommandResult::success(Some(edge_id.into_bytes()), index),
664 Err(e) => CommandResult::error(e, index),
665 }
666 } else {
667 CommandResult::error("Missing from, to, or properties", index)
668 }
669 }
670 CommandType::GraphDeleteEdge => match self.handler.delete_edge(&command.key) {
671 Ok(()) => CommandResult::success(None, index),
672 Err(e) => CommandResult::error(e, index),
673 },
674
675 CommandType::TransactionBegin => {
677 if let Some(tx_id) = command.transaction_id() {
678 match self.handler.begin_transaction(tx_id) {
679 Ok(()) => CommandResult::success(None, index),
680 Err(e) => CommandResult::error(e, index),
681 }
682 } else {
683 CommandResult::error("Missing transaction ID", index)
684 }
685 }
686 CommandType::TransactionCommit => {
687 if let Some(tx_id) = command.transaction_id() {
688 match self.handler.commit_transaction(tx_id) {
689 Ok(()) => CommandResult::success(None, index),
690 Err(e) => CommandResult::error(e, index),
691 }
692 } else {
693 CommandResult::error("Missing transaction ID", index)
694 }
695 }
696 CommandType::TransactionRollback => {
697 if let Some(tx_id) = command.transaction_id() {
698 match self.handler.rollback_transaction(tx_id) {
699 Ok(()) => CommandResult::success(None, index),
700 Err(e) => CommandResult::error(e, index),
701 }
702 } else {
703 CommandResult::error("Missing transaction ID", index)
704 }
705 }
706
707 CommandType::CompareAndSwap | CommandType::Increment => {
709 CommandResult::error("Use key-value state machine for these operations", index)
710 }
711 CommandType::Custom => CommandResult::error("Custom commands not handled", index),
712 }
713 }
714}
715
716impl<F: DatabaseOperationHandler> StateMachineBackend for DatabaseStateMachine<F> {
717 fn apply(&self, command: &Command, index: u64) -> CommandResult {
718 let mut last_applied = self
719 .last_applied
720 .write()
721 .expect("database state machine last_applied lock poisoned");
722 let mut version = self
723 .version
724 .write()
725 .expect("database state machine version lock poisoned");
726
727 if index <= *last_applied {
728 return CommandResult::error("Already applied", *last_applied);
729 }
730
731 let result = self.apply_database_command(command, index);
732
733 if result.success {
734 *version += 1;
735 }
736 *last_applied = index;
737
738 result
739 }
740
741 fn get(&self, key: &str) -> Option<Vec<u8>> {
742 let cache = self
743 .kv_cache
744 .read()
745 .expect("database state machine kv_cache lock poisoned");
746 cache.get(key).cloned()
747 }
748
749 fn last_applied(&self) -> u64 {
750 *self
751 .last_applied
752 .read()
753 .expect("database state machine last_applied lock poisoned")
754 }
755
756 fn version(&self) -> u64 {
757 *self
758 .version
759 .read()
760 .expect("database state machine version lock poisoned")
761 }
762
763 fn len(&self) -> usize {
764 let cache = self
765 .kv_cache
766 .read()
767 .expect("database state machine kv_cache lock poisoned");
768 cache.len()
769 }
770
771 fn snapshot(&self) -> Snapshot {
772 let cache = self
774 .kv_cache
775 .read()
776 .expect("database state machine kv_cache lock poisoned");
777 let last_applied = *self
778 .last_applied
779 .read()
780 .expect("database state machine last_applied lock poisoned");
781 let version = *self
782 .version
783 .read()
784 .expect("database state machine version lock poisoned");
785
786 let mut data = cache.clone();
788 if let Ok(db_snapshot) = self.handler.create_snapshot() {
789 data.insert("__db_snapshot__".to_string(), db_snapshot);
790 }
791
792 Snapshot {
793 data,
794 last_applied,
795 version,
796 }
797 }
798
799 fn restore(&self, snapshot: Snapshot) {
800 let mut cache = self
801 .kv_cache
802 .write()
803 .expect("database state machine kv_cache lock poisoned");
804 let mut last_applied = self
805 .last_applied
806 .write()
807 .expect("database state machine last_applied lock poisoned");
808 let mut version = self
809 .version
810 .write()
811 .expect("database state machine version lock poisoned");
812
813 if let Some(db_snapshot) = snapshot.data.get("__db_snapshot__") {
815 let _ = self.handler.restore_snapshot(db_snapshot);
816 }
817
818 *cache = snapshot
820 .data
821 .into_iter()
822 .filter(|(k, _)| k != "__db_snapshot__")
823 .collect();
824 *last_applied = snapshot.last_applied;
825 *version = snapshot.version;
826 }
827}
828
829#[derive(Default)]
836pub struct NoOpDatabaseHandler;
837
838impl DatabaseOperationHandler for NoOpDatabaseHandler {
839 fn execute_sql(&self, _sql: &str) -> Result<Vec<u8>, String> {
840 Ok(b"OK".to_vec())
841 }
842
843 fn insert_document(&self, _collection: &str, _document: &[u8]) -> Result<String, String> {
844 Ok("doc-001".to_string())
845 }
846
847 fn update_document(
848 &self,
849 _collection: &str,
850 _doc_id: &str,
851 _document: &[u8],
852 ) -> Result<(), String> {
853 Ok(())
854 }
855
856 fn delete_document(&self, _collection: &str, _doc_id: &str) -> Result<(), String> {
857 Ok(())
858 }
859
860 fn create_node(&self, _label: &str, _properties: &[u8]) -> Result<String, String> {
861 Ok("node-001".to_string())
862 }
863
864 fn delete_node(&self, _node_id: &str) -> Result<(), String> {
865 Ok(())
866 }
867
868 fn create_edge(
869 &self,
870 _edge_type: &str,
871 _from_node: &str,
872 _to_node: &str,
873 _properties: &[u8],
874 ) -> Result<String, String> {
875 Ok("edge-001".to_string())
876 }
877
878 fn delete_edge(&self, _edge_id: &str) -> Result<(), String> {
879 Ok(())
880 }
881
882 fn begin_transaction(&self, _tx_id: u64) -> Result<(), String> {
883 Ok(())
884 }
885
886 fn commit_transaction(&self, _tx_id: u64) -> Result<(), String> {
887 Ok(())
888 }
889
890 fn rollback_transaction(&self, _tx_id: u64) -> Result<(), String> {
891 Ok(())
892 }
893
894 fn create_snapshot(&self) -> Result<Vec<u8>, String> {
895 Ok(Vec::new())
896 }
897
898 fn restore_snapshot(&self, _data: &[u8]) -> Result<(), String> {
899 Ok(())
900 }
901}
902
903#[derive(Debug, Clone, Serialize, Deserialize)]
909pub struct Snapshot {
910 pub data: HashMap<String, Vec<u8>>,
911 pub last_applied: u64,
912 pub version: u64,
913}
914
915impl Snapshot {
916 pub fn to_bytes(&self) -> Vec<u8> {
918 serde_json::to_vec(self).unwrap_or_default()
919 }
920
921 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
923 serde_json::from_slice(bytes).ok()
924 }
925}
926
927#[cfg(test)]
932mod tests {
933 use super::*;
934
935 #[test]
936 fn test_command() {
937 let cmd = Command::set("key1", b"value1".to_vec());
938 assert_eq!(cmd.command_type, CommandType::Set);
939 assert_eq!(cmd.key, "key1");
940
941 let bytes = cmd.to_bytes();
942 let restored = Command::from_bytes(&bytes).unwrap();
943 assert_eq!(restored.key, "key1");
944 }
945
946 #[test]
947 fn test_state_machine_set_get() {
948 let sm = StateMachine::new();
949
950 let cmd = Command::set("key1", b"value1".to_vec());
951 let result = sm.apply(&cmd, 1);
952 assert!(result.success);
953 assert_eq!(sm.last_applied(), 1);
954
955 let value = sm.get("key1").unwrap();
956 assert_eq!(value, b"value1");
957 }
958
959 #[test]
960 fn test_state_machine_delete() {
961 let sm = StateMachine::new();
962
963 sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
964 assert!(sm.get("key1").is_some());
965
966 sm.apply(&Command::delete("key1"), 2);
967 assert!(sm.get("key1").is_none());
968 }
969
970 #[test]
971 fn test_state_machine_increment() {
972 let sm = StateMachine::new();
973
974 sm.apply(&Command::set("counter", b"0".to_vec()), 1);
975
976 let cmd = Command {
977 command_type: CommandType::Increment,
978 key: "counter".to_string(),
979 value: None,
980 metadata: HashMap::new(),
981 };
982
983 sm.apply(&cmd, 2);
984 let value = sm.get("counter").unwrap();
985 assert_eq!(String::from_utf8(value).unwrap(), "1");
986 }
987
988 #[test]
989 fn test_snapshot() {
990 let sm = StateMachine::new();
991
992 sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
993 sm.apply(&Command::set("key2", b"value2".to_vec()), 2);
994
995 let snapshot = sm.snapshot();
996 assert_eq!(snapshot.last_applied, 2);
997 assert_eq!(snapshot.data.len(), 2);
998
999 let new_sm = StateMachine::new();
1000 new_sm.restore(snapshot);
1001
1002 assert_eq!(new_sm.get("key1").unwrap(), b"value1");
1003 assert_eq!(new_sm.get("key2").unwrap(), b"value2");
1004 assert_eq!(new_sm.last_applied(), 2);
1005 }
1006
1007 #[test]
1008 fn test_duplicate_apply() {
1009 let sm = StateMachine::new();
1010
1011 let result = sm.apply(&Command::set("key1", b"v1".to_vec()), 1);
1012 assert!(result.success);
1013
1014 let result = sm.apply(&Command::set("key1", b"v2".to_vec()), 1);
1015 assert!(!result.success);
1016
1017 assert_eq!(sm.get("key1").unwrap(), b"v1");
1018 }
1019
1020 #[test]
1021 fn test_sql_command() {
1022 let cmd = Command::sql("INSERT INTO users (name) VALUES ('test')");
1023 assert_eq!(cmd.command_type, CommandType::SqlExecute);
1024 assert_eq!(
1025 cmd.sql_statement(),
1026 Some("INSERT INTO users (name) VALUES ('test')")
1027 );
1028 assert!(cmd.is_write());
1029 }
1030
1031 #[test]
1032 fn test_document_commands() {
1033 let insert = Command::document_insert("users", b"{\"name\": \"test\"}".to_vec());
1034 assert_eq!(insert.command_type, CommandType::DocumentInsert);
1035 assert_eq!(insert.key, "users");
1036
1037 let update =
1038 Command::document_update("users", "doc123", b"{\"name\": \"updated\"}".to_vec());
1039 assert_eq!(update.command_type, CommandType::DocumentUpdate);
1040 assert_eq!(update.metadata.get("doc_id"), Some(&"doc123".to_string()));
1041
1042 let delete = Command::document_delete("users", "doc123");
1043 assert_eq!(delete.command_type, CommandType::DocumentDelete);
1044 }
1045
1046 #[test]
1047 fn test_graph_commands() {
1048 let node = Command::graph_create_node("Person", b"{\"name\": \"Alice\"}".to_vec());
1049 assert_eq!(node.command_type, CommandType::GraphCreateNode);
1050 assert_eq!(node.key, "Person");
1051
1052 let edge = Command::graph_create_edge("KNOWS", "node1", "node2", b"{}".to_vec());
1053 assert_eq!(edge.command_type, CommandType::GraphCreateEdge);
1054 assert_eq!(edge.metadata.get("from"), Some(&"node1".to_string()));
1055 assert_eq!(edge.metadata.get("to"), Some(&"node2".to_string()));
1056 }
1057
1058 #[test]
1059 fn test_transaction_commands() {
1060 let begin = Command::tx_begin(123);
1061 assert_eq!(begin.command_type, CommandType::TransactionBegin);
1062 assert_eq!(begin.transaction_id(), Some(123));
1063
1064 let commit = Command::tx_commit(123);
1065 assert_eq!(commit.command_type, CommandType::TransactionCommit);
1066
1067 let rollback = Command::tx_rollback(123);
1068 assert_eq!(rollback.command_type, CommandType::TransactionRollback);
1069 }
1070
1071 #[test]
1072 fn test_database_state_machine() {
1073 let handler = NoOpDatabaseHandler;
1074 let sm = DatabaseStateMachine::new(handler);
1075
1076 let sql_cmd = Command::sql("INSERT INTO test VALUES (1)");
1078 let result = sm.apply(&sql_cmd, 1);
1079 assert!(result.success);
1080 assert_eq!(result.value, Some(b"OK".to_vec()));
1081
1082 let doc_cmd = Command::document_insert("users", b"{}".to_vec());
1084 let result = sm.apply(&doc_cmd, 2);
1085 assert!(result.success);
1086 assert_eq!(result.value, Some(b"doc-001".to_vec()));
1087
1088 let tx_begin = Command::tx_begin(1);
1090 let result = sm.apply(&tx_begin, 3);
1091 assert!(result.success);
1092
1093 let tx_commit = Command::tx_commit(1);
1094 let result = sm.apply(&tx_commit, 4);
1095 assert!(result.success);
1096
1097 assert_eq!(sm.last_applied(), 4);
1098 }
1099
1100 #[test]
1101 fn test_database_state_machine_snapshot() {
1102 let handler = NoOpDatabaseHandler;
1103 let sm = DatabaseStateMachine::new(handler);
1104
1105 sm.apply(&Command::set("key1", b"value1".to_vec()), 1);
1107 sm.apply(&Command::set("key2", b"value2".to_vec()), 2);
1108
1109 let snapshot = sm.snapshot();
1110 assert_eq!(snapshot.last_applied, 2);
1111
1112 let handler2 = NoOpDatabaseHandler;
1114 let sm2 = DatabaseStateMachine::new(handler2);
1115 sm2.restore(snapshot);
1116
1117 assert_eq!(sm2.get("key1"), Some(b"value1".to_vec()));
1118 assert_eq!(sm2.get("key2"), Some(b"value2".to_vec()));
1119 assert_eq!(sm2.last_applied(), 2);
1120 }
1121
1122 #[test]
1123 fn test_compare_and_swap() {
1124 let sm = StateMachine::new();
1125
1126 sm.apply(&Command::set("key1", b"old".to_vec()), 1);
1128
1129 let mut cmd = Command {
1131 command_type: CommandType::CompareAndSwap,
1132 key: "key1".to_string(),
1133 value: Some(b"new".to_vec()),
1134 metadata: HashMap::new(),
1135 };
1136 cmd.metadata
1137 .insert("expected".to_string(), "old".to_string());
1138
1139 let result = sm.apply(&cmd, 2);
1140 assert!(result.success);
1141 assert_eq!(result.value, Some(b"true".to_vec()));
1142 assert_eq!(sm.get("key1"), Some(b"new".to_vec()));
1143
1144 cmd.metadata
1146 .insert("expected".to_string(), "wrong".to_string());
1147 cmd.value = Some(b"newer".to_vec());
1148 let result = sm.apply(&cmd, 3);
1149 assert!(result.success);
1150 assert_eq!(result.value, Some(b"false".to_vec()));
1151 assert_eq!(sm.get("key1"), Some(b"new".to_vec())); }
1153}