odin_protocol/
message.rs

1//! Message types and utilities for ODIN Protocol
2
3use serde::{Deserialize, Serialize};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6/// ODIN Protocol message
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct OdinMessage {
9    /// Unique message identifier
10    pub id: String,
11    /// Message type
12    pub message_type: MessageType,
13    /// Source node identifier
14    pub source_node: String,
15    /// Target node identifier
16    pub target_node: String,
17    /// Message content
18    pub content: String,
19    /// Message priority
20    pub priority: MessagePriority,
21    /// Timestamp when message was created
22    pub timestamp: u64,
23    /// Message metadata
24    pub metadata: std::collections::HashMap<String, String>,
25    /// Message sequence number
26    pub sequence: u64,
27    /// Checksum for integrity verification
28    pub checksum: Option<String>,
29}
30
31impl OdinMessage {
32    /// Create a new ODIN message
33    pub fn new(
34        message_type: MessageType,
35        source_node: &str,
36        target_node: &str,
37        content: &str,
38        priority: MessagePriority,
39    ) -> Self {
40        let timestamp = SystemTime::now()
41            .duration_since(UNIX_EPOCH)
42            .unwrap_or_default()
43            .as_secs();
44            
45        Self {
46            id: uuid::Uuid::new_v4().to_string(),
47            message_type,
48            source_node: source_node.to_string(),
49            target_node: target_node.to_string(),
50            content: content.to_string(),
51            priority,
52            timestamp,
53            metadata: std::collections::HashMap::new(),
54            sequence: 0,
55            checksum: None,
56        }
57    }
58    
59    /// Add metadata to the message
60    pub fn with_metadata(mut self, key: String, value: String) -> Self {
61        self.metadata.insert(key, value);
62        self
63    }
64    
65    /// Set sequence number
66    pub fn with_sequence(mut self, sequence: u64) -> Self {
67        self.sequence = sequence;
68        self
69    }
70    
71    /// Calculate and set checksum
72    pub fn with_checksum(mut self) -> Self {
73        self.checksum = Some(self.calculate_checksum());
74        self
75    }
76    
77    /// Validate message integrity
78    pub fn validate(&self) -> bool {
79        if let Some(checksum) = &self.checksum {
80            &self.calculate_checksum() == checksum
81        } else {
82            true // No checksum to validate
83        }
84    }
85    
86    /// Calculate message checksum
87    fn calculate_checksum(&self) -> String {
88        use std::collections::hash_map::DefaultHasher;
89        use std::hash::{Hash, Hasher};
90        
91        let mut hasher = DefaultHasher::new();
92        self.id.hash(&mut hasher);
93        self.source_node.hash(&mut hasher);
94        self.target_node.hash(&mut hasher);
95        self.content.hash(&mut hasher);
96        self.timestamp.hash(&mut hasher);
97        
98        format!("{:x}", hasher.finish())
99    }
100    
101    /// Get message size in bytes
102    pub fn size(&self) -> usize {
103        serde_json::to_string(self)
104            .map(|s| s.len())
105            .unwrap_or(0)
106    }
107    
108    /// Check if message is expired based on TTL
109    pub fn is_expired(&self, ttl_seconds: u64) -> bool {
110        let current_time = SystemTime::now()
111            .duration_since(UNIX_EPOCH)
112            .unwrap_or_default()
113            .as_secs();
114            
115        current_time > self.timestamp + ttl_seconds
116    }
117    
118    /// Create a reply message
119    pub fn create_reply(&self, content: &str, priority: MessagePriority) -> Self {
120        Self::new(
121            MessageType::Reply,
122            &self.target_node,
123            &self.source_node,
124            content,
125            priority,
126        )
127        .with_metadata("reply_to".to_string(), self.id.clone())
128    }
129    
130    /// Create an acknowledgment message
131    pub fn create_ack(&self) -> Self {
132        Self::new(
133            MessageType::Acknowledgment,
134            &self.target_node,
135            &self.source_node,
136            "ack",
137            MessagePriority::Low,
138        )
139        .with_metadata("ack_for".to_string(), self.id.clone())
140    }
141}
142
143/// Message type enumeration
144#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
145pub enum MessageType {
146    /// Standard message
147    Standard,
148    /// Broadcast message
149    Broadcast,
150    /// Reply to a previous message
151    Reply,
152    /// Acknowledgment message
153    Acknowledgment,
154    /// Heartbeat message
155    Heartbeat,
156    /// System control message
157    System,
158    /// Error message
159    Error,
160}
161
162impl std::fmt::Display for MessageType {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        match self {
165            MessageType::Standard => write!(f, "standard"),
166            MessageType::Broadcast => write!(f, "broadcast"),
167            MessageType::Reply => write!(f, "reply"),
168            MessageType::Acknowledgment => write!(f, "ack"),
169            MessageType::Heartbeat => write!(f, "heartbeat"),
170            MessageType::System => write!(f, "system"),
171            MessageType::Error => write!(f, "error"),
172        }
173    }
174}
175
176/// Message priority levels
177#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
178pub enum MessagePriority {
179    /// Low priority (best effort)
180    Low = 0,
181    /// Normal priority
182    Normal = 1,
183    /// High priority
184    High = 2,
185    /// Critical priority (immediate delivery)
186    Critical = 3,
187}
188
189impl std::fmt::Display for MessagePriority {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        match self {
192            MessagePriority::Low => write!(f, "low"),
193            MessagePriority::Normal => write!(f, "normal"),
194            MessagePriority::High => write!(f, "high"),
195            MessagePriority::Critical => write!(f, "critical"),
196        }
197    }
198}
199
200impl Default for MessagePriority {
201    fn default() -> Self {
202        MessagePriority::Normal
203    }
204}
205
206/// Message batch for efficient bulk operations
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct MessageBatch {
209    /// Batch identifier
210    pub batch_id: String,
211    /// Messages in the batch
212    pub messages: Vec<OdinMessage>,
213    /// Batch creation timestamp
214    pub timestamp: u64,
215    /// Batch metadata
216    pub metadata: std::collections::HashMap<String, String>,
217}
218
219impl MessageBatch {
220    /// Create a new message batch
221    pub fn new() -> Self {
222        let timestamp = SystemTime::now()
223            .duration_since(UNIX_EPOCH)
224            .unwrap_or_default()
225            .as_secs();
226            
227        Self {
228            batch_id: uuid::Uuid::new_v4().to_string(),
229            messages: Vec::new(),
230            timestamp,
231            metadata: std::collections::HashMap::new(),
232        }
233    }
234    
235    /// Add a message to the batch
236    pub fn add_message(mut self, message: OdinMessage) -> Self {
237        self.messages.push(message);
238        self
239    }
240    
241    /// Get batch size in messages
242    pub fn len(&self) -> usize {
243        self.messages.len()
244    }
245    
246    /// Check if batch is empty
247    pub fn is_empty(&self) -> bool {
248        self.messages.is_empty()
249    }
250    
251    /// Get total size in bytes
252    pub fn total_size(&self) -> usize {
253        self.messages.iter().map(|m| m.size()).sum::<usize>()
254            + serde_json::to_string(&self.batch_id).unwrap_or_default().len()
255            + 8 // timestamp size
256    }
257    
258    /// Split batch into smaller batches
259    pub fn split(self, max_size: usize) -> Vec<MessageBatch> {
260        let mut batches = Vec::new();
261        let mut current_batch = MessageBatch::new();
262        
263        for message in self.messages {
264            if current_batch.len() >= max_size && !current_batch.is_empty() {
265                batches.push(current_batch);
266                current_batch = MessageBatch::new();
267            }
268            current_batch = current_batch.add_message(message);
269        }
270        
271        if !current_batch.is_empty() {
272            batches.push(current_batch);
273        }
274        
275        batches
276    }
277}
278
279impl Default for MessageBatch {
280    fn default() -> Self {
281        Self::new()
282    }
283}
284
285/// Message filter for selective message processing
286#[derive(Debug, Clone)]
287pub struct MessageFilter {
288    /// Filter by message type
289    pub message_type: Option<MessageType>,
290    /// Filter by source node pattern
291    pub source_pattern: Option<String>,
292    /// Filter by target node pattern
293    pub target_pattern: Option<String>,
294    /// Filter by minimum priority
295    pub min_priority: Option<MessagePriority>,
296    /// Filter by content pattern
297    pub content_pattern: Option<String>,
298    /// Filter by age (max seconds)
299    pub max_age_seconds: Option<u64>,
300}
301
302impl MessageFilter {
303    /// Create a new empty filter
304    pub fn new() -> Self {
305        Self {
306            message_type: None,
307            source_pattern: None,
308            target_pattern: None,
309            min_priority: None,
310            content_pattern: None,
311            max_age_seconds: None,
312        }
313    }
314    
315    /// Filter by message type
316    pub fn with_type(mut self, message_type: MessageType) -> Self {
317        self.message_type = Some(message_type);
318        self
319    }
320    
321    /// Filter by source pattern
322    pub fn with_source(mut self, pattern: String) -> Self {
323        self.source_pattern = Some(pattern);
324        self
325    }
326    
327    /// Filter by target pattern
328    pub fn with_target(mut self, pattern: String) -> Self {
329        self.target_pattern = Some(pattern);
330        self
331    }
332    
333    /// Filter by minimum priority
334    pub fn with_min_priority(mut self, priority: MessagePriority) -> Self {
335        self.min_priority = Some(priority);
336        self
337    }
338    
339    /// Filter by content pattern
340    pub fn with_content(mut self, pattern: String) -> Self {
341        self.content_pattern = Some(pattern);
342        self
343    }
344    
345    /// Filter by maximum age
346    pub fn with_max_age(mut self, seconds: u64) -> Self {
347        self.max_age_seconds = Some(seconds);
348        self
349    }
350    
351    /// Check if message matches the filter
352    pub fn matches(&self, message: &OdinMessage) -> bool {
353        // Check message type
354        if let Some(msg_type) = self.message_type {
355            if message.message_type != msg_type {
356                return false;
357            }
358        }
359        
360        // Check source pattern
361        if let Some(pattern) = &self.source_pattern {
362            if !message.source_node.contains(pattern) {
363                return false;
364            }
365        }
366        
367        // Check target pattern
368        if let Some(pattern) = &self.target_pattern {
369            if !message.target_node.contains(pattern) {
370                return false;
371            }
372        }
373        
374        // Check minimum priority
375        if let Some(min_priority) = self.min_priority {
376            if message.priority < min_priority {
377                return false;
378            }
379        }
380        
381        // Check content pattern
382        if let Some(pattern) = &self.content_pattern {
383            if !message.content.contains(pattern) {
384                return false;
385            }
386        }
387        
388        // Check max age
389        if let Some(max_age) = self.max_age_seconds {
390            if message.is_expired(max_age) {
391                return false;
392            }
393        }
394        
395        true
396    }
397}
398
399impl Default for MessageFilter {
400    fn default() -> Self {
401        Self::new()
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use std::thread;
409    use std::time::Duration;
410    
411    #[test]
412    fn test_message_creation() {
413        let message = OdinMessage::new(
414            MessageType::Standard,
415            "source-node",
416            "target-node",
417            "Hello, World!",
418            MessagePriority::Normal,
419        );
420        
421        assert!(!message.id.is_empty());
422        assert_eq!(message.message_type, MessageType::Standard);
423        assert_eq!(message.source_node, "source-node");
424        assert_eq!(message.target_node, "target-node");
425        assert_eq!(message.content, "Hello, World!");
426        assert_eq!(message.priority, MessagePriority::Normal);
427        assert!(message.timestamp > 0);
428    }
429    
430    #[test]
431    fn test_message_with_metadata() {
432        let message = OdinMessage::new(
433            MessageType::Standard,
434            "source",
435            "target",
436            "content",
437            MessagePriority::Normal,
438        )
439        .with_metadata("key1".to_string(), "value1".to_string())
440        .with_metadata("key2".to_string(), "value2".to_string());
441        
442        assert_eq!(message.metadata.len(), 2);
443        assert_eq!(message.metadata.get("key1"), Some(&"value1".to_string()));
444        assert_eq!(message.metadata.get("key2"), Some(&"value2".to_string()));
445    }
446    
447    #[test]
448    fn test_message_checksum() {
449        let message = OdinMessage::new(
450            MessageType::Standard,
451            "source",
452            "target",
453            "content",
454            MessagePriority::Normal,
455        )
456        .with_checksum();
457        
458        assert!(message.checksum.is_some());
459        assert!(message.validate());
460    }
461    
462    #[test]
463    fn test_message_expiration() {
464        let mut message = OdinMessage::new(
465            MessageType::Standard,
466            "source",
467            "target",
468            "content",
469            MessagePriority::Normal,
470        );
471        
472        // Set timestamp to 10 seconds ago
473        message.timestamp = SystemTime::now()
474            .duration_since(UNIX_EPOCH)
475            .unwrap_or_default()
476            .as_secs() - 10;
477        
478        assert!(message.is_expired(5)); // Should be expired with 5 second TTL
479        assert!(!message.is_expired(15)); // Should not be expired with 15 second TTL
480    }
481    
482    #[test]
483    fn test_message_reply() {
484        let original = OdinMessage::new(
485            MessageType::Standard,
486            "alice",
487            "bob",
488            "Hello",
489            MessagePriority::Normal,
490        );
491        
492        let reply = original.create_reply("Hello back!", MessagePriority::Normal);
493        
494        assert_eq!(reply.message_type, MessageType::Reply);
495        assert_eq!(reply.source_node, "bob");
496        assert_eq!(reply.target_node, "alice");
497        assert_eq!(reply.content, "Hello back!");
498        assert_eq!(reply.metadata.get("reply_to"), Some(&original.id));
499    }
500    
501    #[test]
502    fn test_message_batch() {
503        let mut batch = MessageBatch::new();
504        
505        let message1 = OdinMessage::new(
506            MessageType::Standard,
507            "source",
508            "target1",
509            "message 1",
510            MessagePriority::Normal,
511        );
512        
513        let message2 = OdinMessage::new(
514            MessageType::Standard,
515            "source",
516            "target2",
517            "message 2",
518            MessagePriority::High,
519        );
520        
521        batch = batch
522            .add_message(message1)
523            .add_message(message2);
524        
525        assert_eq!(batch.len(), 2);
526        assert!(!batch.is_empty());
527        assert!(batch.total_size() > 0);
528    }
529    
530    #[test]
531    fn test_message_filter() {
532        let message = OdinMessage::new(
533            MessageType::Standard,
534            "alice",
535            "bob",
536            "hello world",
537            MessagePriority::High,
538        );
539        
540        let filter = MessageFilter::new()
541            .with_type(MessageType::Standard)
542            .with_source("alice".to_string())
543            .with_min_priority(MessagePriority::Normal);
544        
545        assert!(filter.matches(&message));
546        
547        let strict_filter = MessageFilter::new()
548            .with_min_priority(MessagePriority::Critical);
549        
550        assert!(!strict_filter.matches(&message));
551    }
552    
553    #[test]
554    fn test_priority_ordering() {
555        assert!(MessagePriority::Critical > MessagePriority::High);
556        assert!(MessagePriority::High > MessagePriority::Normal);
557        assert!(MessagePriority::Normal > MessagePriority::Low);
558    }
559}