use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OdinMessage {
pub id: String,
pub message_type: MessageType,
pub source_node: String,
pub target_node: String,
pub content: String,
pub priority: MessagePriority,
pub timestamp: u64,
pub metadata: std::collections::HashMap<String, String>,
pub sequence: u64,
pub checksum: Option<String>,
}
impl OdinMessage {
pub fn new(
message_type: MessageType,
source_node: &str,
target_node: &str,
content: &str,
priority: MessagePriority,
) -> Self {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
id: uuid::Uuid::new_v4().to_string(),
message_type,
source_node: source_node.to_string(),
target_node: target_node.to_string(),
content: content.to_string(),
priority,
timestamp,
metadata: std::collections::HashMap::new(),
sequence: 0,
checksum: None,
}
}
pub fn with_metadata(mut self, key: String, value: String) -> Self {
self.metadata.insert(key, value);
self
}
pub fn with_sequence(mut self, sequence: u64) -> Self {
self.sequence = sequence;
self
}
pub fn with_checksum(mut self) -> Self {
self.checksum = Some(self.calculate_checksum());
self
}
pub fn validate(&self) -> bool {
if let Some(checksum) = &self.checksum {
&self.calculate_checksum() == checksum
} else {
true }
}
fn calculate_checksum(&self) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
self.id.hash(&mut hasher);
self.source_node.hash(&mut hasher);
self.target_node.hash(&mut hasher);
self.content.hash(&mut hasher);
self.timestamp.hash(&mut hasher);
format!("{:x}", hasher.finish())
}
pub fn size(&self) -> usize {
serde_json::to_string(self)
.map(|s| s.len())
.unwrap_or(0)
}
pub fn is_expired(&self, ttl_seconds: u64) -> bool {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
current_time > self.timestamp + ttl_seconds
}
pub fn create_reply(&self, content: &str, priority: MessagePriority) -> Self {
Self::new(
MessageType::Reply,
&self.target_node,
&self.source_node,
content,
priority,
)
.with_metadata("reply_to".to_string(), self.id.clone())
}
pub fn create_ack(&self) -> Self {
Self::new(
MessageType::Acknowledgment,
&self.target_node,
&self.source_node,
"ack",
MessagePriority::Low,
)
.with_metadata("ack_for".to_string(), self.id.clone())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageType {
Standard,
Broadcast,
Reply,
Acknowledgment,
Heartbeat,
System,
Error,
}
impl std::fmt::Display for MessageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MessageType::Standard => write!(f, "standard"),
MessageType::Broadcast => write!(f, "broadcast"),
MessageType::Reply => write!(f, "reply"),
MessageType::Acknowledgment => write!(f, "ack"),
MessageType::Heartbeat => write!(f, "heartbeat"),
MessageType::System => write!(f, "system"),
MessageType::Error => write!(f, "error"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum MessagePriority {
Low = 0,
Normal = 1,
High = 2,
Critical = 3,
}
impl std::fmt::Display for MessagePriority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MessagePriority::Low => write!(f, "low"),
MessagePriority::Normal => write!(f, "normal"),
MessagePriority::High => write!(f, "high"),
MessagePriority::Critical => write!(f, "critical"),
}
}
}
impl Default for MessagePriority {
fn default() -> Self {
MessagePriority::Normal
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageBatch {
pub batch_id: String,
pub messages: Vec<OdinMessage>,
pub timestamp: u64,
pub metadata: std::collections::HashMap<String, String>,
}
impl MessageBatch {
pub fn new() -> Self {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
batch_id: uuid::Uuid::new_v4().to_string(),
messages: Vec::new(),
timestamp,
metadata: std::collections::HashMap::new(),
}
}
pub fn add_message(mut self, message: OdinMessage) -> Self {
self.messages.push(message);
self
}
pub fn len(&self) -> usize {
self.messages.len()
}
pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
pub fn total_size(&self) -> usize {
self.messages.iter().map(|m| m.size()).sum::<usize>()
+ serde_json::to_string(&self.batch_id).unwrap_or_default().len()
+ 8 }
pub fn split(self, max_size: usize) -> Vec<MessageBatch> {
let mut batches = Vec::new();
let mut current_batch = MessageBatch::new();
for message in self.messages {
if current_batch.len() >= max_size && !current_batch.is_empty() {
batches.push(current_batch);
current_batch = MessageBatch::new();
}
current_batch = current_batch.add_message(message);
}
if !current_batch.is_empty() {
batches.push(current_batch);
}
batches
}
}
impl Default for MessageBatch {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct MessageFilter {
pub message_type: Option<MessageType>,
pub source_pattern: Option<String>,
pub target_pattern: Option<String>,
pub min_priority: Option<MessagePriority>,
pub content_pattern: Option<String>,
pub max_age_seconds: Option<u64>,
}
impl MessageFilter {
pub fn new() -> Self {
Self {
message_type: None,
source_pattern: None,
target_pattern: None,
min_priority: None,
content_pattern: None,
max_age_seconds: None,
}
}
pub fn with_type(mut self, message_type: MessageType) -> Self {
self.message_type = Some(message_type);
self
}
pub fn with_source(mut self, pattern: String) -> Self {
self.source_pattern = Some(pattern);
self
}
pub fn with_target(mut self, pattern: String) -> Self {
self.target_pattern = Some(pattern);
self
}
pub fn with_min_priority(mut self, priority: MessagePriority) -> Self {
self.min_priority = Some(priority);
self
}
pub fn with_content(mut self, pattern: String) -> Self {
self.content_pattern = Some(pattern);
self
}
pub fn with_max_age(mut self, seconds: u64) -> Self {
self.max_age_seconds = Some(seconds);
self
}
pub fn matches(&self, message: &OdinMessage) -> bool {
if let Some(msg_type) = self.message_type {
if message.message_type != msg_type {
return false;
}
}
if let Some(pattern) = &self.source_pattern {
if !message.source_node.contains(pattern) {
return false;
}
}
if let Some(pattern) = &self.target_pattern {
if !message.target_node.contains(pattern) {
return false;
}
}
if let Some(min_priority) = self.min_priority {
if message.priority < min_priority {
return false;
}
}
if let Some(pattern) = &self.content_pattern {
if !message.content.contains(pattern) {
return false;
}
}
if let Some(max_age) = self.max_age_seconds {
if message.is_expired(max_age) {
return false;
}
}
true
}
}
impl Default for MessageFilter {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn test_message_creation() {
let message = OdinMessage::new(
MessageType::Standard,
"source-node",
"target-node",
"Hello, World!",
MessagePriority::Normal,
);
assert!(!message.id.is_empty());
assert_eq!(message.message_type, MessageType::Standard);
assert_eq!(message.source_node, "source-node");
assert_eq!(message.target_node, "target-node");
assert_eq!(message.content, "Hello, World!");
assert_eq!(message.priority, MessagePriority::Normal);
assert!(message.timestamp > 0);
}
#[test]
fn test_message_with_metadata() {
let message = OdinMessage::new(
MessageType::Standard,
"source",
"target",
"content",
MessagePriority::Normal,
)
.with_metadata("key1".to_string(), "value1".to_string())
.with_metadata("key2".to_string(), "value2".to_string());
assert_eq!(message.metadata.len(), 2);
assert_eq!(message.metadata.get("key1"), Some(&"value1".to_string()));
assert_eq!(message.metadata.get("key2"), Some(&"value2".to_string()));
}
#[test]
fn test_message_checksum() {
let message = OdinMessage::new(
MessageType::Standard,
"source",
"target",
"content",
MessagePriority::Normal,
)
.with_checksum();
assert!(message.checksum.is_some());
assert!(message.validate());
}
#[test]
fn test_message_expiration() {
let mut message = OdinMessage::new(
MessageType::Standard,
"source",
"target",
"content",
MessagePriority::Normal,
);
message.timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() - 10;
assert!(message.is_expired(5)); assert!(!message.is_expired(15)); }
#[test]
fn test_message_reply() {
let original = OdinMessage::new(
MessageType::Standard,
"alice",
"bob",
"Hello",
MessagePriority::Normal,
);
let reply = original.create_reply("Hello back!", MessagePriority::Normal);
assert_eq!(reply.message_type, MessageType::Reply);
assert_eq!(reply.source_node, "bob");
assert_eq!(reply.target_node, "alice");
assert_eq!(reply.content, "Hello back!");
assert_eq!(reply.metadata.get("reply_to"), Some(&original.id));
}
#[test]
fn test_message_batch() {
let mut batch = MessageBatch::new();
let message1 = OdinMessage::new(
MessageType::Standard,
"source",
"target1",
"message 1",
MessagePriority::Normal,
);
let message2 = OdinMessage::new(
MessageType::Standard,
"source",
"target2",
"message 2",
MessagePriority::High,
);
batch = batch
.add_message(message1)
.add_message(message2);
assert_eq!(batch.len(), 2);
assert!(!batch.is_empty());
assert!(batch.total_size() > 0);
}
#[test]
fn test_message_filter() {
let message = OdinMessage::new(
MessageType::Standard,
"alice",
"bob",
"hello world",
MessagePriority::High,
);
let filter = MessageFilter::new()
.with_type(MessageType::Standard)
.with_source("alice".to_string())
.with_min_priority(MessagePriority::Normal);
assert!(filter.matches(&message));
let strict_filter = MessageFilter::new()
.with_min_priority(MessagePriority::Critical);
assert!(!strict_filter.matches(&message));
}
#[test]
fn test_priority_ordering() {
assert!(MessagePriority::Critical > MessagePriority::High);
assert!(MessagePriority::High > MessagePriority::Normal);
assert!(MessagePriority::Normal > MessagePriority::Low);
}
}