use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use crc32fast::Hasher;
use num_derive::{FromPrimitive, ToPrimitive};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use serde_json::Value;
static TRANSACTION_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
pub fn next_transaction_id() -> u32 {
TRANSACTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
}
pub fn current_timecode() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize_repr, Deserialize_repr, FromPrimitive, ToPrimitive)]
#[repr(u8)]
pub enum MessageType {
NoOp = 0,
Response = 1,
Read = 2,
Write = 3,
Subscribe = 4,
Unsubscribe = 5,
Broadcast = 6,
Heartbeat = 7,
Control = 8,
Request = 10,
}
impl Default for MessageType {
fn default() -> Self {
MessageType::NoOp
}
}
impl From<u8> for MessageType {
fn from(value: u8) -> Self {
match value {
0 => MessageType::NoOp,
1 => MessageType::Response,
2 => MessageType::Read,
3 => MessageType::Write,
4 => MessageType::Subscribe,
5 => MessageType::Unsubscribe,
6 => MessageType::Broadcast,
7 => MessageType::Heartbeat,
8 => MessageType::Control,
10 => MessageType::Request,
_ => MessageType::NoOp,
}
}
}
impl From<MessageType> for u8 {
fn from(value: MessageType) -> Self {
value as u8
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandMessage {
pub transaction_id: u32,
pub timecode: i64,
pub topic: String,
pub message_type: MessageType,
pub data: Value,
#[serde(default)]
pub crc: u32,
#[serde(default)]
pub success: bool,
#[serde(default)]
pub error_message: String,
}
impl Default for CommandMessage {
fn default() -> Self {
Self::new()
}
}
impl CommandMessage {
pub fn new() -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: String::new(),
message_type: MessageType::NoOp,
data: Value::Null,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn request(topic: &str, data: Value) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Request,
data,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn read(topic: &str) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Read,
data: Value::Null,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn write(topic: &str, data: Value) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Write,
data,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn subscribe(topic: &str) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Subscribe,
data: Value::Null,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn unsubscribe(topic: &str) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Unsubscribe,
data: Value::Null,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn response(transaction_id: u32, data: Value) -> Self {
Self {
transaction_id,
timecode: current_timecode(),
topic: String::new(),
message_type: MessageType::Response,
data,
crc: 0,
success: true,
error_message: String::new(),
}
}
pub fn error_response(transaction_id: u32, error: &str) -> Self {
Self {
transaction_id,
timecode: current_timecode(),
topic: String::new(),
message_type: MessageType::Response,
data: Value::Null,
crc: 0,
success: false,
error_message: error.to_string(),
}
}
pub fn broadcast(topic: &str, data: Value) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Broadcast,
data,
crc: 0,
success: true,
error_message: String::new(),
}
}
pub fn heartbeat() -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: String::new(),
message_type: MessageType::Heartbeat,
data: Value::Null,
crc: 0,
success: true,
error_message: String::new(),
}
}
pub fn control(topic: &str, data: Value) -> Self {
Self {
transaction_id: next_transaction_id(),
timecode: current_timecode(),
topic: topic.to_string(),
message_type: MessageType::Control,
data,
crc: 0,
success: false,
error_message: String::new(),
}
}
pub fn with_transaction_id(mut self, id: u32) -> Self {
self.transaction_id = id;
self
}
pub fn with_topic(mut self, topic: &str) -> Self {
self.topic = topic.to_string();
self
}
pub fn with_data(mut self, data: Value) -> Self {
self.data = data;
self
}
pub fn with_success(mut self, data: Value) -> Self {
self.success = true;
self.data = data;
self.error_message.clear();
self
}
pub fn with_error(mut self, error: &str) -> Self {
self.success = false;
self.error_message = error.to_string();
self
}
pub fn set_success(&mut self, data: Value) {
self.message_type = MessageType::Response;
self.success = true;
self.data = data;
self.error_message.clear();
}
pub fn set_error(&mut self, error: &str) {
self.message_type = MessageType::Response;
self.success = false;
self.error_message = error.to_string();
}
pub fn into_response(mut self, data: Value) -> Self {
self.message_type = MessageType::Response;
self.timecode = current_timecode();
self.success = true;
self.data = data;
self.error_message.clear();
self
}
pub fn into_error_response(mut self, error: &str) -> Self {
self.message_type = MessageType::Response;
self.timecode = current_timecode();
self.success = false;
self.error_message = error.to_string();
self
}
pub fn is_response(&self) -> bool {
self.message_type == MessageType::Response
}
pub fn is_broadcast(&self) -> bool {
self.message_type == MessageType::Broadcast
}
pub fn is_heartbeat(&self) -> bool {
self.message_type == MessageType::Heartbeat
}
pub fn is_request(&self) -> bool {
matches!(
self.message_type,
MessageType::Read
| MessageType::Write
| MessageType::Subscribe
| MessageType::Unsubscribe
| MessageType::Request
)
}
pub fn domain(&self) -> &str {
self.topic.split('.').next().unwrap_or("")
}
pub fn subtopic(&self) -> String {
self.topic.split_once('.')
.map(|(_, rest)| rest.to_ascii_lowercase())
.unwrap_or_default()
}
pub fn compute_crc(&self) -> u32 {
let mut hasher = Hasher::new();
hasher.update(&self.transaction_id.to_le_bytes());
hasher.update(&self.timecode.to_le_bytes());
hasher.update(self.topic.as_bytes());
hasher.update(&[self.message_type as u8]);
if let Ok(data_bytes) = serde_json::to_vec(&self.data) {
hasher.update(&data_bytes);
}
hasher.update(&[self.success as u8]);
hasher.update(self.error_message.as_bytes());
hasher.finalize()
}
pub fn update_crc(&mut self) {
self.crc = self.compute_crc();
}
pub fn verify_crc(&self) -> bool {
self.crc == 0 || self.crc == self.compute_crc()
}
}
#[deprecated(since = "0.6.0", note = "Use CommandMessage with topic field instead")]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Action {
pub domain: String,
pub fname: String,
pub args: serde_json::Value,
}
#[allow(deprecated)]
impl Action {
pub fn new() -> Self {
Self {
domain: String::new(),
fname: String::new(),
args: serde_json::Value::Null,
}
}
pub fn to_topic(&self) -> String {
if self.fname.is_empty() {
self.domain.to_lowercase()
} else {
format!("{}.{}", self.domain.to_lowercase(), self.fname.to_lowercase())
}
}
}
#[deprecated(since = "0.6.0", note = "Use CommandMessage.success and CommandMessage.error_message directly")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandMessageResult {
pub valid: bool,
pub data: Value,
pub success: bool,
pub error_message: String,
}
#[allow(deprecated)]
impl Default for CommandMessageResult {
fn default() -> Self {
CommandMessageResult {
valid: false,
data: Value::Null,
success: false,
error_message: String::new(),
}
}
}
#[allow(deprecated)]
impl CommandMessageResult {
pub fn new() -> Self {
Self::default()
}
pub fn success(data: Value) -> Self {
Self {
valid: true,
data,
success: true,
error_message: String::new(),
}
}
pub fn error(message: &str) -> Self {
Self {
valid: true,
data: Value::Null,
success: false,
error_message: message.to_string(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_read_request() {
let msg = CommandMessage::read("modbus.server1.holding_0");
assert_eq!(msg.message_type, MessageType::Read);
assert_eq!(msg.topic, "modbus.server1.holding_0");
assert_eq!(msg.domain(), "modbus");
assert_eq!(msg.subtopic(), "server1.holding_0");
assert!(msg.is_request());
assert!(!msg.is_response());
}
#[test]
fn test_write_request() {
let msg = CommandMessage::write("gnv.app.theme", serde_json::json!("dark"));
assert_eq!(msg.message_type, MessageType::Write);
assert_eq!(msg.data, serde_json::json!("dark"));
assert!(msg.is_request());
}
#[test]
fn test_response() {
let msg = CommandMessage::response(42, serde_json::json!({"value": 123}));
assert_eq!(msg.transaction_id, 42);
assert!(msg.is_response());
assert!(msg.success);
assert_eq!(msg.data, serde_json::json!({"value": 123}));
}
#[test]
fn test_error_response() {
let msg = CommandMessage::error_response(42, "Something went wrong");
assert_eq!(msg.transaction_id, 42);
assert!(msg.is_response());
assert!(!msg.success);
assert_eq!(msg.error_message, "Something went wrong");
}
#[test]
fn test_broadcast() {
let msg = CommandMessage::broadcast("ads.plc1.notifications", serde_json::json!({"symbol": "test"}));
assert!(msg.is_broadcast());
assert!(msg.success);
}
#[test]
fn test_into_response() {
let request = CommandMessage::read("test.value");
let tid = request.transaction_id;
let response = request.into_response(serde_json::json!(42));
assert_eq!(response.transaction_id, tid);
assert!(response.is_response());
assert!(response.success);
assert_eq!(response.data, serde_json::json!(42));
}
#[test]
fn test_into_error_response() {
let request = CommandMessage::read("test.value");
let tid = request.transaction_id;
let response = request.into_error_response("Not found");
assert_eq!(response.transaction_id, tid);
assert!(response.is_response());
assert!(!response.success);
assert_eq!(response.error_message, "Not found");
}
#[test]
fn test_crc() {
let mut msg = CommandMessage::read("test.topic");
msg.update_crc();
assert!(msg.verify_crc());
assert_ne!(msg.crc, 0);
let original_crc = msg.crc;
msg.topic = "modified.topic".to_string();
assert_ne!(msg.compute_crc(), original_crc);
}
#[test]
fn test_serialization() {
let msg = CommandMessage::write("test.value", serde_json::json!({"key": "value"}));
let json = serde_json::to_string(&msg).unwrap();
let decoded: CommandMessage = serde_json::from_str(&json).unwrap();
assert_eq!(decoded.topic, msg.topic);
assert_eq!(decoded.message_type, msg.message_type);
assert_eq!(decoded.data, msg.data);
}
#[test]
fn test_domain_extraction() {
let msg = CommandMessage::read("ads.plc1.GM.stDataResults");
assert_eq!(msg.domain(), "ads");
assert_eq!(msg.subtopic(), "plc1.gm.stdataresults");
let msg2 = CommandMessage::read("simple");
assert_eq!(msg2.domain(), "simple");
assert_eq!(msg2.subtopic(), "");
}
}