1use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12
13pub trait StateMachineBackend: Send + Sync {
21 fn apply(&self, command: &Command, index: u64) -> CommandResult;
23
24 fn get(&self, key: &str) -> Option<Vec<u8>>;
26
27 fn last_applied(&self) -> u64;
29
30 fn version(&self) -> u64;
32
33 fn len(&self) -> usize;
35
36 fn is_empty(&self) -> bool {
38 self.len() == 0
39 }
40
41 fn snapshot(&self) -> Snapshot;
43
44 fn restore(&self, snapshot: Snapshot);
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Command {
56 pub command_type: CommandType,
57 pub key: String,
58 pub value: Option<Vec<u8>>,
59 pub metadata: HashMap<String, String>,
60}
61
62impl Command {
63 pub fn get(key: impl Into<String>) -> Self {
65 Self {
66 command_type: CommandType::Get,
67 key: key.into(),
68 value: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn set(key: impl Into<String>, value: Vec<u8>) -> Self {
75 Self {
76 command_type: CommandType::Set,
77 key: key.into(),
78 value: Some(value),
79 metadata: HashMap::new(),
80 }
81 }
82
83 pub fn delete(key: impl Into<String>) -> Self {
85 Self {
86 command_type: CommandType::Delete,
87 key: key.into(),
88 value: None,
89 metadata: HashMap::new(),
90 }
91 }
92
93 pub fn sql(statement: impl Into<String>) -> Self {
95 Self {
96 command_type: CommandType::SqlExecute,
97 key: String::new(),
98 value: Some(statement.into().into_bytes()),
99 metadata: HashMap::new(),
100 }
101 }
102
103 pub fn document_insert(collection: impl Into<String>, document: Vec<u8>) -> Self {
105 Self {
106 command_type: CommandType::DocumentInsert,
107 key: collection.into(),
108 value: Some(document),
109 metadata: HashMap::new(),
110 }
111 }
112
113 pub fn document_update(collection: impl Into<String>, doc_id: impl Into<String>, document: Vec<u8>) -> Self {
115 let mut metadata = HashMap::new();
116 metadata.insert("doc_id".to_string(), doc_id.into());
117 Self {
118 command_type: CommandType::DocumentUpdate,
119 key: collection.into(),
120 value: Some(document),
121 metadata,
122 }
123 }
124
125 pub fn document_delete(collection: impl Into<String>, doc_id: impl Into<String>) -> Self {
127 let mut metadata = HashMap::new();
128 metadata.insert("doc_id".to_string(), doc_id.into());
129 Self {
130 command_type: CommandType::DocumentDelete,
131 key: collection.into(),
132 value: None,
133 metadata,
134 }
135 }
136
137 pub fn graph_create_node(label: impl Into<String>, properties: Vec<u8>) -> Self {
139 Self {
140 command_type: CommandType::GraphCreateNode,
141 key: label.into(),
142 value: Some(properties),
143 metadata: HashMap::new(),
144 }
145 }
146
147 pub fn graph_create_edge(edge_type: impl Into<String>, from_node: impl Into<String>, to_node: impl Into<String>, properties: Vec<u8>) -> Self {
149 let mut metadata = HashMap::new();
150 metadata.insert("from".to_string(), from_node.into());
151 metadata.insert("to".to_string(), to_node.into());
152 Self {
153 command_type: CommandType::GraphCreateEdge,
154 key: edge_type.into(),
155 value: Some(properties),
156 metadata,
157 }
158 }
159
160 pub fn tx_begin(tx_id: u64) -> Self {
162 let mut metadata = HashMap::new();
163 metadata.insert("tx_id".to_string(), tx_id.to_string());
164 Self {
165 command_type: CommandType::TransactionBegin,
166 key: String::new(),
167 value: None,
168 metadata,
169 }
170 }
171
172 pub fn tx_commit(tx_id: u64) -> Self {
174 let mut metadata = HashMap::new();
175 metadata.insert("tx_id".to_string(), tx_id.to_string());
176 Self {
177 command_type: CommandType::TransactionCommit,
178 key: String::new(),
179 value: None,
180 metadata,
181 }
182 }
183
184 pub fn tx_rollback(tx_id: u64) -> Self {
186 let mut metadata = HashMap::new();
187 metadata.insert("tx_id".to_string(), tx_id.to_string());
188 Self {
189 command_type: CommandType::TransactionRollback,
190 key: String::new(),
191 value: None,
192 metadata,
193 }
194 }
195
196 pub fn to_bytes(&self) -> Vec<u8> {
198 serde_json::to_vec(self).unwrap_or_default()
199 }
200
201 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
203 serde_json::from_slice(bytes).ok()
204 }
205
206 pub fn is_write(&self) -> bool {
208 !matches!(self.command_type, CommandType::Get)
209 }
210
211 pub fn sql_statement(&self) -> Option<&str> {
213 if self.command_type == CommandType::SqlExecute {
214 self.value.as_ref().and_then(|v| std::str::from_utf8(v).ok())
215 } else {
216 None
217 }
218 }
219
220 pub fn transaction_id(&self) -> Option<u64> {
222 self.metadata.get("tx_id").and_then(|s| s.parse().ok())
223 }
224}
225
226#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
232pub enum CommandType {
233 Get,
235 Set,
236 Delete,
237 CompareAndSwap,
238 Increment,
239
240 SqlExecute,
242
243 DocumentInsert,
245 DocumentUpdate,
246 DocumentDelete,
247
248 GraphCreateNode,
250 GraphDeleteNode,
251 GraphCreateEdge,
252 GraphDeleteEdge,
253
254 TransactionBegin,
256 TransactionCommit,
257 TransactionRollback,
258
259 Custom,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct CommandResult {
270 pub success: bool,
271 pub value: Option<Vec<u8>>,
272 pub error: Option<String>,
273 pub applied_index: u64,
274}
275
276impl CommandResult {
277 pub fn success(value: Option<Vec<u8>>, applied_index: u64) -> Self {
278 Self {
279 success: true,
280 value,
281 error: None,
282 applied_index,
283 }
284 }
285
286 pub fn error(message: impl Into<String>, applied_index: u64) -> Self {
287 Self {
288 success: false,
289 value: None,
290 error: Some(message.into()),
291 applied_index,
292 }
293 }
294}
295
/// In-memory key-value state machine.
///
/// Each piece of state sits behind its own [`RwLock`], so the machine can be
/// used through a shared reference. `Debug` is derived so the machine can be
/// inspected in logs and test failures.
#[derive(Debug)]
pub struct StateMachine {
    /// The key-value store itself.
    data: RwLock<HashMap<String, Vec<u8>>>,
    /// Index of the last command accepted by `apply` (or restored from a snapshot).
    last_applied: RwLock<u64>,
    /// Counter bumped by the mutating key-value commands.
    version: RwLock<u64>,
}
307
308impl StateMachine {
309 pub fn new() -> Self {
311 Self {
312 data: RwLock::new(HashMap::new()),
313 last_applied: RwLock::new(0),
314 version: RwLock::new(0),
315 }
316 }
317
318 fn apply_kv_command(&self, command: &Command, index: u64, data: &mut HashMap<String, Vec<u8>>, version: &mut u64) -> CommandResult {
319 match command.command_type {
320 CommandType::Get => {
321 let value = data.get(&command.key).cloned();
322 CommandResult::success(value, index)
323 }
324 CommandType::Set => {
325 if let Some(ref value) = command.value {
326 data.insert(command.key.clone(), value.clone());
327 *version += 1;
328 CommandResult::success(None, index)
329 } else {
330 CommandResult::error("No value provided", index)
331 }
332 }
333 CommandType::Delete => {
334 let old = data.remove(&command.key);
335 *version += 1;
336 CommandResult::success(old, index)
337 }
338 CommandType::CompareAndSwap => {
339 let expected = command.metadata.get("expected").map(|s| s.as_bytes().to_vec());
341 let current = data.get(&command.key).cloned();
342
343 if current == expected {
344 if let Some(ref new_value) = command.value {
345 data.insert(command.key.clone(), new_value.clone());
346 *version += 1;
347 CommandResult::success(Some(b"true".to_vec()), index)
348 } else {
349 CommandResult::error("No new value provided", index)
350 }
351 } else {
352 CommandResult::success(Some(b"false".to_vec()), index)
353 }
354 }
355 CommandType::Increment => {
356 let current = data
357 .get(&command.key)
358 .and_then(|v| String::from_utf8(v.clone()).ok())
359 .and_then(|s| s.parse::<i64>().ok())
360 .unwrap_or(0);
361
362 let new_value = (current + 1).to_string().into_bytes();
363 data.insert(command.key.clone(), new_value.clone());
364 *version += 1;
365 CommandResult::success(Some(new_value), index)
366 }
367 _ => CommandResult::error(
368 format!("Command type {:?} not supported by in-memory state machine", command.command_type),
369 index
370 ),
371 }
372 }
373}
374
375impl StateMachineBackend for StateMachine {
376 fn apply(&self, command: &Command, index: u64) -> CommandResult {
377 let mut data = self.data.write().expect("state machine data lock poisoned");
378 let mut last_applied = self.last_applied.write().expect("state machine last_applied lock poisoned");
379 let mut version = self.version.write().expect("state machine version lock poisoned");
380
381 if index <= *last_applied {
382 return CommandResult::error("Already applied", *last_applied);
383 }
384
385 let result = self.apply_kv_command(command, index, &mut data, &mut version);
386 *last_applied = index;
387 result
388 }
389
390 fn get(&self, key: &str) -> Option<Vec<u8>> {
391 let data = self.data.read().expect("state machine data lock poisoned");
392 data.get(key).cloned()
393 }
394
395 fn last_applied(&self) -> u64 {
396 *self.last_applied.read().expect("state machine last_applied lock poisoned")
397 }
398
399 fn version(&self) -> u64 {
400 *self.version.read().expect("state machine version lock poisoned")
401 }
402
403 fn len(&self) -> usize {
404 let data = self.data.read().expect("state machine data lock poisoned");
405 data.len()
406 }
407
408 fn snapshot(&self) -> Snapshot {
409 let data = self.data.read().expect("state machine data lock poisoned");
410 let last_applied = *self.last_applied.read().expect("state machine last_applied lock poisoned");
411 let version = *self.version.read().expect("state machine version lock poisoned");
412
413 Snapshot {
414 data: data.clone(),
415 last_applied,
416 version,
417 }
418 }
419
420 fn restore(&self, snapshot: Snapshot) {
421 let mut data = self.data.write().expect("state machine data lock poisoned");
422 let mut last_applied = self.last_applied.write().expect("state machine last_applied lock poisoned");
423 let mut version = self.version.write().expect("state machine version lock poisoned");
424
425 *data = snapshot.data;
426 *last_applied = snapshot.last_applied;
427 *version = snapshot.version;
428 }
429}
430
431impl Default for StateMachine {
432 fn default() -> Self {
433 Self::new()
434 }
435}
436
437pub struct DatabaseStateMachine<F: DatabaseOperationHandler> {
444 handler: F,
445 last_applied: RwLock<u64>,
446 version: RwLock<u64>,
447 kv_cache: RwLock<HashMap<String, Vec<u8>>>,
449}
450
/// Backend operations that [`DatabaseStateMachine`] delegates to.
///
/// Every method reports failure as a `String`; the state machine copies that
/// text into the error field of the command result.
pub trait DatabaseOperationHandler: Send + Sync {
    // --- SQL ---

    /// Executes `sql`; the returned bytes become the command's result value.
    fn execute_sql(&self, sql: &str) -> Result<Vec<u8>, String>;

    // --- Documents ---

    /// Inserts `document` into `collection`; the returned string is reported
    /// back as the command's result value.
    fn insert_document(&self, collection: &str, document: &[u8]) -> Result<String, String>;

    /// Replaces the contents of document `doc_id` in `collection`.
    fn update_document(
        &self,
        collection: &str,
        doc_id: &str,
        document: &[u8],
    ) -> Result<(), String>;

    /// Removes document `doc_id` from `collection`.
    fn delete_document(&self, collection: &str, doc_id: &str) -> Result<(), String>;

    // --- Graph ---

    /// Creates a node with `label` and `properties`; the returned string is
    /// reported back as the command's result value.
    fn create_node(&self, label: &str, properties: &[u8]) -> Result<String, String>;

    /// Deletes the node identified by `node_id`.
    fn delete_node(&self, node_id: &str) -> Result<(), String>;

    /// Creates an edge of `edge_type` from `from_node` to `to_node`; the
    /// returned string is reported back as the command's result value.
    fn create_edge(
        &self,
        edge_type: &str,
        from_node: &str,
        to_node: &str,
        properties: &[u8],
    ) -> Result<String, String>;

    /// Deletes the edge identified by `edge_id`.
    fn delete_edge(&self, edge_id: &str) -> Result<(), String>;

    // --- Transactions ---

    /// Begins transaction `tx_id`.
    fn begin_transaction(&self, tx_id: u64) -> Result<(), String>;

    /// Commits transaction `tx_id`.
    fn commit_transaction(&self, tx_id: u64) -> Result<(), String>;

    /// Rolls back transaction `tx_id`.
    fn rollback_transaction(&self, tx_id: u64) -> Result<(), String>;

    // --- Snapshots ---

    /// Serializes the backend's state for inclusion in a state machine snapshot.
    fn create_snapshot(&self) -> Result<Vec<u8>, String>;

    /// Loads backend state previously produced by `create_snapshot`.
    fn restore_snapshot(&self, data: &[u8]) -> Result<(), String>;
}
493
494impl<F: DatabaseOperationHandler> DatabaseStateMachine<F> {
495 pub fn new(handler: F) -> Self {
497 Self {
498 handler,
499 last_applied: RwLock::new(0),
500 version: RwLock::new(0),
501 kv_cache: RwLock::new(HashMap::new()),
502 }
503 }
504
505 fn apply_database_command(&self, command: &Command, index: u64) -> CommandResult {
506 match command.command_type {
507 CommandType::Get => {
509 let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
510 let value = cache.get(&command.key).cloned();
511 CommandResult::success(value, index)
512 }
513 CommandType::Set => {
514 if let Some(ref value) = command.value {
515 let mut cache = self.kv_cache.write().expect("database state machine kv_cache lock poisoned");
516 cache.insert(command.key.clone(), value.clone());
517 CommandResult::success(None, index)
518 } else {
519 CommandResult::error("No value provided", index)
520 }
521 }
522 CommandType::Delete => {
523 let mut cache = self.kv_cache.write().expect("database state machine kv_cache lock poisoned");
524 let old = cache.remove(&command.key);
525 CommandResult::success(old, index)
526 }
527
528 CommandType::SqlExecute => {
530 if let Some(sql) = command.sql_statement() {
531 match self.handler.execute_sql(sql) {
532 Ok(result) => CommandResult::success(Some(result), index),
533 Err(e) => CommandResult::error(e, index),
534 }
535 } else {
536 CommandResult::error("No SQL statement provided", index)
537 }
538 }
539
540 CommandType::DocumentInsert => {
542 if let Some(ref doc) = command.value {
543 match self.handler.insert_document(&command.key, doc) {
544 Ok(doc_id) => CommandResult::success(Some(doc_id.into_bytes()), index),
545 Err(e) => CommandResult::error(e, index),
546 }
547 } else {
548 CommandResult::error("No document provided", index)
549 }
550 }
551 CommandType::DocumentUpdate => {
552 if let (Some(doc_id), Some(ref doc)) = (command.metadata.get("doc_id"), &command.value) {
553 match self.handler.update_document(&command.key, doc_id, doc) {
554 Ok(()) => CommandResult::success(None, index),
555 Err(e) => CommandResult::error(e, index),
556 }
557 } else {
558 CommandResult::error("Missing doc_id or document", index)
559 }
560 }
561 CommandType::DocumentDelete => {
562 if let Some(doc_id) = command.metadata.get("doc_id") {
563 match self.handler.delete_document(&command.key, doc_id) {
564 Ok(()) => CommandResult::success(None, index),
565 Err(e) => CommandResult::error(e, index),
566 }
567 } else {
568 CommandResult::error("Missing doc_id", index)
569 }
570 }
571
572 CommandType::GraphCreateNode => {
574 if let Some(ref props) = command.value {
575 match self.handler.create_node(&command.key, props) {
576 Ok(node_id) => CommandResult::success(Some(node_id.into_bytes()), index),
577 Err(e) => CommandResult::error(e, index),
578 }
579 } else {
580 CommandResult::error("No properties provided", index)
581 }
582 }
583 CommandType::GraphDeleteNode => {
584 match self.handler.delete_node(&command.key) {
585 Ok(()) => CommandResult::success(None, index),
586 Err(e) => CommandResult::error(e, index),
587 }
588 }
589 CommandType::GraphCreateEdge => {
590 if let (Some(from), Some(to), Some(ref props)) = (
591 command.metadata.get("from"),
592 command.metadata.get("to"),
593 &command.value,
594 ) {
595 match self.handler.create_edge(&command.key, from, to, props) {
596 Ok(edge_id) => CommandResult::success(Some(edge_id.into_bytes()), index),
597 Err(e) => CommandResult::error(e, index),
598 }
599 } else {
600 CommandResult::error("Missing from, to, or properties", index)
601 }
602 }
603 CommandType::GraphDeleteEdge => {
604 match self.handler.delete_edge(&command.key) {
605 Ok(()) => CommandResult::success(None, index),
606 Err(e) => CommandResult::error(e, index),
607 }
608 }
609
610 CommandType::TransactionBegin => {
612 if let Some(tx_id) = command.transaction_id() {
613 match self.handler.begin_transaction(tx_id) {
614 Ok(()) => CommandResult::success(None, index),
615 Err(e) => CommandResult::error(e, index),
616 }
617 } else {
618 CommandResult::error("Missing transaction ID", index)
619 }
620 }
621 CommandType::TransactionCommit => {
622 if let Some(tx_id) = command.transaction_id() {
623 match self.handler.commit_transaction(tx_id) {
624 Ok(()) => CommandResult::success(None, index),
625 Err(e) => CommandResult::error(e, index),
626 }
627 } else {
628 CommandResult::error("Missing transaction ID", index)
629 }
630 }
631 CommandType::TransactionRollback => {
632 if let Some(tx_id) = command.transaction_id() {
633 match self.handler.rollback_transaction(tx_id) {
634 Ok(()) => CommandResult::success(None, index),
635 Err(e) => CommandResult::error(e, index),
636 }
637 } else {
638 CommandResult::error("Missing transaction ID", index)
639 }
640 }
641
642 CommandType::CompareAndSwap | CommandType::Increment => {
644 CommandResult::error("Use key-value state machine for these operations", index)
645 }
646 CommandType::Custom => {
647 CommandResult::error("Custom commands not handled", index)
648 }
649 }
650 }
651}
652
653impl<F: DatabaseOperationHandler> StateMachineBackend for DatabaseStateMachine<F> {
654 fn apply(&self, command: &Command, index: u64) -> CommandResult {
655 let mut last_applied = self.last_applied.write().expect("database state machine last_applied lock poisoned");
656 let mut version = self.version.write().expect("database state machine version lock poisoned");
657
658 if index <= *last_applied {
659 return CommandResult::error("Already applied", *last_applied);
660 }
661
662 let result = self.apply_database_command(command, index);
663
664 if result.success {
665 *version += 1;
666 }
667 *last_applied = index;
668
669 result
670 }
671
672 fn get(&self, key: &str) -> Option<Vec<u8>> {
673 let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
674 cache.get(key).cloned()
675 }
676
677 fn last_applied(&self) -> u64 {
678 *self.last_applied.read().expect("database state machine last_applied lock poisoned")
679 }
680
681 fn version(&self) -> u64 {
682 *self.version.read().expect("database state machine version lock poisoned")
683 }
684
685 fn len(&self) -> usize {
686 let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
687 cache.len()
688 }
689
690 fn snapshot(&self) -> Snapshot {
691 let cache = self.kv_cache.read().expect("database state machine kv_cache lock poisoned");
693 let last_applied = *self.last_applied.read().expect("database state machine last_applied lock poisoned");
694 let version = *self.version.read().expect("database state machine version lock poisoned");
695
696 let mut data = cache.clone();
698 if let Ok(db_snapshot) = self.handler.create_snapshot() {
699 data.insert("__db_snapshot__".to_string(), db_snapshot);
700 }
701
702 Snapshot {
703 data,
704 last_applied,
705 version,
706 }
707 }
708
709 fn restore(&self, snapshot: Snapshot) {
710 let mut cache = self.kv_cache.write().expect("database state machine kv_cache lock poisoned");
711 let mut last_applied = self.last_applied.write().expect("database state machine last_applied lock poisoned");
712 let mut version = self.version.write().expect("database state machine version lock poisoned");
713
714 if let Some(db_snapshot) = snapshot.data.get("__db_snapshot__") {
716 let _ = self.handler.restore_snapshot(db_snapshot);
717 }
718
719 *cache = snapshot.data.into_iter()
721 .filter(|(k, _)| k != "__db_snapshot__")
722 .collect();
723 *last_applied = snapshot.last_applied;
724 *version = snapshot.version;
725 }
726}
727
/// Handler that accepts every operation and returns fixed placeholder results.
///
/// It is a field-less unit struct, so `Debug`, `Clone` and `Copy` are derived
/// alongside `Default` to make it usable in logs, assertions and by-value
/// passing.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoOpDatabaseHandler;
736
737impl DatabaseOperationHandler for NoOpDatabaseHandler {
738 fn execute_sql(&self, _sql: &str) -> Result<Vec<u8>, String> {
739 Ok(b"OK".to_vec())
740 }
741
742 fn insert_document(&self, _collection: &str, _document: &[u8]) -> Result<String, String> {
743 Ok("doc-001".to_string())
744 }
745
746 fn update_document(&self, _collection: &str, _doc_id: &str, _document: &[u8]) -> Result<(), String> {
747 Ok(())
748 }
749
750 fn delete_document(&self, _collection: &str, _doc_id: &str) -> Result<(), String> {
751 Ok(())
752 }
753
754 fn create_node(&self, _label: &str, _properties: &[u8]) -> Result<String, String> {
755 Ok("node-001".to_string())
756 }
757
758 fn delete_node(&self, _node_id: &str) -> Result<(), String> {
759 Ok(())
760 }
761
762 fn create_edge(&self, _edge_type: &str, _from_node: &str, _to_node: &str, _properties: &[u8]) -> Result<String, String> {
763 Ok("edge-001".to_string())
764 }
765
766 fn delete_edge(&self, _edge_id: &str) -> Result<(), String> {
767 Ok(())
768 }
769
770 fn begin_transaction(&self, _tx_id: u64) -> Result<(), String> {
771 Ok(())
772 }
773
774 fn commit_transaction(&self, _tx_id: u64) -> Result<(), String> {
775 Ok(())
776 }
777
778 fn rollback_transaction(&self, _tx_id: u64) -> Result<(), String> {
779 Ok(())
780 }
781
782 fn create_snapshot(&self) -> Result<Vec<u8>, String> {
783 Ok(Vec::new())
784 }
785
786 fn restore_snapshot(&self, _data: &[u8]) -> Result<(), String> {
787 Ok(())
788 }
789}
790
791#[derive(Debug, Clone, Serialize, Deserialize)]
797pub struct Snapshot {
798 pub data: HashMap<String, Vec<u8>>,
799 pub last_applied: u64,
800 pub version: u64,
801}
802
803impl Snapshot {
804 pub fn to_bytes(&self) -> Vec<u8> {
806 serde_json::to_vec(self).unwrap_or_default()
807 }
808
809 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
811 serde_json::from_slice(bytes).ok()
812 }
813}
814
#[cfg(test)]
mod tests {
    use super::*;

    /// Owned bytes of a string literal.
    fn bytes(text: &str) -> Vec<u8> {
        text.as_bytes().to_vec()
    }

    /// Command of an arbitrary type with empty metadata.
    fn bare_command(command_type: CommandType, key: &str, value: Option<Vec<u8>>) -> Command {
        Command {
            command_type,
            key: key.to_string(),
            value,
            metadata: HashMap::new(),
        }
    }

    /// A `Set` command survives a JSON round trip.
    #[test]
    fn test_command() {
        let original = Command::set("key1", bytes("value1"));
        assert_eq!(original.command_type, CommandType::Set);
        assert_eq!(original.key, "key1");

        let encoded = original.to_bytes();
        let decoded = Command::from_bytes(&encoded).unwrap();
        assert_eq!(decoded.key, "key1");
    }

    /// A value written with `Set` can be read back.
    #[test]
    fn test_state_machine_set_get() {
        let machine = StateMachine::new();

        let outcome = machine.apply(&Command::set("key1", bytes("value1")), 1);
        assert!(outcome.success);
        assert_eq!(machine.last_applied(), 1);

        let stored = machine.get("key1").unwrap();
        assert_eq!(stored, b"value1");
    }

    /// `Delete` removes a previously stored key.
    #[test]
    fn test_state_machine_delete() {
        let machine = StateMachine::new();

        machine.apply(&Command::set("key1", bytes("value1")), 1);
        assert!(machine.get("key1").is_some());

        machine.apply(&Command::delete("key1"), 2);
        assert!(machine.get("key1").is_none());
    }

    /// `Increment` turns a stored "0" into "1".
    #[test]
    fn test_state_machine_increment() {
        let machine = StateMachine::new();

        machine.apply(&Command::set("counter", bytes("0")), 1);

        let increment = bare_command(CommandType::Increment, "counter", None);
        machine.apply(&increment, 2);

        let stored = machine.get("counter").unwrap();
        assert_eq!(String::from_utf8(stored).unwrap(), "1");
    }

    /// A snapshot restored into a fresh machine reproduces data and index.
    #[test]
    fn test_snapshot() {
        let source = StateMachine::new();

        source.apply(&Command::set("key1", bytes("value1")), 1);
        source.apply(&Command::set("key2", bytes("value2")), 2);

        let captured = source.snapshot();
        assert_eq!(captured.last_applied, 2);
        assert_eq!(captured.data.len(), 2);

        let target = StateMachine::new();
        target.restore(captured);

        assert_eq!(target.get("key1").unwrap(), b"value1");
        assert_eq!(target.get("key2").unwrap(), b"value2");
        assert_eq!(target.last_applied(), 2);
    }

    /// Re-applying an already used index is rejected and changes nothing.
    #[test]
    fn test_duplicate_apply() {
        let machine = StateMachine::new();

        let first = machine.apply(&Command::set("key1", bytes("v1")), 1);
        assert!(first.success);

        let replay = machine.apply(&Command::set("key1", bytes("v2")), 1);
        assert!(!replay.success);

        assert_eq!(machine.get("key1").unwrap(), b"v1");
    }

    /// The SQL constructor stores the statement and counts as a write.
    #[test]
    fn test_sql_command() {
        let statement = "INSERT INTO users (name) VALUES ('test')";
        let command = Command::sql(statement);

        assert_eq!(command.command_type, CommandType::SqlExecute);
        assert_eq!(command.sql_statement(), Some("INSERT INTO users (name) VALUES ('test')"));
        assert!(command.is_write());
    }

    /// Document constructors set the type, collection and `doc_id` metadata.
    #[test]
    fn test_document_commands() {
        let inserted = Command::document_insert("users", bytes(r#"{"name": "test"}"#));
        assert_eq!(inserted.command_type, CommandType::DocumentInsert);
        assert_eq!(inserted.key, "users");

        let updated = Command::document_update("users", "doc123", bytes(r#"{"name": "updated"}"#));
        assert_eq!(updated.command_type, CommandType::DocumentUpdate);
        assert_eq!(updated.metadata.get("doc_id"), Some(&"doc123".to_string()));

        let deleted = Command::document_delete("users", "doc123");
        assert_eq!(deleted.command_type, CommandType::DocumentDelete);
    }

    /// Graph constructors set the type, label and endpoint metadata.
    #[test]
    fn test_graph_commands() {
        let node = Command::graph_create_node("Person", bytes(r#"{"name": "Alice"}"#));
        assert_eq!(node.command_type, CommandType::GraphCreateNode);
        assert_eq!(node.key, "Person");

        let edge = Command::graph_create_edge("KNOWS", "node1", "node2", bytes("{}"));
        assert_eq!(edge.command_type, CommandType::GraphCreateEdge);
        assert_eq!(edge.metadata.get("from"), Some(&"node1".to_string()));
        assert_eq!(edge.metadata.get("to"), Some(&"node2".to_string()));
    }

    /// Transaction constructors set the type and expose the transaction id.
    #[test]
    fn test_transaction_commands() {
        let begin = Command::tx_begin(123);
        assert_eq!(begin.command_type, CommandType::TransactionBegin);
        assert_eq!(begin.transaction_id(), Some(123));

        let commit = Command::tx_commit(123);
        assert_eq!(commit.command_type, CommandType::TransactionCommit);

        let rollback = Command::tx_rollback(123);
        assert_eq!(rollback.command_type, CommandType::TransactionRollback);
    }

    /// The database machine forwards commands to its handler in index order.
    #[test]
    fn test_database_state_machine() {
        let machine = DatabaseStateMachine::new(NoOpDatabaseHandler);

        let sql_outcome = machine.apply(&Command::sql("INSERT INTO test VALUES (1)"), 1);
        assert!(sql_outcome.success);
        assert_eq!(sql_outcome.value, Some(bytes("OK")));

        let doc_outcome = machine.apply(&Command::document_insert("users", bytes("{}")), 2);
        assert!(doc_outcome.success);
        assert_eq!(doc_outcome.value, Some(bytes("doc-001")));

        let begin_outcome = machine.apply(&Command::tx_begin(1), 3);
        assert!(begin_outcome.success);

        let commit_outcome = machine.apply(&Command::tx_commit(1), 4);
        assert!(commit_outcome.success);

        assert_eq!(machine.last_applied(), 4);
    }

    /// A database machine snapshot restores cached keys and the applied index.
    #[test]
    fn test_database_state_machine_snapshot() {
        let source = DatabaseStateMachine::new(NoOpDatabaseHandler);

        source.apply(&Command::set("key1", bytes("value1")), 1);
        source.apply(&Command::set("key2", bytes("value2")), 2);

        let captured = source.snapshot();
        assert_eq!(captured.last_applied, 2);

        let target = DatabaseStateMachine::new(NoOpDatabaseHandler);
        target.restore(captured);

        assert_eq!(target.get("key1"), Some(bytes("value1")));
        assert_eq!(target.get("key2"), Some(bytes("value2")));
        assert_eq!(target.last_applied(), 2);
    }

    /// Compare-and-swap replaces the value only when the expectation matches.
    #[test]
    fn test_compare_and_swap() {
        let machine = StateMachine::new();

        machine.apply(&Command::set("key1", bytes("old")), 1);

        let mut swap = bare_command(CommandType::CompareAndSwap, "key1", Some(bytes("new")));
        swap.metadata.insert("expected".to_string(), "old".to_string());

        // Expectation matches: the swap happens.
        let matched = machine.apply(&swap, 2);
        assert!(matched.success);
        assert_eq!(matched.value, Some(bytes("true")));
        assert_eq!(machine.get("key1"), Some(bytes("new")));

        // Expectation is stale: the stored value stays as it was.
        swap.metadata.insert("expected".to_string(), "wrong".to_string());
        swap.value = Some(bytes("newer"));
        let mismatched = machine.apply(&swap, 3);
        assert!(mismatched.success);
        assert_eq!(mismatched.value, Some(bytes("false")));
        assert_eq!(machine.get("key1"), Some(bytes("new")));
    }
}