Skip to main content

aster/agents/communication/
message_bus.rs

1//! Agent Message Bus
2//!
3//! Provides inter-agent messaging with priority queues,
4//! broadcast support, and request-response patterns.
5//!
6//! # Features
7//! - Priority-based message queuing
8//! - Broadcast messaging to subscribed agents
9//! - Request-response communication patterns
10//! - Message expiration handling
11//! - Message history for debugging
12
13use chrono::{DateTime, Duration, Utc};
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::cmp::Ordering;
17use std::collections::{BinaryHeap, HashMap, VecDeque};
18use thiserror::Error;
19use tokio::sync::oneshot;
20
21/// Result type alias for message bus operations
22pub type MessageBusResult<T> = Result<T, MessageBusError>;
23
24/// Error types for message bus operations
25#[derive(Debug, Error)]
26pub enum MessageBusError {
27    /// Agent not found
28    #[error("Agent not found: {0}")]
29    AgentNotFound(String),
30
31    /// Queue is full
32    #[error("Queue is full for agent: {0}")]
33    QueueFull(String),
34
35    /// Message expired
36    #[error("Message expired: {0}")]
37    MessageExpired(String),
38
39    /// Request timeout
40    #[error("Request timeout: {0}")]
41    RequestTimeout(String),
42
43    /// Invalid message
44    #[error("Invalid message: {0}")]
45    InvalidMessage(String),
46
47    /// Serialization error
48    #[error("Serialization error: {0}")]
49    SerializationError(String),
50
51    /// No response received
52    #[error("No response received for request: {0}")]
53    NoResponse(String),
54
55    /// Response channel closed
56    #[error("Response channel closed: {0}")]
57    ChannelClosed(String),
58}
59
60/// Message target - either a specific agent or broadcast
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(rename_all = "camelCase")]
63pub enum MessageTarget {
64    /// Send to a specific agent
65    Agent(String),
66    /// Broadcast to all agents subscribed to a message type
67    Broadcast,
68    /// Send to multiple specific agents
69    Multiple(Vec<String>),
70}
71
72impl MessageTarget {
73    /// Get the agent ID if this is a single agent target
74    pub fn get_agent_id(&self) -> Option<String> {
75        match self {
76            MessageTarget::Agent(id) => Some(id.clone()),
77            _ => None,
78        }
79    }
80}
81
82/// Priority levels for messages
83#[derive(
84    Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
85)]
86#[serde(rename_all = "camelCase")]
87pub enum MessagePriority {
88    /// Low priority - processed last
89    Low = 0,
90    /// Normal priority - default
91    #[default]
92    Normal = 1,
93    /// High priority - processed before normal
94    High = 2,
95    /// Critical priority - processed first
96    Critical = 3,
97}
98
99impl From<u8> for MessagePriority {
100    fn from(value: u8) -> Self {
101        match value {
102            0 => Self::Low,
103            1 => Self::Normal,
104            2 => Self::High,
105            _ => Self::Critical,
106        }
107    }
108}
109
110/// Agent message structure
111#[derive(Debug, Clone, Serialize, Deserialize)]
112#[serde(rename_all = "camelCase")]
113pub struct AgentMessage {
114    /// Unique message identifier
115    pub id: String,
116    /// Sender agent ID
117    pub from: String,
118    /// Target (agent ID or broadcast)
119    pub to: MessageTarget,
120    /// Message type for routing/filtering
121    pub message_type: String,
122    /// Message payload
123    pub payload: Value,
124    /// Creation timestamp
125    pub timestamp: DateTime<Utc>,
126    /// Message priority (0-255, higher = more important)
127    pub priority: u8,
128    /// Whether this message requires a response
129    pub requires_response: bool,
130    /// ID of the message this is responding to (if any)
131    pub response_to_id: Option<String>,
132    /// Expiration time (if any)
133    pub expires_at: Option<DateTime<Utc>>,
134}
135
136impl AgentMessage {
137    /// Create a new message
138    pub fn new(
139        from: impl Into<String>,
140        to: MessageTarget,
141        message_type: impl Into<String>,
142        payload: Value,
143    ) -> Self {
144        Self {
145            id: uuid::Uuid::new_v4().to_string(),
146            from: from.into(),
147            to,
148            message_type: message_type.into(),
149            payload,
150            timestamp: Utc::now(),
151            priority: MessagePriority::Normal as u8,
152            requires_response: false,
153            response_to_id: None,
154            expires_at: None,
155        }
156    }
157
158    /// Create a broadcast message
159    pub fn broadcast(
160        from: impl Into<String>,
161        message_type: impl Into<String>,
162        payload: Value,
163    ) -> Self {
164        Self::new(from, MessageTarget::Broadcast, message_type, payload)
165    }
166
167    /// Set the priority
168    pub fn with_priority(mut self, priority: u8) -> Self {
169        self.priority = priority;
170        self
171    }
172
173    /// Set whether response is required
174    pub fn with_requires_response(mut self, requires: bool) -> Self {
175        self.requires_response = requires;
176        self
177    }
178
179    /// Set the response_to_id
180    pub fn with_response_to(mut self, id: impl Into<String>) -> Self {
181        self.response_to_id = Some(id.into());
182        self
183    }
184
185    /// Set expiration time
186    pub fn with_expiration(mut self, expires_at: DateTime<Utc>) -> Self {
187        self.expires_at = Some(expires_at);
188        self
189    }
190
191    /// Set expiration duration from now
192    pub fn expires_in(mut self, duration: Duration) -> Self {
193        self.expires_at = Some(Utc::now() + duration);
194        self
195    }
196
197    /// Check if the message has expired
198    pub fn is_expired(&self) -> bool {
199        self.expires_at.map(|exp| Utc::now() > exp).unwrap_or(false)
200    }
201}
202
203/// Wrapper for priority queue ordering (higher priority first)
204#[derive(Debug, Clone)]
205struct PrioritizedMessage {
206    message: AgentMessage,
207}
208
209impl PartialEq for PrioritizedMessage {
210    fn eq(&self, other: &Self) -> bool {
211        self.message.id == other.message.id
212    }
213}
214
215impl Eq for PrioritizedMessage {}
216
217impl PartialOrd for PrioritizedMessage {
218    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
219        Some(self.cmp(other))
220    }
221}
222
223impl Ord for PrioritizedMessage {
224    fn cmp(&self, other: &Self) -> Ordering {
225        // Higher priority first, then earlier timestamp
226        match self.message.priority.cmp(&other.message.priority) {
227            Ordering::Equal => other.message.timestamp.cmp(&self.message.timestamp),
228            other => other,
229        }
230    }
231}
232
233/// Message subscription configuration
234#[derive(Debug, Clone)]
235pub struct MessageSubscription {
236    /// Agent ID
237    pub agent_id: String,
238    /// Message types to subscribe to (empty = all types)
239    pub message_types: Vec<String>,
240    /// Whether the subscription is active
241    pub active: bool,
242}
243
244impl MessageSubscription {
245    /// Create a new subscription
246    pub fn new(agent_id: impl Into<String>) -> Self {
247        Self {
248            agent_id: agent_id.into(),
249            message_types: Vec::new(),
250            active: true,
251        }
252    }
253
254    /// Subscribe to specific message types
255    pub fn with_types(mut self, types: Vec<String>) -> Self {
256        self.message_types = types;
257        self
258    }
259
260    /// Check if this subscription matches a message type
261    pub fn matches(&self, message_type: &str) -> bool {
262        self.active
263            && (self.message_types.is_empty()
264                || self.message_types.contains(&message_type.to_string()))
265    }
266}
267
268/// Pending request waiting for a response
269#[derive(Debug)]
270#[allow(dead_code)]
271struct PendingRequest {
272    /// Request message ID
273    request_id: String,
274    /// Sender of the request
275    from: String,
276    /// Target agent
277    to: String,
278    /// When the request was sent
279    sent_at: DateTime<Utc>,
280    /// When the request expires
281    pub expires_at: DateTime<Utc>,
282    /// Channel to send the response
283    pub response_sender: Option<oneshot::Sender<Value>>,
284}
285
286/// Agent Message Bus for inter-agent communication
287#[derive(Debug)]
288pub struct AgentMessageBus {
289    /// Message queues per agent (using priority heap)
290    message_queues: HashMap<String, BinaryHeap<PrioritizedMessage>>,
291    /// Subscriptions per agent
292    subscriptions: HashMap<String, MessageSubscription>,
293    /// Message history for debugging
294    message_history: VecDeque<AgentMessage>,
295    /// Maximum history size
296    max_history_size: usize,
297    /// Maximum queue size per agent
298    max_queue_size: usize,
299    /// Pending requests waiting for responses (request_id -> PendingRequest)
300    pending_requests: HashMap<String, PendingRequest>,
301}
302
303impl Default for AgentMessageBus {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309impl AgentMessageBus {
310    /// Create a new message bus with default settings
311    pub fn new() -> Self {
312        Self {
313            message_queues: HashMap::new(),
314            subscriptions: HashMap::new(),
315            message_history: VecDeque::new(),
316            max_history_size: 1000,
317            max_queue_size: 100,
318            pending_requests: HashMap::new(),
319        }
320    }
321
322    /// Create a new message bus with custom settings
323    pub fn with_config(max_history_size: usize, max_queue_size: usize) -> Self {
324        Self {
325            message_queues: HashMap::new(),
326            subscriptions: HashMap::new(),
327            message_history: VecDeque::new(),
328            max_history_size,
329            max_queue_size,
330            pending_requests: HashMap::new(),
331        }
332    }
333
334    /// Subscribe an agent to receive messages
335    pub fn subscribe(&mut self, agent_id: impl Into<String>, types: Vec<String>) {
336        let agent_id = agent_id.into();
337        let subscription = MessageSubscription::new(&agent_id).with_types(types);
338        self.subscriptions.insert(agent_id.clone(), subscription);
339        // Ensure queue exists
340        self.message_queues.entry(agent_id).or_default();
341    }
342
343    /// Unsubscribe an agent
344    pub fn unsubscribe(&mut self, agent_id: &str) {
345        if let Some(sub) = self.subscriptions.get_mut(agent_id) {
346            sub.active = false;
347        }
348    }
349
350    /// Check if an agent is subscribed
351    pub fn is_subscribed(&self, agent_id: &str) -> bool {
352        self.subscriptions
353            .get(agent_id)
354            .map(|s| s.active)
355            .unwrap_or(false)
356    }
357
358    /// Get subscription for an agent
359    pub fn get_subscription(&self, agent_id: &str) -> Option<&MessageSubscription> {
360        self.subscriptions.get(agent_id)
361    }
362
363    /// Send a message to a specific agent or broadcast
364    pub fn send(&mut self, message: AgentMessage) -> MessageBusResult<()> {
365        // Check if message has expired
366        if message.is_expired() {
367            return Err(MessageBusError::MessageExpired(message.id.clone()));
368        }
369
370        // Add to history
371        self.add_to_history(message.clone());
372
373        // Clone target to avoid borrow issues
374        let target = message.to.clone();
375        match target {
376            MessageTarget::Agent(agent_id) => {
377                self.deliver_to_agent(&agent_id, message)?;
378            }
379            MessageTarget::Broadcast => {
380                self.broadcast_message(message)?;
381            }
382            MessageTarget::Multiple(agent_ids) => {
383                for agent_id in &agent_ids {
384                    // Clone message for each recipient
385                    self.deliver_to_agent(agent_id, message.clone())?;
386                }
387            }
388        }
389
390        Ok(())
391    }
392
393    /// Broadcast a message to all subscribed agents
394    pub fn broadcast(
395        &mut self,
396        message_type: &str,
397        payload: Value,
398        sender: &str,
399    ) -> MessageBusResult<()> {
400        let message = AgentMessage::broadcast(sender, message_type, payload);
401        self.send(message)
402    }
403
404    /// Deliver a message to a specific agent
405    fn deliver_to_agent(&mut self, agent_id: &str, message: AgentMessage) -> MessageBusResult<()> {
406        // Ensure queue exists
407        let queue = self.message_queues.entry(agent_id.to_string()).or_default();
408
409        // Check queue size limit
410        if queue.len() >= self.max_queue_size {
411            return Err(MessageBusError::QueueFull(agent_id.to_string()));
412        }
413
414        queue.push(PrioritizedMessage { message });
415        Ok(())
416    }
417
418    /// Broadcast message to all subscribed agents
419    fn broadcast_message(&mut self, message: AgentMessage) -> MessageBusResult<()> {
420        let message_type = &message.message_type;
421        let sender = &message.from;
422
423        // Collect matching agents first to avoid borrow issues
424        let matching_agents: Vec<String> = self
425            .subscriptions
426            .iter()
427            .filter(|(agent_id, sub)| sub.matches(message_type) && *agent_id != sender)
428            .map(|(agent_id, _)| agent_id.clone())
429            .collect();
430
431        // Deliver to each matching agent
432        for agent_id in matching_agents {
433            self.deliver_to_agent(&agent_id, message.clone())?;
434        }
435
436        Ok(())
437    }
438
439    /// Get all messages in an agent's queue (without removing)
440    pub fn get_queue(&self, agent_id: &str) -> Vec<AgentMessage> {
441        self.message_queues
442            .get(agent_id)
443            .map(|heap| {
444                let mut messages: Vec<_> = heap.iter().map(|pm| pm.message.clone()).collect();
445                // Sort by priority (highest first) then timestamp (earliest first)
446                messages.sort_by(|a, b| match b.priority.cmp(&a.priority) {
447                    Ordering::Equal => a.timestamp.cmp(&b.timestamp),
448                    other => other,
449                });
450                messages
451            })
452            .unwrap_or_default()
453    }
454
455    /// Dequeue messages from an agent's queue (removes them)
456    pub fn dequeue(&mut self, agent_id: &str, count: usize) -> Vec<AgentMessage> {
457        let queue = match self.message_queues.get_mut(agent_id) {
458            Some(q) => q,
459            None => return Vec::new(),
460        };
461
462        let mut messages = Vec::with_capacity(count.min(queue.len()));
463        for _ in 0..count {
464            if let Some(pm) = queue.pop() {
465                // Skip expired messages
466                if !pm.message.is_expired() {
467                    messages.push(pm.message);
468                }
469            } else {
470                break;
471            }
472        }
473        messages
474    }
475
476    /// Dequeue all messages from an agent's queue
477    pub fn dequeue_all(&mut self, agent_id: &str) -> Vec<AgentMessage> {
478        let queue = match self.message_queues.get_mut(agent_id) {
479            Some(q) => q,
480            None => return Vec::new(),
481        };
482
483        let mut messages = Vec::with_capacity(queue.len());
484        while let Some(pm) = queue.pop() {
485            if !pm.message.is_expired() {
486                messages.push(pm.message);
487            }
488        }
489        messages
490    }
491
492    /// Get the number of messages in an agent's queue
493    pub fn queue_size(&self, agent_id: &str) -> usize {
494        self.message_queues
495            .get(agent_id)
496            .map(|q| q.len())
497            .unwrap_or(0)
498    }
499
500    /// Check if an agent has pending messages
501    pub fn has_messages(&self, agent_id: &str) -> bool {
502        self.queue_size(agent_id) > 0
503    }
504
505    /// Add a message to history
506    fn add_to_history(&mut self, message: AgentMessage) {
507        self.message_history.push_back(message);
508        while self.message_history.len() > self.max_history_size {
509            self.message_history.pop_front();
510        }
511    }
512
513    /// Get message history
514    pub fn get_history(&self, limit: Option<usize>) -> Vec<AgentMessage> {
515        let limit = limit.unwrap_or(self.message_history.len());
516        self.message_history
517            .iter()
518            .rev()
519            .take(limit)
520            .cloned()
521            .collect()
522    }
523
524    /// Clear message history
525    pub fn clear_history(&mut self) {
526        self.message_history.clear();
527    }
528
529    /// Get all subscribed agent IDs
530    pub fn get_subscribed_agents(&self) -> Vec<String> {
531        self.subscriptions
532            .iter()
533            .filter(|(_, sub)| sub.active)
534            .map(|(id, _)| id.clone())
535            .collect()
536    }
537
538    /// Remove expired messages from all queues
539    pub fn cleanup_expired(&mut self) -> usize {
540        let mut removed = 0;
541        for queue in self.message_queues.values_mut() {
542            let before = queue.len();
543            let messages: Vec<_> = queue
544                .drain()
545                .filter(|pm| !pm.message.is_expired())
546                .collect();
547            removed += before - messages.len();
548            for msg in messages {
549                queue.push(msg);
550            }
551        }
552        removed
553    }
554
555    /// Get statistics about the message bus
556    pub fn stats(&self) -> MessageBusStats {
557        let total_queued: usize = self.message_queues.values().map(|q| q.len()).sum();
558        MessageBusStats {
559            subscribed_agents: self.subscriptions.iter().filter(|(_, s)| s.active).count(),
560            total_queued_messages: total_queued,
561            history_size: self.message_history.len(),
562            max_history_size: self.max_history_size,
563            max_queue_size: self.max_queue_size,
564        }
565    }
566
567    /// Send a request message and wait for a response with timeout
568    ///
569    /// This method sends a message to the target agent with `requires_response` set to true,
570    /// and waits for a response within the specified timeout duration.
571    ///
572    /// # Arguments
573    /// * `to` - Target agent ID
574    /// * `message_type` - Type of the message
575    /// * `payload` - Message payload
576    /// * `from` - Sender agent ID
577    /// * `timeout` - Maximum time to wait for a response
578    ///
579    /// # Returns
580    /// * `Ok(Value)` - The response payload
581    /// * `Err(MessageBusError::RequestTimeout)` - If no response is received within timeout
582    /// * `Err(MessageBusError::MessageExpired)` - If the message expires before delivery
583    pub fn prepare_request(
584        &mut self,
585        to: &str,
586        message_type: &str,
587        payload: Value,
588        from: &str,
589        timeout: Duration,
590    ) -> MessageBusResult<(String, oneshot::Receiver<Value>)> {
591        let expires_at = Utc::now() + timeout;
592
593        // Create the request message
594        let message = AgentMessage::new(
595            from,
596            MessageTarget::Agent(to.to_string()),
597            message_type,
598            payload,
599        )
600        .with_requires_response(true)
601        .with_expiration(expires_at);
602
603        let request_id = message.id.clone();
604
605        // Create a channel for the response
606        let (tx, rx) = oneshot::channel();
607
608        // Store the pending request
609        let pending = PendingRequest {
610            request_id: request_id.clone(),
611            from: from.to_string(),
612            to: to.to_string(),
613            sent_at: Utc::now(),
614            expires_at,
615            response_sender: Some(tx),
616        };
617        self.pending_requests.insert(request_id.clone(), pending);
618
619        // Send the message
620        self.send(message)?;
621
622        Ok((request_id, rx))
623    }
624
625    /// Send a response to a request message
626    ///
627    /// This method sends a response to a previously received request message.
628    /// The response is delivered to the original requester.
629    ///
630    /// # Arguments
631    /// * `request` - The original request message
632    /// * `payload` - Response payload
633    ///
634    /// # Returns
635    /// * `Ok(())` - If the response was sent successfully
636    /// * `Err(MessageBusError::InvalidMessage)` - If the request doesn't require a response
637    /// * `Err(MessageBusError::NoResponse)` - If no pending request was found
638    pub fn respond(&mut self, request: &AgentMessage, payload: Value) -> MessageBusResult<()> {
639        // Verify the request requires a response
640        if !request.requires_response {
641            return Err(MessageBusError::InvalidMessage(
642                "Request does not require a response".to_string(),
643            ));
644        }
645
646        // Check if there's a pending request
647        if let Some(mut pending) = self.pending_requests.remove(&request.id) {
648            // Check if the request has expired
649            if Utc::now() > pending.expires_at {
650                return Err(MessageBusError::RequestTimeout(request.id.clone()));
651            }
652
653            // Send the response through the channel
654            if let Some(sender) = pending.response_sender.take() {
655                sender
656                    .send(payload.clone())
657                    .map_err(|_| MessageBusError::ChannelClosed(request.id.clone()))?;
658            }
659
660            // Also create a response message for history/queue
661            let response_message = AgentMessage::new(
662                request.to.get_agent_id().unwrap_or_default(),
663                MessageTarget::Agent(request.from.clone()),
664                format!("{}_response", request.message_type),
665                payload,
666            )
667            .with_response_to(&request.id);
668
669            // Add to history
670            self.add_to_history(response_message.clone());
671
672            // Deliver to the original sender's queue
673            self.deliver_to_agent(&request.from, response_message)?;
674
675            Ok(())
676        } else {
677            Err(MessageBusError::NoResponse(request.id.clone()))
678        }
679    }
680
681    /// Check if a request is still pending
682    pub fn is_request_pending(&self, request_id: &str) -> bool {
683        self.pending_requests.contains_key(request_id)
684    }
685
686    /// Get the number of pending requests
687    pub fn pending_request_count(&self) -> usize {
688        self.pending_requests.len()
689    }
690
691    /// Cancel a pending request
692    ///
693    /// Removes the pending request without sending a response.
694    /// Returns true if the request was found and cancelled.
695    pub fn cancel_request(&mut self, request_id: &str) -> bool {
696        self.pending_requests.remove(request_id).is_some()
697    }
698
699    /// Cleanup expired pending requests
700    ///
701    /// Removes all pending requests that have exceeded their timeout.
702    /// Returns the number of expired requests removed.
703    pub fn cleanup_expired_requests(&mut self) -> usize {
704        let now = Utc::now();
705        let expired_ids: Vec<String> = self
706            .pending_requests
707            .iter()
708            .filter(|(_, req)| now > req.expires_at)
709            .map(|(id, _)| id.clone())
710            .collect();
711
712        let count = expired_ids.len();
713        for id in expired_ids {
714            self.pending_requests.remove(&id);
715        }
716        count
717    }
718
719    /// Get a response message from an agent's queue by request ID
720    ///
721    /// Searches the agent's queue for a response to the specified request.
722    /// Returns and removes the response message if found.
723    pub fn get_response(&mut self, agent_id: &str, request_id: &str) -> Option<AgentMessage> {
724        let queue = self.message_queues.get_mut(agent_id)?;
725
726        // Find and remove the response message
727        let messages: Vec<PrioritizedMessage> = queue.drain().collect();
728        let mut response = None;
729        let mut remaining = Vec::new();
730
731        for pm in messages {
732            if pm.message.response_to_id.as_deref() == Some(request_id) {
733                response = Some(pm.message);
734            } else {
735                remaining.push(pm);
736            }
737        }
738
739        // Put back the remaining messages
740        for pm in remaining {
741            queue.push(pm);
742        }
743
744        response
745    }
746
747    /// Find a message in history by ID
748    pub fn find_message_in_history(&self, message_id: &str) -> Option<&AgentMessage> {
749        self.message_history.iter().find(|m| m.id == message_id)
750    }
751
752    /// Get all response messages for a specific request from history
753    pub fn get_responses_from_history(&self, request_id: &str) -> Vec<&AgentMessage> {
754        self.message_history
755            .iter()
756            .filter(|m| m.response_to_id.as_deref() == Some(request_id))
757            .collect()
758    }
759}
760
761/// Statistics about the message bus
762#[derive(Debug, Clone, Serialize, Deserialize)]
763#[serde(rename_all = "camelCase")]
764pub struct MessageBusStats {
765    /// Number of subscribed agents
766    pub subscribed_agents: usize,
767    /// Total messages across all queues
768    pub total_queued_messages: usize,
769    /// Current history size
770    pub history_size: usize,
771    /// Maximum history size
772    pub max_history_size: usize,
773    /// Maximum queue size per agent
774    pub max_queue_size: usize,
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780    use serde_json::json;
781    use tokio::sync::oneshot;
782
783    #[test]
784    fn test_message_creation() {
785        let msg = AgentMessage::new(
786            "agent-1",
787            MessageTarget::Agent("agent-2".to_string()),
788            "test-type",
789            json!({"data": "value"}),
790        );
791
792        assert!(!msg.id.is_empty());
793        assert_eq!(msg.from, "agent-1");
794        assert_eq!(msg.to, MessageTarget::Agent("agent-2".to_string()));
795        assert_eq!(msg.message_type, "test-type");
796        assert_eq!(msg.priority, MessagePriority::Normal as u8);
797        assert!(!msg.requires_response);
798        assert!(msg.response_to_id.is_none());
799        assert!(msg.expires_at.is_none());
800    }
801
802    #[test]
803    fn test_message_broadcast_creation() {
804        let msg = AgentMessage::broadcast("agent-1", "broadcast-type", json!({"key": "value"}));
805
806        assert_eq!(msg.to, MessageTarget::Broadcast);
807        assert_eq!(msg.message_type, "broadcast-type");
808    }
809
810    #[test]
811    fn test_message_with_priority() {
812        let msg = AgentMessage::new(
813            "agent-1",
814            MessageTarget::Agent("agent-2".to_string()),
815            "test",
816            json!({}),
817        )
818        .with_priority(MessagePriority::Critical as u8);
819
820        assert_eq!(msg.priority, MessagePriority::Critical as u8);
821    }
822
823    #[test]
824    fn test_message_expiration() {
825        let expired_msg = AgentMessage::new(
826            "agent-1",
827            MessageTarget::Agent("agent-2".to_string()),
828            "test",
829            json!({}),
830        )
831        .with_expiration(Utc::now() - Duration::seconds(10));
832
833        assert!(expired_msg.is_expired());
834
835        let valid_msg = AgentMessage::new(
836            "agent-1",
837            MessageTarget::Agent("agent-2".to_string()),
838            "test",
839            json!({}),
840        )
841        .expires_in(Duration::hours(1));
842
843        assert!(!valid_msg.is_expired());
844    }
845
846    #[test]
847    fn test_message_bus_subscribe() {
848        let mut bus = AgentMessageBus::new();
849        bus.subscribe("agent-1", vec!["type-a".to_string(), "type-b".to_string()]);
850
851        assert!(bus.is_subscribed("agent-1"));
852        assert!(!bus.is_subscribed("agent-2"));
853
854        let sub = bus.get_subscription("agent-1").unwrap();
855        assert!(sub.matches("type-a"));
856        assert!(sub.matches("type-b"));
857        assert!(!sub.matches("type-c"));
858    }
859
860    #[test]
861    fn test_message_bus_subscribe_all_types() {
862        let mut bus = AgentMessageBus::new();
863        bus.subscribe("agent-1", vec![]); // Empty = all types
864
865        let sub = bus.get_subscription("agent-1").unwrap();
866        assert!(sub.matches("any-type"));
867        assert!(sub.matches("another-type"));
868    }
869
870    #[test]
871    fn test_message_bus_unsubscribe() {
872        let mut bus = AgentMessageBus::new();
873        bus.subscribe("agent-1", vec![]);
874        assert!(bus.is_subscribed("agent-1"));
875
876        bus.unsubscribe("agent-1");
877        assert!(!bus.is_subscribed("agent-1"));
878    }
879
880    #[test]
881    fn test_message_bus_send_to_agent() {
882        let mut bus = AgentMessageBus::new();
883        bus.subscribe("agent-2", vec![]);
884
885        let msg = AgentMessage::new(
886            "agent-1",
887            MessageTarget::Agent("agent-2".to_string()),
888            "test",
889            json!({"data": 123}),
890        );
891
892        bus.send(msg).unwrap();
893
894        assert_eq!(bus.queue_size("agent-2"), 1);
895        assert!(bus.has_messages("agent-2"));
896    }
897
898    #[test]
899    fn test_message_bus_broadcast() {
900        let mut bus = AgentMessageBus::new();
901        bus.subscribe("agent-1", vec!["broadcast-type".to_string()]);
902        bus.subscribe("agent-2", vec!["broadcast-type".to_string()]);
903        bus.subscribe("agent-3", vec!["other-type".to_string()]);
904
905        bus.broadcast("broadcast-type", json!({"msg": "hello"}), "sender")
906            .unwrap();
907
908        // agent-1 and agent-2 should receive (subscribed to broadcast-type)
909        // agent-3 should not receive (subscribed to other-type)
910        assert_eq!(bus.queue_size("agent-1"), 1);
911        assert_eq!(bus.queue_size("agent-2"), 1);
912        assert_eq!(bus.queue_size("agent-3"), 0);
913    }
914
915    #[test]
916    fn test_message_bus_priority_ordering() {
917        let mut bus = AgentMessageBus::new();
918        bus.subscribe("agent-1", vec![]);
919
920        // Send messages with different priorities
921        let low = AgentMessage::new(
922            "sender",
923            MessageTarget::Agent("agent-1".to_string()),
924            "test",
925            json!({"priority": "low"}),
926        )
927        .with_priority(MessagePriority::Low as u8);
928
929        let high = AgentMessage::new(
930            "sender",
931            MessageTarget::Agent("agent-1".to_string()),
932            "test",
933            json!({"priority": "high"}),
934        )
935        .with_priority(MessagePriority::High as u8);
936
937        let normal = AgentMessage::new(
938            "sender",
939            MessageTarget::Agent("agent-1".to_string()),
940            "test",
941            json!({"priority": "normal"}),
942        )
943        .with_priority(MessagePriority::Normal as u8);
944
945        let critical = AgentMessage::new(
946            "sender",
947            MessageTarget::Agent("agent-1".to_string()),
948            "test",
949            json!({"priority": "critical"}),
950        )
951        .with_priority(MessagePriority::Critical as u8);
952
953        // Send in random order
954        bus.send(low).unwrap();
955        bus.send(high).unwrap();
956        bus.send(normal).unwrap();
957        bus.send(critical).unwrap();
958
959        // Dequeue should return in priority order
960        let messages = bus.dequeue("agent-1", 4);
961        assert_eq!(messages.len(), 4);
962        assert_eq!(messages[0].priority, MessagePriority::Critical as u8);
963        assert_eq!(messages[1].priority, MessagePriority::High as u8);
964        assert_eq!(messages[2].priority, MessagePriority::Normal as u8);
965        assert_eq!(messages[3].priority, MessagePriority::Low as u8);
966    }
967
968    #[test]
969    fn test_message_bus_dequeue() {
970        let mut bus = AgentMessageBus::new();
971        bus.subscribe("agent-1", vec![]);
972
973        for i in 0..5 {
974            let msg = AgentMessage::new(
975                "sender",
976                MessageTarget::Agent("agent-1".to_string()),
977                "test",
978                json!({"index": i}),
979            );
980            bus.send(msg).unwrap();
981        }
982
983        assert_eq!(bus.queue_size("agent-1"), 5);
984
985        let messages = bus.dequeue("agent-1", 3);
986        assert_eq!(messages.len(), 3);
987        assert_eq!(bus.queue_size("agent-1"), 2);
988
989        let remaining = bus.dequeue_all("agent-1");
990        assert_eq!(remaining.len(), 2);
991        assert_eq!(bus.queue_size("agent-1"), 0);
992    }
993
994    #[test]
995    fn test_message_bus_queue_full() {
996        let mut bus = AgentMessageBus::with_config(100, 2); // Max 2 messages per queue
997        bus.subscribe("agent-1", vec![]);
998
999        let msg1 = AgentMessage::new(
1000            "sender",
1001            MessageTarget::Agent("agent-1".to_string()),
1002            "test",
1003            json!({}),
1004        );
1005        let msg2 = AgentMessage::new(
1006            "sender",
1007            MessageTarget::Agent("agent-1".to_string()),
1008            "test",
1009            json!({}),
1010        );
1011        let msg3 = AgentMessage::new(
1012            "sender",
1013            MessageTarget::Agent("agent-1".to_string()),
1014            "test",
1015            json!({}),
1016        );
1017
1018        bus.send(msg1).unwrap();
1019        bus.send(msg2).unwrap();
1020
1021        // Third message should fail
1022        let result = bus.send(msg3);
1023        assert!(matches!(result, Err(MessageBusError::QueueFull(_))));
1024    }
1025
1026    #[test]
1027    fn test_message_bus_expired_message() {
1028        let mut bus = AgentMessageBus::new();
1029        bus.subscribe("agent-1", vec![]);
1030
1031        let expired = AgentMessage::new(
1032            "sender",
1033            MessageTarget::Agent("agent-1".to_string()),
1034            "test",
1035            json!({}),
1036        )
1037        .with_expiration(Utc::now() - Duration::seconds(10));
1038
1039        let result = bus.send(expired);
1040        assert!(matches!(result, Err(MessageBusError::MessageExpired(_))));
1041    }
1042
1043    #[test]
1044    fn test_message_bus_history() {
1045        let mut bus = AgentMessageBus::with_config(5, 100); // Max 5 history entries
1046        bus.subscribe("agent-1", vec![]);
1047
1048        for i in 0..10 {
1049            let msg = AgentMessage::new(
1050                "sender",
1051                MessageTarget::Agent("agent-1".to_string()),
1052                "test",
1053                json!({"index": i}),
1054            );
1055            bus.send(msg).unwrap();
1056        }
1057
1058        let history = bus.get_history(None);
1059        assert_eq!(history.len(), 5); // Limited to max_history_size
1060
1061        let limited = bus.get_history(Some(3));
1062        assert_eq!(limited.len(), 3);
1063    }
1064
1065    #[test]
1066    fn test_message_bus_stats() {
1067        let mut bus = AgentMessageBus::new();
1068        bus.subscribe("agent-1", vec![]);
1069        bus.subscribe("agent-2", vec![]);
1070
1071        let msg = AgentMessage::new(
1072            "sender",
1073            MessageTarget::Agent("agent-1".to_string()),
1074            "test",
1075            json!({}),
1076        );
1077        bus.send(msg).unwrap();
1078
1079        let stats = bus.stats();
1080        assert_eq!(stats.subscribed_agents, 2);
1081        assert_eq!(stats.total_queued_messages, 1);
1082        assert_eq!(stats.history_size, 1);
1083    }
1084
1085    #[test]
1086    fn test_message_bus_get_subscribed_agents() {
1087        let mut bus = AgentMessageBus::new();
1088        bus.subscribe("agent-1", vec![]);
1089        bus.subscribe("agent-2", vec![]);
1090        bus.subscribe("agent-3", vec![]);
1091        bus.unsubscribe("agent-2");
1092
1093        let agents = bus.get_subscribed_agents();
1094        assert_eq!(agents.len(), 2);
1095        assert!(agents.contains(&"agent-1".to_string()));
1096        assert!(agents.contains(&"agent-3".to_string()));
1097        assert!(!agents.contains(&"agent-2".to_string()));
1098    }
1099
1100    #[test]
1101    fn test_prepare_request() {
1102        let mut bus = AgentMessageBus::new();
1103        bus.subscribe("agent-1", vec![]);
1104        bus.subscribe("agent-2", vec![]);
1105
1106        let (request_id, _rx) = bus
1107            .prepare_request(
1108                "agent-2",
1109                "query",
1110                json!({"question": "hello?"}),
1111                "agent-1",
1112                Duration::seconds(30),
1113            )
1114            .unwrap();
1115
1116        // Request should be pending
1117        assert!(bus.is_request_pending(&request_id));
1118        assert_eq!(bus.pending_request_count(), 1);
1119
1120        // Message should be in agent-2's queue
1121        assert_eq!(bus.queue_size("agent-2"), 1);
1122
1123        let messages = bus.get_queue("agent-2");
1124        assert_eq!(messages[0].message_type, "query");
1125        assert!(messages[0].requires_response);
1126    }
1127
1128    #[test]
1129    fn test_respond_to_request() {
1130        let mut bus = AgentMessageBus::new();
1131        bus.subscribe("agent-1", vec![]);
1132        bus.subscribe("agent-2", vec![]);
1133
1134        // Prepare a request
1135        let (request_id, _rx) = bus
1136            .prepare_request(
1137                "agent-2",
1138                "query",
1139                json!({"question": "hello?"}),
1140                "agent-1",
1141                Duration::seconds(30),
1142            )
1143            .unwrap();
1144
1145        // Get the request message from agent-2's queue
1146        let messages = bus.dequeue("agent-2", 1);
1147        let request = &messages[0];
1148
1149        // Respond to the request
1150        bus.respond(request, json!({"answer": "world!"})).unwrap();
1151
1152        // Request should no longer be pending
1153        assert!(!bus.is_request_pending(&request_id));
1154
1155        // Response should be in agent-1's queue
1156        assert_eq!(bus.queue_size("agent-1"), 1);
1157
1158        let responses = bus.get_queue("agent-1");
1159        assert_eq!(responses[0].message_type, "query_response");
1160        assert_eq!(responses[0].response_to_id, Some(request_id));
1161    }
1162
1163    #[test]
1164    fn test_respond_to_non_request() {
1165        let mut bus = AgentMessageBus::new();
1166        bus.subscribe("agent-1", vec![]);
1167        bus.subscribe("agent-2", vec![]);
1168
1169        // Send a regular message (not a request)
1170        let msg = AgentMessage::new(
1171            "agent-1",
1172            MessageTarget::Agent("agent-2".to_string()),
1173            "info",
1174            json!({"data": "test"}),
1175        );
1176        bus.send(msg).unwrap();
1177
1178        // Get the message
1179        let messages = bus.dequeue("agent-2", 1);
1180        let message = &messages[0];
1181
1182        // Trying to respond should fail
1183        let result = bus.respond(message, json!({"response": "test"}));
1184        assert!(matches!(result, Err(MessageBusError::InvalidMessage(_))));
1185    }
1186
1187    #[test]
1188    fn test_cancel_request() {
1189        let mut bus = AgentMessageBus::new();
1190        bus.subscribe("agent-1", vec![]);
1191        bus.subscribe("agent-2", vec![]);
1192
1193        let (request_id, _rx) = bus
1194            .prepare_request(
1195                "agent-2",
1196                "query",
1197                json!({}),
1198                "agent-1",
1199                Duration::seconds(30),
1200            )
1201            .unwrap();
1202
1203        assert!(bus.is_request_pending(&request_id));
1204
1205        // Cancel the request
1206        assert!(bus.cancel_request(&request_id));
1207        assert!(!bus.is_request_pending(&request_id));
1208
1209        // Cancelling again should return false
1210        assert!(!bus.cancel_request(&request_id));
1211    }
1212
1213    #[test]
1214    fn test_cleanup_expired_requests() {
1215        let mut bus = AgentMessageBus::new();
1216        bus.subscribe("agent-1", vec![]);
1217        bus.subscribe("agent-2", vec![]);
1218
1219        // Create a request with very short timeout (already expired)
1220        let expires_at = Utc::now() - Duration::seconds(1);
1221        let message = AgentMessage::new(
1222            "agent-1",
1223            MessageTarget::Agent("agent-2".to_string()),
1224            "query",
1225            json!({}),
1226        )
1227        .with_requires_response(true)
1228        .with_expiration(expires_at);
1229
1230        let request_id = message.id.clone();
1231        let (tx, _rx) = oneshot::channel();
1232
1233        // Manually insert an expired pending request
1234        bus.pending_requests.insert(
1235            request_id.clone(),
1236            PendingRequest {
1237                request_id: request_id.clone(),
1238                from: "agent-1".to_string(),
1239                to: "agent-2".to_string(),
1240                sent_at: Utc::now() - Duration::seconds(10),
1241                expires_at,
1242                response_sender: Some(tx),
1243            },
1244        );
1245
1246        assert_eq!(bus.pending_request_count(), 1);
1247
1248        // Cleanup expired requests
1249        let cleaned = bus.cleanup_expired_requests();
1250        assert_eq!(cleaned, 1);
1251        assert_eq!(bus.pending_request_count(), 0);
1252    }
1253
1254    #[test]
1255    fn test_get_response_from_queue() {
1256        let mut bus = AgentMessageBus::new();
1257        bus.subscribe("agent-1", vec![]);
1258        bus.subscribe("agent-2", vec![]);
1259
1260        // Prepare a request
1261        let (request_id, _rx) = bus
1262            .prepare_request(
1263                "agent-2",
1264                "query",
1265                json!({}),
1266                "agent-1",
1267                Duration::seconds(30),
1268            )
1269            .unwrap();
1270
1271        // Get and respond to the request
1272        let messages = bus.dequeue("agent-2", 1);
1273        bus.respond(&messages[0], json!({"answer": "test"}))
1274            .unwrap();
1275
1276        // Get the response from agent-1's queue
1277        let response = bus.get_response("agent-1", &request_id);
1278        assert!(response.is_some());
1279        let response = response.unwrap();
1280        assert_eq!(response.response_to_id, Some(request_id.clone()));
1281
1282        // Response should be removed from queue
1283        assert!(bus.get_response("agent-1", &request_id).is_none());
1284    }
1285
1286    #[test]
1287    fn test_find_message_in_history() {
1288        let mut bus = AgentMessageBus::new();
1289        bus.subscribe("agent-1", vec![]);
1290
1291        let msg = AgentMessage::new(
1292            "sender",
1293            MessageTarget::Agent("agent-1".to_string()),
1294            "test",
1295            json!({}),
1296        );
1297        let msg_id = msg.id.clone();
1298        bus.send(msg).unwrap();
1299
1300        // Find the message in history
1301        let found = bus.find_message_in_history(&msg_id);
1302        assert!(found.is_some());
1303        assert_eq!(found.unwrap().id, msg_id);
1304
1305        // Non-existent message
1306        assert!(bus.find_message_in_history("non-existent").is_none());
1307    }
1308
1309    #[test]
1310    fn test_get_responses_from_history() {
1311        let mut bus = AgentMessageBus::new();
1312        bus.subscribe("agent-1", vec![]);
1313        bus.subscribe("agent-2", vec![]);
1314
1315        // Prepare a request
1316        let (request_id, _rx) = bus
1317            .prepare_request(
1318                "agent-2",
1319                "query",
1320                json!({}),
1321                "agent-1",
1322                Duration::seconds(30),
1323            )
1324            .unwrap();
1325
1326        // Respond to the request
1327        let messages = bus.dequeue("agent-2", 1);
1328        bus.respond(&messages[0], json!({"answer": "test"}))
1329            .unwrap();
1330
1331        // Get responses from history
1332        let responses = bus.get_responses_from_history(&request_id);
1333        assert_eq!(responses.len(), 1);
1334        assert_eq!(responses[0].response_to_id, Some(request_id));
1335    }
1336
1337    #[test]
1338    fn test_message_target_get_agent_id() {
1339        let agent_target = MessageTarget::Agent("agent-1".to_string());
1340        assert_eq!(agent_target.get_agent_id(), Some("agent-1".to_string()));
1341
1342        let broadcast_target = MessageTarget::Broadcast;
1343        assert_eq!(broadcast_target.get_agent_id(), None);
1344
1345        let multiple_target = MessageTarget::Multiple(vec!["a".to_string(), "b".to_string()]);
1346        assert_eq!(multiple_target.get_agent_id(), None);
1347    }
1348
1349    #[test]
1350    fn test_cleanup_expired_messages() {
1351        let mut bus = AgentMessageBus::new();
1352        bus.subscribe("agent-1", vec![]);
1353
1354        // Send a message that will expire
1355        let msg = AgentMessage::new(
1356            "sender",
1357            MessageTarget::Agent("agent-1".to_string()),
1358            "test",
1359            json!({}),
1360        )
1361        .with_expiration(Utc::now() - Duration::seconds(1)); // Already expired
1362
1363        // Manually add to queue (bypassing expiration check in send)
1364        bus.message_queues
1365            .entry("agent-1".to_string())
1366            .or_default()
1367            .push(PrioritizedMessage { message: msg });
1368
1369        assert_eq!(bus.queue_size("agent-1"), 1);
1370
1371        // Cleanup expired messages
1372        let removed = bus.cleanup_expired();
1373        assert_eq!(removed, 1);
1374        assert_eq!(bus.queue_size("agent-1"), 0);
1375    }
1376}