surrealcs_kernel/messages/client/
message.rs1use std::fmt::Debug;
4
5use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
6use tokio::sync::mpsc;
7
8use crate::messages::server::interface::{ServerMessage, ServerTransactionMessage};
9use crate::messages::server::wrapper::WrappedServerMessage;
10
11#[derive(Debug, Clone)]
23pub enum TransactionMessage {
24 TransactionOperation(WrappedServerMessage),
25 Register(mpsc::Sender<TransactionMessage>),
26 Ping((usize, String)),
27 Deregister(usize),
28 CloseConnection,
29 Registered(usize),
31 Unregistered,
32 Error(NanoServiceError),
33}
34
35impl TransactionMessage {
36 pub fn extract_transaction_operation(
41 self,
42 ) -> Result<(usize, ServerTransactionMessage), NanoServiceError> {
43 match self {
44 TransactionMessage::TransactionOperation(op) => {
45 let transaction = match op.message {
46 ServerMessage::Error(trans) => return Err(trans),
47 ServerMessage::SendOperation(trans) => trans,
48 ServerMessage::BeginTransaction(trans) => trans,
49 ServerMessage::CommitTransaction => ServerTransactionMessage::Commit,
50 ServerMessage::RollbackTransaction => ServerTransactionMessage::Rollback,
51 _ => {
52 return Err(NanoServiceError::new(
53 format!("ServerMessage not SendOperation: {:?}", op.message),
54 NanoServiceErrorStatus::Unknown,
55 ))
56 }
57 };
58 let server_id = match op.server_id {
59 Some(id) => id,
60 None => {
61 return Err(NanoServiceError::new(
62 "Server id not found".to_string(),
63 NanoServiceErrorStatus::Unknown,
64 ))
65 }
66 };
67 Ok((server_id, transaction))
68 }
69 TransactionMessage::Error(e) => {
70 tracing::error!("message error: {:?}", e);
71 Err(e)
72 }
73 _ => Err(NanoServiceError::new(
74 "TransactionMessage not TransactionOperation".to_string(),
75 NanoServiceErrorStatus::Unknown,
76 )),
77 }
78 }
79}
80
81#[cfg(test)]
82mod tests {
83
84 use super::*;
85 use crate::messages::server::kv_operations::MessagePut;
86
87 static CONNECTION_ID: &str = "1-1234567890";
88 static _TRANSACTION_ID: &str = "1-1-1234567890";
89
90 #[test]
91 fn test_extract_transaction_operation() {
92 let transaction = ServerTransactionMessage::Put(MessagePut {
93 key: b"key".to_vec(),
94 value: b"value".to_vec(),
95 version: None,
96 });
97 let mut wrapped = WrappedServerMessage::new(
98 1,
99 ServerMessage::SendOperation(transaction),
100 CONNECTION_ID.into(),
101 );
102 wrapped.server_id = Some(1);
103 let message = TransactionMessage::TransactionOperation(wrapped);
104
105 let result = message.extract_transaction_operation().unwrap();
106 assert_eq!(result.0, 1);
107
108 let extracted_transaction = match result.1 {
110 ServerTransactionMessage::Put(transaction) => transaction,
111 _ => panic!("Transaction not Put"),
112 };
113 assert_eq!(extracted_transaction.key, b"key".to_vec());
114 assert_eq!(extracted_transaction.value, b"value".to_vec());
115 }
116
117 #[test]
118 fn test_extract_transaction_operation_error_wrong_transaction_message_type() {
119 let message = TransactionMessage::Register(mpsc::channel(1).0);
120 let result = message.extract_transaction_operation();
121
122 assert!(result.is_err());
123 assert_eq!("TransactionMessage not TransactionOperation", result.err().unwrap().message);
124 }
125
126 #[test]
127 fn test_extract_transaction_no_server_id() {
128 let transaction = ServerTransactionMessage::Put(MessagePut {
129 key: b"key".to_vec(),
130 value: b"value".to_vec(),
131 version: None,
132 });
133 let wrapped = WrappedServerMessage::new(
134 1,
135 ServerMessage::SendOperation(transaction),
136 CONNECTION_ID.into(),
137 );
138 let message = TransactionMessage::TransactionOperation(wrapped);
139 let result = message.extract_transaction_operation();
140
141 assert!(result.is_err());
142 assert_eq!("Server id not found", result.err().unwrap().message);
143 }
144}