1use chrono::{DateTime, Duration, Utc};
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::cmp::Ordering;
17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use thiserror::Error;
19use tokio::sync::oneshot;
20
/// Convenience alias for results whose error type is [`MessageBusError`].
pub type MessageBusResult<T> = Result<T, MessageBusError>;
23
/// Errors reported by the message bus.
///
/// Every variant carries one string that is interpolated into its display message.
#[derive(Debug, Error)]
pub enum MessageBusError {
    /// An agent could not be found.
    // NOTE(review): not constructed anywhere in the code visible here; the payload is
    // presumably the agent id — confirm against callers.
    #[error("Agent not found: {0}")]
    AgentNotFound(String),

    /// The target agent's queue already holds the configured maximum number of
    /// messages; carries the agent id.
    #[error("Queue is full for agent: {0}")]
    QueueFull(String),

    /// The message had already expired when it was handed to the bus; carries the
    /// message id.
    #[error("Message expired: {0}")]
    MessageExpired(String),

    /// A response arrived after the request's deadline; carries the request id.
    #[error("Request timeout: {0}")]
    RequestTimeout(String),

    /// The message is not valid for the attempted operation; carries a description.
    #[error("Invalid message: {0}")]
    InvalidMessage(String),

    /// A serialization failure.
    // NOTE(review): not constructed anywhere in the code visible here.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// No pending request is registered under the given request id; carries that id.
    #[error("No response received for request: {0}")]
    NoResponse(String),

    /// The response could not be pushed into the requester's oneshot channel;
    /// carries the request id.
    #[error("Response channel closed: {0}")]
    ChannelClosed(String),
}
59
/// Addressing mode of an [`AgentMessage`].
///
/// Variant names are (de)serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum MessageTarget {
    /// Deliver to the single agent with this id.
    Agent(String),
    /// Deliver to every active subscription whose type filter matches the message,
    /// except the sender's own.
    Broadcast,
    /// Deliver a copy to each of the listed agent ids.
    Multiple(Vec<String>),
}
71
72impl MessageTarget {
73 pub fn get_agent_id(&self) -> Option<String> {
75 match self {
76 MessageTarget::Agent(id) => Some(id.clone()),
77 _ => None,
78 }
79 }
80}
81
/// Delivery priority of a message.
///
/// The discriminants are the raw `u8` values stored in `AgentMessage::priority`;
/// a larger value is dequeued earlier. Variant names are (de)serialized in camelCase.
#[derive(
    Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
