1use serde::{Deserialize, Serialize};
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// A single message sent from one node to another.
///
/// The type derives `Serialize`/`Deserialize`; no rename attributes are
/// applied, so the field names below are used as-is by serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OdinMessage {
    /// Unique identifier; `OdinMessage::new` fills it with a v4 UUID string.
    pub id: String,
    /// Kind of message (standard, reply, acknowledgment, ...).
    pub message_type: MessageType,
    /// Name of the sending node.
    pub source_node: String,
    /// Name of the receiving node.
    pub target_node: String,
    /// Message payload.
    pub content: String,
    /// Priority of the message.
    pub priority: MessagePriority,
    /// Creation time in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Free-form key/value annotations. `create_reply` stores `"reply_to"`
    /// and `create_ack` stores `"ack_for"` here.
    pub metadata: std::collections::HashMap<String, String>,
    /// Sequence number; 0 unless set through `with_sequence`.
    pub sequence: u64,
    /// Optional checksum set by `with_checksum`; `None` means `validate`
    /// accepts the message unconditionally.
    pub checksum: Option<String>,
}
30
31impl OdinMessage {
32 pub fn new(
34 message_type: MessageType,
35 source_node: &str,
36 target_node: &str,
37 content: &str,
38 priority: MessagePriority,
39 ) -> Self {
40 let timestamp = SystemTime::now()
41 .duration_since(UNIX_EPOCH)
42 .unwrap_or_default()
43 .as_secs();
44
45 Self {
46 id: uuid::Uuid::new_v4().to_string(),
47 message_type,
48 source_node: source_node.to_string(),
49 target_node: target_node.to_string(),
50 content: content.to_string(),
51 priority,
52 timestamp,
53 metadata: std::collections::HashMap::new(),
54 sequence: 0,
55 checksum: None,
56 }
57 }
58
59 pub fn with_metadata(mut self, key: String, value: String) -> Self {
61 self.metadata.insert(key, value);
62 self
63 }
64
65 pub fn with_sequence(mut self, sequence: u64) -> Self {
67 self.sequence = sequence;
68 self
69 }
70
71 pub fn with_checksum(mut self) -> Self {
73 self.checksum = Some(self.calculate_checksum());
74 self
75 }
76
77 pub fn validate(&self) -> bool {
79 if let Some(checksum) = &self.checksum {
80 &self.calculate_checksum() == checksum
81 } else {
82 true }
84 }
85
86 fn calculate_checksum(&self) -> String {
88 use std::collections::hash_map::DefaultHasher;
89 use std::hash::{Hash, Hasher};
90
91 let mut hasher = DefaultHasher::new();
92 self.id.hash(&mut hasher);
93 self.source_node.hash(&mut hasher);
94 self.target_node.hash(&mut hasher);
95 self.content.hash(&mut hasher);
96 self.timestamp.hash(&mut hasher);
97
98 format!("{:x}", hasher.finish())
99 }
100
101 pub fn size(&self) -> usize {
103 serde_json::to_string(self)
104 .map(|s| s.len())
105 .unwrap_or(0)
106 }
107
108 pub fn is_expired(&self, ttl_seconds: u64) -> bool {
110 let current_time = SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .unwrap_or_default()
113 .as_secs();
114
115 current_time > self.timestamp + ttl_seconds
116 }
117
118 pub fn create_reply(&self, content: &str, priority: MessagePriority) -> Self {
120 Self::new(
121 MessageType::Reply,
122 &self.target_node,
123 &self.source_node,
124 content,
125 priority,
126 )
127 .with_metadata("reply_to".to_string(), self.id.clone())
128 }
129
130 pub fn create_ack(&self) -> Self {
132 Self::new(
133 MessageType::Acknowledgment,
134 &self.target_node,
135 &self.source_node,
136 "ack",
137 MessagePriority::Low,
138 )
139 .with_metadata("ack_for".to_string(), self.id.clone())
140 }
141}
142
/// Kind of an `OdinMessage`.
///
/// The `Display` implementation renders each variant as a short lowercase
/// label (for example `Acknowledgment` is shown as `ack`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageType {
    Standard,
    Broadcast,
    /// Used by `OdinMessage::create_reply`.
    Reply,
    /// Used by `OdinMessage::create_ack`.
    Acknowledgment,
    Heartbeat,
    System,
    Error,
}
161
162impl std::fmt::Display for MessageType {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 match self {
165 MessageType::Standard => write!(f, "standard"),
166 MessageType::Broadcast => write!(f, "broadcast"),
167 MessageType::Reply => write!(f, "reply"),
168 MessageType::Acknowledgment => write!(f, "ack"),
169 MessageType::Heartbeat => write!(f, "heartbeat"),
170 MessageType::System => write!(f, "system"),
171 MessageType::Error => write!(f, "error"),
172 }
173 }
174}
175
/// Priority of an `OdinMessage`.
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order:
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum MessagePriority {
    /// Lowest priority; used by `OdinMessage::create_ack`.
    Low = 0,
    /// Value returned by `MessagePriority::default()`.
    Normal = 1,
    High = 2,
    /// Highest priority.
    Critical = 3,
}
188
189impl std::fmt::Display for MessagePriority {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 match self {
192 MessagePriority::Low => write!(f, "low"),
193 MessagePriority::Normal => write!(f, "normal"),
194 MessagePriority::High => write!(f, "high"),
195 MessagePriority::Critical => write!(f, "critical"),
196 }
197 }
198}
199
200impl Default for MessagePriority {
201 fn default() -> Self {
202 MessagePriority::Normal
203 }
204}
205
/// An ordered group of `OdinMessage`s handled together.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageBatch {
    /// Unique identifier; `MessageBatch::new` fills it with a v4 UUID string.
    pub batch_id: String,
    /// Messages in insertion order.
    pub messages: Vec<OdinMessage>,
    /// Batch creation time in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Free-form key/value annotations; empty for a batch built by `new`.
    pub metadata: std::collections::HashMap<String, String>,
}
218
219impl MessageBatch {
220 pub fn new() -> Self {
222 let timestamp = SystemTime::now()
223 .duration_since(UNIX_EPOCH)
224 .unwrap_or_default()
225 .as_secs();
226
227 Self {
228 batch_id: uuid::Uuid::new_v4().to_string(),
229 messages: Vec::new(),
230 timestamp,
231 metadata: std::collections::HashMap::new(),
232 }
233 }
234
235 pub fn add_message(mut self, message: OdinMessage) -> Self {
237 self.messages.push(message);
238 self
239 }
240
241 pub fn len(&self) -> usize {
243 self.messages.len()
244 }
245
246 pub fn is_empty(&self) -> bool {
248 self.messages.is_empty()
249 }
250
251 pub fn total_size(&self) -> usize {
253 self.messages.iter().map(|m| m.size()).sum::<usize>()
254 + serde_json::to_string(&self.batch_id).unwrap_or_default().len()
255 + 8 }
257
258 pub fn split(self, max_size: usize) -> Vec<MessageBatch> {
260 let mut batches = Vec::new();
261 let mut current_batch = MessageBatch::new();
262
263 for message in self.messages {
264 if current_batch.len() >= max_size && !current_batch.is_empty() {
265 batches.push(current_batch);
266 current_batch = MessageBatch::new();
267 }
268 current_batch = current_batch.add_message(message);
269 }
270
271 if !current_batch.is_empty() {
272 batches.push(current_batch);
273 }
274
275 batches
276 }
277}
278
279impl Default for MessageBatch {
280 fn default() -> Self {
281 Self::new()
282 }
283}
284
/// A set of optional criteria for selecting `OdinMessage`s.
///
/// A criterion left as `None` is not checked; `matches` requires every
/// criterion that is set to hold.
#[derive(Debug, Clone)]
pub struct MessageFilter {
    /// Required message type (exact match).
    pub message_type: Option<MessageType>,
    /// Substring that must occur in `source_node` (plain `contains`, not a
    /// glob or regex).
    pub source_pattern: Option<String>,
    /// Substring that must occur in `target_node`.
    pub target_pattern: Option<String>,
    /// Lowest priority that is still accepted.
    pub min_priority: Option<MessagePriority>,
    /// Substring that must occur in `content`.
    pub content_pattern: Option<String>,
    /// Maximum message age in seconds, checked via `OdinMessage::is_expired`.
    pub max_age_seconds: Option<u64>,
}
301
302impl MessageFilter {
303 pub fn new() -> Self {
305 Self {
306 message_type: None,
307 source_pattern: None,
308 target_pattern: None,
309 min_priority: None,
310 content_pattern: None,
311 max_age_seconds: None,
312 }
313 }
314
315 pub fn with_type(mut self, message_type: MessageType) -> Self {
317 self.message_type = Some(message_type);
318 self
319 }
320
321 pub fn with_source(mut self, pattern: String) -> Self {
323 self.source_pattern = Some(pattern);
324 self
325 }
326
327 pub fn with_target(mut self, pattern: String) -> Self {
329 self.target_pattern = Some(pattern);
330 self
331 }
332
333 pub fn with_min_priority(mut self, priority: MessagePriority) -> Self {
335 self.min_priority = Some(priority);
336 self
337 }
338
339 pub fn with_content(mut self, pattern: String) -> Self {
341 self.content_pattern = Some(pattern);
342 self
343 }
344
345 pub fn with_max_age(mut self, seconds: u64) -> Self {
347 self.max_age_seconds = Some(seconds);
348 self
349 }
350
351 pub fn matches(&self, message: &OdinMessage) -> bool {
353 if let Some(msg_type) = self.message_type {
355 if message.message_type != msg_type {
356 return false;
357 }
358 }
359
360 if let Some(pattern) = &self.source_pattern {
362 if !message.source_node.contains(pattern) {
363 return false;
364 }
365 }
366
367 if let Some(pattern) = &self.target_pattern {
369 if !message.target_node.contains(pattern) {
370 return false;
371 }
372 }
373
374 if let Some(min_priority) = self.min_priority {
376 if message.priority < min_priority {
377 return false;
378 }
379 }
380
381 if let Some(pattern) = &self.content_pattern {
383 if !message.content.contains(pattern) {
384 return false;
385 }
386 }
387
388 if let Some(max_age) = self.max_age_seconds {
390 if message.is_expired(max_age) {
391 return false;
392 }
393 }
394
395 true
396 }
397}
398
399impl Default for MessageFilter {
400 fn default() -> Self {
401 Self::new()
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` copies its arguments into the message and stamps id and time.
    #[test]
    fn test_message_creation() {
        let message = OdinMessage::new(
            MessageType::Standard,
            "source-node",
            "target-node",
            "Hello, World!",
            MessagePriority::Normal,
        );

        assert!(!message.id.is_empty());
        assert_eq!(message.message_type, MessageType::Standard);
        assert_eq!(message.source_node, "source-node");
        assert_eq!(message.target_node, "target-node");
        assert_eq!(message.content, "Hello, World!");
        assert_eq!(message.priority, MessagePriority::Normal);
        assert!(message.timestamp > 0);
    }

    /// `with_metadata` accumulates entries across chained calls.
    #[test]
    fn test_message_with_metadata() {
        let message = OdinMessage::new(
            MessageType::Standard,
            "source",
            "target",
            "content",
            MessagePriority::Normal,
        )
        .with_metadata("key1".to_string(), "value1".to_string())
        .with_metadata("key2".to_string(), "value2".to_string());

        assert_eq!(message.metadata.len(), 2);
        assert_eq!(message.metadata.get("key1"), Some(&"value1".to_string()));
        assert_eq!(message.metadata.get("key2"), Some(&"value2".to_string()));
    }

    /// A freshly checksummed message validates.
    #[test]
    fn test_message_checksum() {
        let message = OdinMessage::new(
            MessageType::Standard,
            "source",
            "target",
            "content",
            MessagePriority::Normal,
        )
        .with_checksum();

        assert!(message.checksum.is_some());
        assert!(message.validate());
    }

    /// Changing a checksummed field after `with_checksum` fails validation.
    #[test]
    fn test_message_checksum_detects_tampering() {
        let mut message = OdinMessage::new(
            MessageType::Standard,
            "source",
            "target",
            "content",
            MessagePriority::Normal,
        )
        .with_checksum();

        message.content.push_str(" (edited)");

        assert!(!message.validate());
    }

    /// A message 10 seconds old is expired for a 5 s TTL but not for 15 s.
    #[test]
    fn test_message_expiration() {
        let mut message = OdinMessage::new(
            MessageType::Standard,
            "source",
            "target",
            "content",
            MessagePriority::Normal,
        );

        message.timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            - 10;

        assert!(message.is_expired(5));
        assert!(!message.is_expired(15));
    }

    /// A reply swaps source and target and records the original id.
    #[test]
    fn test_message_reply() {
        let original = OdinMessage::new(
            MessageType::Standard,
            "alice",
            "bob",
            "Hello",
            MessagePriority::Normal,
        );

        let reply = original.create_reply("Hello back!", MessagePriority::Normal);

        assert_eq!(reply.message_type, MessageType::Reply);
        assert_eq!(reply.source_node, "bob");
        assert_eq!(reply.target_node, "alice");
        assert_eq!(reply.content, "Hello back!");
        assert_eq!(reply.metadata.get("reply_to"), Some(&original.id));
    }

    /// An ack swaps source and target, is low priority, and records the
    /// original id.
    #[test]
    fn test_message_ack() {
        let original = OdinMessage::new(
            MessageType::Standard,
            "alice",
            "bob",
            "Hello",
            MessagePriority::High,
        );

        let ack = original.create_ack();

        assert_eq!(ack.message_type, MessageType::Acknowledgment);
        assert_eq!(ack.source_node, "bob");
        assert_eq!(ack.target_node, "alice");
        assert_eq!(ack.content, "ack");
        assert_eq!(ack.priority, MessagePriority::Low);
        assert_eq!(ack.metadata.get("ack_for"), Some(&original.id));
    }

    /// Adding messages grows the batch and its reported size.
    #[test]
    fn test_message_batch() {
        let mut batch = MessageBatch::new();

        let message1 = OdinMessage::new(
            MessageType::Standard,
            "source",
            "target1",
            "message 1",
            MessagePriority::Normal,
        );

        let message2 = OdinMessage::new(
            MessageType::Standard,
            "source",
            "target2",
            "message 2",
            MessagePriority::High,
        );

        batch = batch.add_message(message1).add_message(message2);

        assert_eq!(batch.len(), 2);
        assert!(!batch.is_empty());
        assert!(batch.total_size() > 0);
    }

    /// `split` chunks by message count and keeps message order.
    #[test]
    fn test_message_batch_split() {
        let mut batch = MessageBatch::new();
        for i in 0..5 {
            batch = batch.add_message(OdinMessage::new(
                MessageType::Standard,
                "source",
                "target",
                &format!("message {}", i),
                MessagePriority::Normal,
            ));
        }

        let parts = batch.split(2);

        let sizes: Vec<usize> = parts.iter().map(|part| part.len()).collect();
        assert_eq!(sizes, vec![2, 2, 1]);
        assert_eq!(parts[0].messages[0].content, "message 0");
        assert_eq!(parts[2].messages[0].content, "message 4");
        assert!(MessageBatch::new().split(2).is_empty());
    }

    /// A filter matches only when every configured criterion holds.
    #[test]
    fn test_message_filter() {
        let message = OdinMessage::new(
            MessageType::Standard,
            "alice",
            "bob",
            "hello world",
            MessagePriority::High,
        );

        let filter = MessageFilter::new()
            .with_type(MessageType::Standard)
            .with_source("alice".to_string())
            .with_min_priority(MessagePriority::Normal);

        assert!(filter.matches(&message));

        let strict_filter = MessageFilter::new().with_min_priority(MessagePriority::Critical);

        assert!(!strict_filter.matches(&message));
    }

    /// Priorities order as Low < Normal < High < Critical.
    #[test]
    fn test_priority_ordering() {
        assert!(MessagePriority::Critical > MessagePriority::High);
        assert!(MessagePriority::High > MessagePriority::Normal);
        assert!(MessagePriority::Normal > MessagePriority::Low);
    }
}