surrealcs_kernel/messages/client/
message.rs

//! Defines the messages sent between actors running on the same machine. These differ from the messages sent over TCP
//! because they can carry channel handles (such as an `mpsc::Sender`), which cannot be sent over a network to another machine.
3use std::fmt::Debug;
4
5use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
6use tokio::sync::mpsc;
7
8use crate::messages::server::interface::{ServerMessage, ServerTransactionMessage};
9use crate::messages::server::wrapper::WrappedServerMessage;
10
/// The message that is to be sent between actors on the same machine.
///
/// The first group of variants are requests; the variants after the `below are returns` marker are the
/// replies sent back in response.
///
/// # Variants
/// * `TransactionOperation`: a transaction operation that is to be sent to the key value store and thus over a TCP connection
/// * `Register`: a message to register a transaction actor with the router (carries the sender half of the channel
///   on which that actor receives `TransactionMessage`s)
/// * `Ping`: a message to ping the connection to ensure it is still alive (router ID, connection ID)
/// * `Deregister`: a message to deregister a transaction actor from the router
/// * `CloseConnection`: a message to close the connection
/// * `Registered`: a message to confirm that a transaction actor has been registered
/// * `Unregistered`: a message to confirm that a transaction actor has been deregistered
/// * `Error`: a message to send an error to the client
#[derive(Debug, Clone)]
pub enum TransactionMessage {
	/// Wrapped server message; unpacked by `extract_transaction_operation`.
	TransactionOperation(WrappedServerMessage),
	/// Sender through which `TransactionMessage`s can be delivered to the registering actor.
	Register(mpsc::Sender<TransactionMessage>),
	/// Tuple of (router ID, connection ID).
	Ping((usize, String)),
	// NOTE(review): presumably the ID handed out in `Registered` — confirm against the router.
	Deregister(usize),
	CloseConnection,
	// below are returns
	// NOTE(review): presumably the ID assigned to the actor by the router — confirm against the router.
	Registered(usize),
	Unregistered,
	/// Error to be reported; `extract_transaction_operation` logs it and returns it as `Err`.
	Error(NanoServiceError),
}
34
35impl TransactionMessage {
36	/// Extracts the transaction operation and server ID.
37	///
38	/// # Returns
39	/// A tuple containing the server ID and the transaction operation.
40	pub fn extract_transaction_operation(
41		self,
42	) -> Result<(usize, ServerTransactionMessage), NanoServiceError> {
43		match self {
44			TransactionMessage::TransactionOperation(op) => {
45				let transaction = match op.message {
46					ServerMessage::Error(trans) => return Err(trans),
47					ServerMessage::SendOperation(trans) => trans,
48					ServerMessage::BeginTransaction(trans) => trans,
49					ServerMessage::CommitTransaction => ServerTransactionMessage::Commit,
50					ServerMessage::RollbackTransaction => ServerTransactionMessage::Rollback,
51					_ => {
52						return Err(NanoServiceError::new(
53							format!("ServerMessage not SendOperation: {:?}", op.message),
54							NanoServiceErrorStatus::Unknown,
55						))
56					}
57				};
58				let server_id = match op.server_id {
59					Some(id) => id,
60					None => {
61						return Err(NanoServiceError::new(
62							"Server id not found".to_string(),
63							NanoServiceErrorStatus::Unknown,
64						))
65					}
66				};
67				Ok((server_id, transaction))
68			}
69			TransactionMessage::Error(e) => {
70				tracing::error!("message error: {:?}", e);
71				Err(e)
72			}
73			_ => Err(NanoServiceError::new(
74				"TransactionMessage not TransactionOperation".to_string(),
75				NanoServiceErrorStatus::Unknown,
76			)),
77		}
78	}
79}
80
#[cfg(test)]
mod tests {

	use super::*;
	use crate::messages::server::kv_operations::MessagePut;

	static CONNECTION_ID: &str = "1-1234567890";
	static _TRANSACTION_ID: &str = "1-1-1234567890";

	/// Builds the wrapped `SendOperation(Put)` message shared by the tests below. The wrapper is
	/// returned exactly as `WrappedServerMessage::new` produced it; tests that need a server ID set
	/// it themselves.
	fn wrapped_put() -> WrappedServerMessage {
		let put = ServerTransactionMessage::Put(MessagePut {
			key: b"key".to_vec(),
			value: b"value".to_vec(),
			version: None,
		});
		WrappedServerMessage::new(1, ServerMessage::SendOperation(put), CONNECTION_ID.into())
	}

	/// A `Put` operation with a server ID comes back unchanged alongside that ID.
	#[test]
	fn test_extract_transaction_operation() {
		let mut wrapped = wrapped_put();
		wrapped.server_id = Some(1);

		let (server_id, operation) =
			TransactionMessage::TransactionOperation(wrapped).extract_transaction_operation().unwrap();
		assert_eq!(server_id, 1);

		// the extracted operation must still be the original Put
		match operation {
			ServerTransactionMessage::Put(put) => {
				assert_eq!(put.key, b"key".to_vec());
				assert_eq!(put.value, b"value".to_vec());
			}
			_ => panic!("Transaction not Put"),
		}
	}

	/// Any variant other than `TransactionOperation`/`Error` is rejected with a fixed message.
	#[test]
	fn test_extract_transaction_operation_error_wrong_transaction_message_type() {
		let outcome =
			TransactionMessage::Register(mpsc::channel(1).0).extract_transaction_operation();

		let error = outcome.err();
		assert!(error.is_some());
		assert_eq!("TransactionMessage not TransactionOperation", error.unwrap().message);
	}

	/// A wrapper without a server ID is rejected even though its operation is valid.
	#[test]
	fn test_extract_transaction_no_server_id() {
		let outcome =
			TransactionMessage::TransactionOperation(wrapped_put()).extract_transaction_operation();

		let error = outcome.err();
		assert!(error.is_some());
		assert_eq!("Server id not found", error.unwrap().message);
	}
}