#[serde(rename_all = "camelCase")]
pub enum MessagePriority {
    /// Lowest priority (raw value 0).
    Low = 0,
    /// Default priority (raw value 1).
    #[default]
    Normal = 1,
    /// Raw value 2.
    High = 2,
    /// Highest priority (raw value 3).
    Critical = 3,
}
98
99impl From<u8> for MessagePriority {
100 fn from(value: u8) -> Self {
101 match value {
102 0 => Self::Low,
103 1 => Self::Normal,
104 2 => Self::High,
105 _ => Self::Critical,
106 }
107 }
108}
109
/// A message exchanged between agents through the bus.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentMessage {
    /// Message identifier; `AgentMessage::new` fills it with a random UUID (v4) string.
    pub id: String,
    /// Id of the sending agent.
    pub from: String,
    /// Who the message is addressed to.
    pub to: MessageTarget,
    /// Free-form type tag; subscription filters are matched against it for broadcasts.
    pub message_type: String,
    /// Arbitrary JSON payload.
    pub payload: Value,
    /// Creation time; among messages of equal priority the older one is dequeued first.
    pub timestamp: DateTime<Utc>,
    /// Raw priority level; `AgentMessage::new` uses `MessagePriority::Normal as u8`.
    /// Higher values are dequeued first.
    pub priority: u8,
    /// Whether the sender expects a response to this message.
    pub requires_response: bool,
    /// Id of the request this message answers, if it is a response.
    pub response_to_id: Option<String>,
    /// Instant after which the message counts as expired; `None` means it never expires.
    pub expires_at: Option<DateTime<Utc>>,
}
135
136impl AgentMessage {
137 pub fn new(
139 from: impl Into<String>,
140 to: MessageTarget,
141 message_type: impl Into<String>,
142 payload: Value,
143 ) -> Self {
144 Self {
145 id: uuid::Uuid::new_v4().to_string(),
146 from: from.into(),
147 to,
148 message_type: message_type.into(),
149 payload,
150 timestamp: Utc::now(),
151 priority: MessagePriority::Normal as u8,
152 requires_response: false,
153 response_to_id: None,
154 expires_at: None,
155 }
156 }
157
158 pub fn broadcast(
160 from: impl Into<String>,
161 message_type: impl Into<String>,
162 payload: Value,
163 ) -> Self {
164 Self::new(from, MessageTarget::Broadcast, message_type, payload)
165 }
166
167 pub fn with_priority(mut self, priority: u8) -> Self {
169 self.priority = priority;
170 self
171 }
172
173 pub fn with_requires_response(mut self, requires: bool) -> Self {
175 self.requires_response = requires;
176 self
177 }
178
179 pub fn with_response_to(mut self, id: impl Into<String>) -> Self {
181 self.response_to_id = Some(id.into());
182 self
183 }
184
185 pub fn with_expiration(mut self, expires_at: DateTime<Utc>) -> Self {
187 self.expires_at = Some(expires_at);
188 self
189 }
190
191 pub fn expires_in(mut self, duration: Duration) -> Self {
193 self.expires_at = Some(Utc::now() + duration);
194 self
195 }
196
197 pub fn is_expired(&self) -> bool {
199 self.expires_at.map(|exp| Utc::now() > exp).unwrap_or(false)
200 }
201}
202
203#[derive(Debug, Clone)]
205struct PrioritizedMessage {
206 message: AgentMessage,
207}
208
209impl PartialEq for PrioritizedMessage {
210 fn eq(&self, other: &Self) -> bool {
211 self.message.id == other.message.id
212 }
213}
214
215impl Eq for PrioritizedMessage {}
216
217impl PartialOrd for PrioritizedMessage {
218 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
219 Some(self.cmp(other))
220 }
221}
222
223impl Ord for PrioritizedMessage {
224 fn cmp(&self, other: &Self) -> Ordering {
225 match self.message.priority.cmp(&other.message.priority) {
227 Ordering::Equal => other.message.timestamp.cmp(&self.message.timestamp),
228 other => other,
229 }
230 }
231}
232
/// An agent's registration on the bus, with an optional message-type filter.
#[derive(Debug, Clone)]
pub struct MessageSubscription {
    /// Id of the subscribed agent.
    pub agent_id: String,
    /// Message types the agent accepts; an empty list accepts every type.
    pub message_types: Vec<String>,
    /// Whether the subscription is active; an inactive subscription matches nothing.
    pub active: bool,
}

impl MessageSubscription {
    /// Creates an active subscription for `agent_id` that accepts every message type.
    pub fn new(agent_id: impl Into<String>) -> Self {
        Self {
            agent_id: agent_id.into(),
            message_types: Vec::new(),
            active: true,
        }
    }

    /// Replaces the type filter with `types`; an empty list means "accept everything".
    pub fn with_types(mut self, types: Vec<String>) -> Self {
        self.message_types = types;
        self
    }

    /// Returns `true` when the subscription is active and `message_type` passes the
    /// filter (the filter is empty or lists the type).
    pub fn matches(&self, message_type: &str) -> bool {
        // Compare as `&str` so no temporary `String` is allocated per lookup.
        self.active
            && (self.message_types.is_empty()
                || self
                    .message_types
                    .iter()
                    .any(|accepted| accepted.as_str() == message_type))
    }
}
267
/// Bookkeeping for a request that is still waiting for its response.
#[derive(Debug)]
// Several fields are stored but never read in the code visible here.
#[allow(dead_code)]
struct PendingRequest {
    /// Id of the request message.
    request_id: String,
    /// Id of the requesting agent.
    from: String,
    /// Id of the agent the request was addressed to.
    to: String,
    /// When the request was registered.
    sent_at: DateTime<Utc>,
    /// Deadline after which a response is rejected and the entry may be purged.
    pub expires_at: DateTime<Utc>,
    /// Sending half of the oneshot channel used to hand the response payload to the
    /// requester; taken out when the response is forwarded.
    pub response_sender: Option<oneshot::Sender<Value>>,
}
285
/// In-memory message bus that routes [`AgentMessage`]s between agents.
#[derive(Debug)]
pub struct AgentMessageBus {
    /// Per-agent priority queues, keyed by agent id.
    message_queues: HashMap<String, BinaryHeap<PrioritizedMessage>>,
    /// Subscriptions keyed by agent id; unsubscribing marks an entry inactive
    /// instead of removing it.
    subscriptions: HashMap<String, MessageSubscription>,
    /// Log of messages handed to `send` and of responses created by `respond`,
    /// oldest first.
    message_history: VecDeque<AgentMessage>,
    /// Maximum number of entries kept in `message_history`; the oldest are evicted first.
    max_history_size: usize,
    /// Maximum number of messages a single agent queue may hold.
    max_queue_size: usize,
    /// Requests awaiting a response, keyed by request id.
    pending_requests: HashMap<String, PendingRequest>,
}
302
impl Default for AgentMessageBus {
    /// Equivalent to [`AgentMessageBus::new`].
    fn default() -> Self {
        Self::new()
    }
}
308
309impl AgentMessageBus {
310 pub fn new() -> Self {
312 Self {
313 message_queues: HashMap::new(),
314 subscriptions: HashMap::new(),
315 message_history: VecDeque::new(),
316 max_history_size: 1000,
317 max_queue_size: 100,
318 pending_requests: HashMap::new(),
319 }
320 }
321
322 pub fn with_config(max_history_size: usize, max_queue_size: usize) -> Self {
324 Self {
325 message_queues: HashMap::new(),
326 subscriptions: HashMap::new(),
327 message_history: VecDeque::new(),
328 max_history_size,
329 max_queue_size,
330 pending_requests: HashMap::new(),
331 }
332 }
333
334 pub fn subscribe(&mut self, agent_id: impl Into<String>, types: Vec<String>) {
336 let agent_id = agent_id.into();
337 let subscription = MessageSubscription::new(&agent_id).with_types(types);
338 self.subscriptions.insert(agent_id.clone(), subscription);
339 self.message_queues.entry(agent_id).or_default();
341 }
342
343 pub fn unsubscribe(&mut self, agent_id: &str) {
345 if let Some(sub) = self.subscriptions.get_mut(agent_id) {
346 sub.active = false;
347 }
348 }
349
350 pub fn is_subscribed(&self, agent_id: &str) -> bool {
352 self.subscriptions
353 .get(agent_id)
354 .map(|s| s.active)
355 .unwrap_or(false)
356 }
357
358 pub fn get_subscription(&self, agent_id: &str) -> Option<&MessageSubscription> {
360 self.subscriptions.get(agent_id)
361 }
362
363 pub fn send(&mut self, message: AgentMessage) -> MessageBusResult<()> {
365 if message.is_expired() {
367 return Err(MessageBusError::MessageExpired(message.id.clone()));
368 }
369
370 self.add_to_history(message.clone());
372
373 let target = message.to.clone();
375 match target {
376 MessageTarget::Agent(agent_id) => {
377 self.deliver_to_agent(&agent_id, message)?;
378 }
379 MessageTarget::Broadcast => {
380 self.broadcast_message(message)?;
381 }
382 MessageTarget::Multiple(agent_ids) => {
383 for agent_id in &agent_ids {
384 self.deliver_to_agent(agent_id, message.clone())?;
386 }
387 }
388 }
389
390 Ok(())
391 }
392
393 pub fn broadcast(
395 &mut self,
396 message_type: &str,
397 payload: Value,
398 sender: &str,
399 ) -> MessageBusResult<()> {
400 let message = AgentMessage::broadcast(sender, message_type, payload);
401 self.send(message)
402 }
403
404 fn deliver_to_agent(&mut self, agent_id: &str, message: AgentMessage) -> MessageBusResult<()> {
406 let queue = self.message_queues.entry(agent_id.to_string()).or_default();
408
409 if queue.len() >= self.max_queue_size {
411 return Err(MessageBusError::QueueFull(agent_id.to_string()));
412 }
413
414 queue.push(PrioritizedMessage { message });
415 Ok(())
416 }
417
418 fn broadcast_message(&mut self, message: AgentMessage) -> MessageBusResult<()> {
420 let message_type = &message.message_type;
421 let sender = &message.from;
422
423 let matching_agents: Vec<String> = self
425 .subscriptions
426 .iter()
427 .filter(|(agent_id, sub)| sub.matches(message_type) && *agent_id != sender)
428 .map(|(agent_id, _)| agent_id.clone())
429 .collect();
430
431 for agent_id in matching_agents {
433 self.deliver_to_agent(&agent_id, message.clone())?;
434 }
435
436 Ok(())
437 }
438
439 pub fn get_queue(&self, agent_id: &str) -> Vec<AgentMessage> {
441 self.message_queues
442 .get(agent_id)
443 .map(|heap| {
444 let mut messages: Vec<_> = heap.iter().map(|pm| pm.message.clone()).collect();
445 messages.sort_by(|a, b| match b.priority.cmp(&a.priority) {
447 Ordering::Equal => a.timestamp.cmp(&b.timestamp),
448 other => other,
449 });
450 messages
451 })
452 .unwrap_or_default()
453 }
454
455 pub fn dequeue(&mut self, agent_id: &str, count: usize) -> Vec<AgentMessage> {
457 let queue = match self.message_queues.get_mut(agent_id) {
458 Some(q) => q,
459 None => return Vec::new(),
460 };
461
462 let mut messages = Vec::with_capacity(count.min(queue.len()));
463 for _ in 0..count {
464 if let Some(pm) = queue.pop() {
465 if !pm.message.is_expired() {
467 messages.push(pm.message);
468 }
469 } else {
470 break;
471 }
472 }
473 messages
474 }
475
476 pub fn dequeue_all(&mut self, agent_id: &str) -> Vec<AgentMessage> {
478 let queue = match self.message_queues.get_mut(agent_id) {
479 Some(q) => q,
480 None => return Vec::new(),
481 };
482
483 let mut messages = Vec::with_capacity(queue.len());
484 while let Some(pm) = queue.pop() {
485 if !pm.message.is_expired() {
486 messages.push(pm.message);
487 }
488 }
489 messages
490 }
491
492 pub fn queue_size(&self, agent_id: &str) -> usize {
494 self.message_queues
495 .get(agent_id)
496 .map(|q| q.len())
497 .unwrap_or(0)
498 }
499
500 pub fn has_messages(&self, agent_id: &str) -> bool {
502 self.queue_size(agent_id) > 0
503 }
504
505 fn add_to_history(&mut self, message: AgentMessage) {
507 self.message_history.push_back(message);
508 while self.message_history.len() > self.max_history_size {
509 self.message_history.pop_front();
510 }
511 }
512
513 pub fn get_history(&self, limit: Option<usize>) -> Vec<AgentMessage> {
515 let limit = limit.unwrap_or(self.message_history.len());
516 self.message_history
517 .iter()
518 .rev()
519 .take(limit)
520 .cloned()
521 .collect()
522 }
523
524 pub fn clear_history(&mut self) {
526 self.message_history.clear();
527 }
528
529 pub fn get_subscribed_agents(&self) -> Vec<String> {
531 self.subscriptions
532 .iter()
533 .filter(|(_, sub)| sub.active)
534 .map(|(id, _)| id.clone())
535 .collect()
536 }
537
538 pub fn cleanup_expired(&mut self) -> usize {
540 let mut removed = 0;
541 for queue in self.message_queues.values_mut() {
542 let before = queue.len();
543 let messages: Vec<_> = queue
544 .drain()
545 .filter(|pm| !pm.message.is_expired())
546 .collect();
547 removed += before - messages.len();
548 for msg in messages {
549 queue.push(msg);
550 }
551 }
552 removed
553 }
554
555 pub fn stats(&self) -> MessageBusStats {
557 let total_queued: usize = self.message_queues.values().map(|q| q.len()).sum();
558 MessageBusStats {
559 subscribed_agents: self.subscriptions.iter().filter(|(_, s)| s.active).count(),
560 total_queued_messages: total_queued,
561 history_size: self.message_history.len(),
562 max_history_size: self.max_history_size,
563 max_queue_size: self.max_queue_size,
564 }
565 }
566
567 pub fn prepare_request(
584 &mut self,
585 to: &str,
586 message_type: &str,
587 payload: Value,
588 from: &str,
589 timeout: Duration,
590 ) -> MessageBusResult<(String, oneshot::Receiver<Value>)> {
591 let expires_at = Utc::now() + timeout;
592
593 let message = AgentMessage::new(
595 from,
596 MessageTarget::Agent(to.to_string()),
597 message_type,
598 payload,
599 )
600 .with_requires_response(true)
601 .with_expiration(expires_at);
602
603 let request_id = message.id.clone();
604
605 let (tx, rx) = oneshot::channel();
607
608 let pending = PendingRequest {
610 request_id: request_id.clone(),
611 from: from.to_string(),
612 to: to.to_string(),
613 sent_at: Utc::now(),
614 expires_at,
615 response_sender: Some(tx),
616 };
617 self.pending_requests.insert(request_id.clone(), pending);
618
619 self.send(message)?;
621
622 Ok((request_id, rx))
623 }
624
625 pub fn respond(&mut self, request: &AgentMessage, payload: Value) -> MessageBusResult<()> {
639 if !request.requires_response {
641 return Err(MessageBusError::InvalidMessage(
642 "Request does not require a response".to_string(),
643 ));
644 }
645
646 if let Some(mut pending) = self.pending_requests.remove(&request.id) {
648 if Utc::now() > pending.expires_at {
650 return Err(MessageBusError::RequestTimeout(request.id.clone()));
651 }
652
653 if let Some(sender) = pending.response_sender.take() {
655 sender
656 .send(payload.clone())
657 .map_err(|_| MessageBusError::ChannelClosed(request.id.clone()))?;
658 }
659
660 let response_message = AgentMessage::new(
662 request.to.get_agent_id().unwrap_or_default(),
663 MessageTarget::Agent(request.from.clone()),
664 format!("{}_response", request.message_type),
665 payload,
666 )
667 .with_response_to(&request.id);
668
669 self.add_to_history(response_message.clone());
671
672 self.deliver_to_agent(&request.from, response_message)?;
674
675 Ok(())
676 } else {
677 Err(MessageBusError::NoResponse(request.id.clone()))
678 }
679 }
680
681 pub fn is_request_pending(&self, request_id: &str) -> bool {
683 self.pending_requests.contains_key(request_id)
684 }
685
686 pub fn pending_request_count(&self) -> usize {
688 self.pending_requests.len()
689 }
690
691 pub fn cancel_request(&mut self, request_id: &str) -> bool {
696 self.pending_requests.remove(request_id).is_some()
697 }
698
699 pub fn cleanup_expired_requests(&mut self) -> usize {
704 let now = Utc::now();
705 let expired_ids: Vec<String> = self
706 .pending_requests
707 .iter()
708 .filter(|(_, req)| now > req.expires_at)
709 .map(|(id, _)| id.clone())
710 .collect();
711
712 let count = expired_ids.len();
713 for id in expired_ids {
714 self.pending_requests.remove(&id);
715 }
716 count
717 }
718
719 pub fn get_response(&mut self, agent_id: &str, request_id: &str) -> Option<AgentMessage> {
724 let queue = self.message_queues.get_mut(agent_id)?;
725
726 let messages: Vec<PrioritizedMessage> = queue.drain().collect();
728 let mut response = None;
729 let mut remaining = Vec::new();
730
731 for pm in messages {
732 if pm.message.response_to_id.as_deref() == Some(request_id) {
733 response = Some(pm.message);
734 } else {
735 remaining.push(pm);
736 }
737 }
738
739 for pm in remaining {
741 queue.push(pm);
742 }
743
744 response
745 }
746
747 pub fn find_message_in_history(&self, message_id: &str) -> Option<&AgentMessage> {
749 self.message_history.iter().find(|m| m.id == message_id)
750 }
751
752 pub fn get_responses_from_history(&self, request_id: &str) -> Vec<&AgentMessage> {
754 self.message_history
755 .iter()
756 .filter(|m| m.response_to_id.as_deref() == Some(request_id))
757 .collect()
758 }
759}
760
/// Snapshot of the bus counters and configured limits.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MessageBusStats {
    /// Number of subscriptions that are currently active.
    pub subscribed_agents: usize,
    /// Total number of messages across all agent queues.
    pub total_queued_messages: usize,
    /// Number of entries currently held in the history.
    pub history_size: usize,
    /// Configured history capacity.
    pub max_history_size: usize,
    /// Configured per-agent queue capacity.
    pub max_queue_size: usize,
}
776
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio::sync::oneshot;

    /// Builds a point-to-point message from `from` to the agent `to`.
    fn direct(from: &str, to: &str, message_type: &str, payload: Value) -> AgentMessage {
        AgentMessage::new(
            from,
            MessageTarget::Agent(to.to_string()),
            message_type,
            payload,
        )
    }

    /// Returns a default bus on which every listed agent is subscribed to all types.
    fn bus_with(agents: &[&str]) -> AgentMessageBus {
        let mut bus = AgentMessageBus::new();
        for agent in agents {
            bus.subscribe(*agent, vec![]);
        }
        bus
    }

    /// Issues a "query" request from agent-1 to agent-2 with a 30 second timeout.
    fn query(bus: &mut AgentMessageBus, payload: Value) -> (String, oneshot::Receiver<Value>) {
        bus.prepare_request(
            "agent-2",
            "query",
            payload,
            "agent-1",
            Duration::seconds(30),
        )
        .unwrap()
    }

    /// Sends `count` "test" messages with an `index` payload from "sender" to agent-1.
    fn send_indexed(bus: &mut AgentMessageBus, count: i32) {
        for i in 0..count {
            bus.send(direct("sender", "agent-1", "test", json!({"index": i})))
                .unwrap();
        }
    }

    #[test]
    fn test_message_creation() {
        let msg = direct("agent-1", "agent-2", "test-type", json!({"data": "value"}));

        assert!(!msg.id.is_empty());
        assert_eq!(msg.from, "agent-1");
        assert_eq!(msg.to, MessageTarget::Agent("agent-2".to_string()));
        assert_eq!(msg.message_type, "test-type");
        assert_eq!(msg.priority, MessagePriority::Normal as u8);
        assert!(!msg.requires_response);
        assert!(msg.response_to_id.is_none());
        assert!(msg.expires_at.is_none());
    }

    #[test]
    fn test_message_broadcast_creation() {
        let msg = AgentMessage::broadcast("agent-1", "broadcast-type", json!({"key": "value"}));

        assert_eq!(msg.to, MessageTarget::Broadcast);
        assert_eq!(msg.message_type, "broadcast-type");
    }

    #[test]
    fn test_message_with_priority() {
        let critical = MessagePriority::Critical as u8;
        let msg = direct("agent-1", "agent-2", "test", json!({})).with_priority(critical);

        assert_eq!(msg.priority, MessagePriority::Critical as u8);
    }

    #[test]
    fn test_message_expiration() {
        let ten_seconds_ago = Utc::now() - Duration::seconds(10);
        let expired_msg =
            direct("agent-1", "agent-2", "test", json!({})).with_expiration(ten_seconds_ago);
        assert!(expired_msg.is_expired());

        let valid_msg =
            direct("agent-1", "agent-2", "test", json!({})).expires_in(Duration::hours(1));
        assert!(!valid_msg.is_expired());
    }

    #[test]
    fn test_message_bus_subscribe() {
        let mut bus = AgentMessageBus::new();
        bus.subscribe("agent-1", vec!["type-a".to_string(), "type-b".to_string()]);

        assert!(bus.is_subscribed("agent-1"));
        assert!(!bus.is_subscribed("agent-2"));

        let sub = bus.get_subscription("agent-1").unwrap();
        assert!(sub.matches("type-a"));
        assert!(sub.matches("type-b"));
        assert!(!sub.matches("type-c"));
    }

    #[test]
    fn test_message_bus_subscribe_all_types() {
        // An empty type list subscribes the agent to every message type.
        let bus = bus_with(&["agent-1"]);

        let sub = bus.get_subscription("agent-1").unwrap();
        assert!(sub.matches("any-type"));
        assert!(sub.matches("another-type"));
    }

    #[test]
    fn test_message_bus_unsubscribe() {
        let mut bus = bus_with(&["agent-1"]);
        assert!(bus.is_subscribed("agent-1"));

        bus.unsubscribe("agent-1");
        assert!(!bus.is_subscribed("agent-1"));
    }

    #[test]
    fn test_message_bus_send_to_agent() {
        let mut bus = bus_with(&["agent-2"]);

        bus.send(direct("agent-1", "agent-2", "test", json!({"data": 123})))
            .unwrap();

        assert_eq!(bus.queue_size("agent-2"), 1);
        assert!(bus.has_messages("agent-2"));
    }

    #[test]
    fn test_message_bus_broadcast() {
        let mut bus = AgentMessageBus::new();
        bus.subscribe("agent-1", vec!["broadcast-type".to_string()]);
        bus.subscribe("agent-2", vec!["broadcast-type".to_string()]);
        bus.subscribe("agent-3", vec!["other-type".to_string()]);

        bus.broadcast("broadcast-type", json!({"msg": "hello"}), "sender")
            .unwrap();

        // Only the agents whose filter lists the broadcast type receive it.
        assert_eq!(bus.queue_size("agent-1"), 1);
        assert_eq!(bus.queue_size("agent-2"), 1);
        assert_eq!(bus.queue_size("agent-3"), 0);
    }

    #[test]
    fn test_message_bus_priority_ordering() {
        let mut bus = bus_with(&["agent-1"]);

        // Deliberately sent out of priority order.
        let send_order = [
            (MessagePriority::Low, "low"),
            (MessagePriority::High, "high"),
            (MessagePriority::Normal, "normal"),
            (MessagePriority::Critical, "critical"),
        ];
        for &(level, label) in &send_order {
            let msg = direct("sender", "agent-1", "test", json!({"priority": label}))
                .with_priority(level as u8);
            bus.send(msg).unwrap();
        }

        let delivered: Vec<u8> = bus
            .dequeue("agent-1", 4)
            .iter()
            .map(|m| m.priority)
            .collect();
        assert_eq!(
            delivered,
            vec![
                MessagePriority::Critical as u8,
                MessagePriority::High as u8,
                MessagePriority::Normal as u8,
                MessagePriority::Low as u8,
            ]
        );
    }

    #[test]
    fn test_message_bus_dequeue() {
        let mut bus = bus_with(&["agent-1"]);
        send_indexed(&mut bus, 5);
        assert_eq!(bus.queue_size("agent-1"), 5);

        let messages = bus.dequeue("agent-1", 3);
        assert_eq!(messages.len(), 3);
        assert_eq!(bus.queue_size("agent-1"), 2);

        let remaining = bus.dequeue_all("agent-1");
        assert_eq!(remaining.len(), 2);
        assert_eq!(bus.queue_size("agent-1"), 0);
    }

    #[test]
    fn test_message_bus_queue_full() {
        // Queue capacity of two messages per agent.
        let mut bus = AgentMessageBus::with_config(100, 2);
        bus.subscribe("agent-1", vec![]);

        for _ in 0..2 {
            bus.send(direct("sender", "agent-1", "test", json!({})))
                .unwrap();
        }

        let result = bus.send(direct("sender", "agent-1", "test", json!({})));
        assert!(matches!(result, Err(MessageBusError::QueueFull(_))));
    }

    #[test]
    fn test_message_bus_expired_message() {
        let mut bus = bus_with(&["agent-1"]);

        let expired = direct("sender", "agent-1", "test", json!({}))
            .with_expiration(Utc::now() - Duration::seconds(10));

        let result = bus.send(expired);
        assert!(matches!(result, Err(MessageBusError::MessageExpired(_))));
    }

    #[test]
    fn test_message_bus_history() {
        // History capacity of five entries.
        let mut bus = AgentMessageBus::with_config(5, 100);
        bus.subscribe("agent-1", vec![]);

        send_indexed(&mut bus, 10);

        let history = bus.get_history(None);
        assert_eq!(history.len(), 5);

        let limited = bus.get_history(Some(3));
        assert_eq!(limited.len(), 3);
    }

    #[test]
    fn test_message_bus_stats() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        bus.send(direct("sender", "agent-1", "test", json!({})))
            .unwrap();

        let stats = bus.stats();
        assert_eq!(stats.subscribed_agents, 2);
        assert_eq!(stats.total_queued_messages, 1);
        assert_eq!(stats.history_size, 1);
    }

    #[test]
    fn test_message_bus_get_subscribed_agents() {
        let mut bus = bus_with(&["agent-1", "agent-2", "agent-3"]);
        bus.unsubscribe("agent-2");

        let agents = bus.get_subscribed_agents();
        assert_eq!(agents.len(), 2);
        assert!(agents.contains(&"agent-1".to_string()));
        assert!(agents.contains(&"agent-3".to_string()));
        assert!(!agents.contains(&"agent-2".to_string()));
    }

    #[test]
    fn test_prepare_request() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        let (request_id, _rx) = query(&mut bus, json!({"question": "hello?"}));

        assert!(bus.is_request_pending(&request_id));
        assert_eq!(bus.pending_request_count(), 1);

        assert_eq!(bus.queue_size("agent-2"), 1);

        let messages = bus.get_queue("agent-2");
        assert_eq!(messages[0].message_type, "query");
        assert!(messages[0].requires_response);
    }

    #[test]
    fn test_respond_to_request() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        let (request_id, _rx) = query(&mut bus, json!({"question": "hello?"}));

        let messages = bus.dequeue("agent-2", 1);
        let request = &messages[0];

        bus.respond(request, json!({"answer": "world!"})).unwrap();

        assert!(!bus.is_request_pending(&request_id));

        assert_eq!(bus.queue_size("agent-1"), 1);

        let responses = bus.get_queue("agent-1");
        assert_eq!(responses[0].message_type, "query_response");
        assert_eq!(responses[0].response_to_id, Some(request_id));
    }

    #[test]
    fn test_respond_to_non_request() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        // A plain message that does not ask for a response.
        bus.send(direct("agent-1", "agent-2", "info", json!({"data": "test"})))
            .unwrap();

        let messages = bus.dequeue("agent-2", 1);
        let message = &messages[0];

        let result = bus.respond(message, json!({"response": "test"}));
        assert!(matches!(result, Err(MessageBusError::InvalidMessage(_))));
    }

    #[test]
    fn test_cancel_request() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        let (request_id, _rx) = query(&mut bus, json!({}));
        assert!(bus.is_request_pending(&request_id));

        assert!(bus.cancel_request(&request_id));
        assert!(!bus.is_request_pending(&request_id));

        // Cancelling a second time finds nothing to cancel.
        assert!(!bus.cancel_request(&request_id));
    }

    #[test]
    fn test_cleanup_expired_requests() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        let expires_at = Utc::now() - Duration::seconds(1);
        let message = direct("agent-1", "agent-2", "query", json!({}))
            .with_requires_response(true)
            .with_expiration(expires_at);

        let request_id = message.id.clone();
        let (tx, _rx) = oneshot::channel();

        // Registered by hand: `prepare_request` would refuse an expired message.
        let stale = PendingRequest {
            request_id: request_id.clone(),
            from: "agent-1".to_string(),
            to: "agent-2".to_string(),
            sent_at: Utc::now() - Duration::seconds(10),
            expires_at,
            response_sender: Some(tx),
        };
        bus.pending_requests.insert(request_id.clone(), stale);

        assert_eq!(bus.pending_request_count(), 1);

        let cleaned = bus.cleanup_expired_requests();
        assert_eq!(cleaned, 1);
        assert_eq!(bus.pending_request_count(), 0);
    }

    #[test]
    fn test_get_response_from_queue() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        let (request_id, _rx) = query(&mut bus, json!({}));

        let messages = bus.dequeue("agent-2", 1);
        bus.respond(&messages[0], json!({"answer": "test"}))
            .unwrap();

        let response = bus.get_response("agent-1", &request_id);
        assert!(response.is_some());
        let response = response.unwrap();
        assert_eq!(response.response_to_id, Some(request_id.clone()));

        // The response was removed from the queue by the first lookup.
        assert!(bus.get_response("agent-1", &request_id).is_none());
    }

    #[test]
    fn test_find_message_in_history() {
        let mut bus = bus_with(&["agent-1"]);

        let msg = direct("sender", "agent-1", "test", json!({}));
        let msg_id = msg.id.clone();
        bus.send(msg).unwrap();

        let found = bus.find_message_in_history(&msg_id);
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, msg_id);

        assert!(bus.find_message_in_history("non-existent").is_none());
    }

    #[test]
    fn test_get_responses_from_history() {
        let mut bus = bus_with(&["agent-1", "agent-2"]);

        let (request_id, _rx) = query(&mut bus, json!({}));

        let messages = bus.dequeue("agent-2", 1);
        bus.respond(&messages[0], json!({"answer": "test"}))
            .unwrap();

        let responses = bus.get_responses_from_history(&request_id);
        assert_eq!(responses.len(), 1);
        assert_eq!(responses[0].response_to_id, Some(request_id));
    }

    #[test]
    fn test_message_target_get_agent_id() {
        let agent_target = MessageTarget::Agent("agent-1".to_string());
        assert_eq!(agent_target.get_agent_id(), Some("agent-1".to_string()));

        let broadcast_target = MessageTarget::Broadcast;
        assert_eq!(broadcast_target.get_agent_id(), None);

        let multiple_target = MessageTarget::Multiple(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(multiple_target.get_agent_id(), None);
    }

    #[test]
    fn test_cleanup_expired_messages() {
        let mut bus = bus_with(&["agent-1"]);

        let msg = direct("sender", "agent-1", "test", json!({}))
            .with_expiration(Utc::now() - Duration::seconds(1));

        // Pushed straight into the queue: `send` rejects expired messages.
        bus.message_queues
            .entry("agent-1".to_string())
            .or_default()
            .push(PrioritizedMessage { message: msg });

        assert_eq!(bus.queue_size("agent-1"), 1);

        let removed = bus.cleanup_expired();
        assert_eq!(removed, 1);
        assert_eq!(bus.queue_size("agent-1"), 0);
    }
}