1use anyhow::Result;
4use crossbeam_channel::{Receiver, Sender};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::core::AISession;
12
13const DEFAULT_CHANNEL_CAPACITY: usize = 1000;
15
16const BROADCAST_CHANNEL_CAPACITY: usize = 5000;
18
19const ALL_MESSAGES_CHANNEL_CAPACITY: usize = 10000;
21
22#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
24pub struct AgentId(Uuid);
25
26impl Default for AgentId {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl AgentId {
33 pub fn new() -> Self {
35 Self(Uuid::new_v4())
36 }
37}
38
39impl std::fmt::Display for AgentId {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 write!(f, "{}", self.0)
42 }
43}
44
45pub struct MultiAgentSession {
47 pub agents: Arc<DashMap<AgentId, Arc<AISession>>>,
49 pub message_bus: Arc<MessageBus>,
51 pub task_distributor: Arc<TaskDistributor>,
53 pub resource_manager: Arc<ResourceManager>,
55}
56
57impl Default for MultiAgentSession {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl MultiAgentSession {
64 pub fn new() -> Self {
66 Self {
67 agents: Arc::new(DashMap::new()),
68 message_bus: Arc::new(MessageBus::new()),
69 task_distributor: Arc::new(TaskDistributor::new()),
70 resource_manager: Arc::new(ResourceManager::new()),
71 }
72 }
73
74 pub fn register_agent(&self, agent_id: AgentId, session: Arc<AISession>) -> Result<()> {
76 self.agents.insert(agent_id.clone(), session);
77 self.message_bus.register_agent(agent_id)?;
78 Ok(())
79 }
80
81 pub fn unregister_agent(&self, agent_id: &AgentId) -> Result<()> {
83 self.agents.remove(agent_id);
84 self.message_bus.unregister_agent(agent_id)?;
85 Ok(())
86 }
87
88 pub fn get_agent(&self, agent_id: &AgentId) -> Option<Arc<AISession>> {
90 self.agents.get(agent_id).map(|entry| entry.clone())
91 }
92
93 pub fn list_agents(&self) -> Vec<AgentId> {
95 self.agents
96 .iter()
97 .map(|entry| entry.key().clone())
98 .collect()
99 }
100
101 pub async fn send_message(&self, from: AgentId, to: AgentId, message: Message) -> Result<()> {
103 self.message_bus.send_message(from, to, message)
104 }
105
106 pub async fn broadcast(&self, from: AgentId, message: BroadcastMessage) -> Result<()> {
108 self.message_bus.broadcast(from, message)
109 }
110}
111
112pub struct MessageBus {
114 channels: DashMap<AgentId, (Sender<Message>, Receiver<Message>)>,
116 broadcast_sender: Sender<BroadcastMessage>,
118 _broadcast_receiver: Receiver<BroadcastMessage>,
119 agent_channels: DashMap<AgentId, (Sender<AgentMessage>, Receiver<AgentMessage>)>,
121 all_messages_sender: Sender<AgentMessage>,
123 all_messages_receiver: Receiver<AgentMessage>,
124}
125
126impl Default for MessageBus {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132impl MessageBus {
133 pub fn new() -> Self {
139 let (broadcast_sender, broadcast_receiver) =
140 crossbeam_channel::bounded(BROADCAST_CHANNEL_CAPACITY);
141 let (all_messages_sender, all_messages_receiver) =
142 crossbeam_channel::bounded(ALL_MESSAGES_CHANNEL_CAPACITY);
143 Self {
144 channels: DashMap::new(),
145 broadcast_sender,
146 _broadcast_receiver: broadcast_receiver,
147 agent_channels: DashMap::new(),
148 all_messages_sender,
149 all_messages_receiver,
150 }
151 }
152
153 pub fn register_agent(&self, agent_id: AgentId) -> Result<()> {
158 let (sender, receiver) = crossbeam_channel::bounded(DEFAULT_CHANNEL_CAPACITY);
159 self.channels.insert(agent_id.clone(), (sender, receiver));
160
161 let (agent_sender, agent_receiver) = crossbeam_channel::bounded(DEFAULT_CHANNEL_CAPACITY);
163 self.agent_channels
164 .insert(agent_id, (agent_sender, agent_receiver));
165 Ok(())
166 }
167
168 pub fn unregister_agent(&self, agent_id: &AgentId) -> Result<()> {
170 self.channels.remove(agent_id);
171 self.agent_channels.remove(agent_id);
172 Ok(())
173 }
174
175 pub fn send_message(&self, _from: AgentId, to: AgentId, message: Message) -> Result<()> {
180 if let Some(channel) = self.channels.get(&to) {
181 channel.0.try_send(message).map_err(|e| match e {
182 crossbeam_channel::TrySendError::Full(_) => {
183 anyhow::anyhow!("Agent {} channel is full (backpressure)", to)
184 }
185 crossbeam_channel::TrySendError::Disconnected(_) => {
186 anyhow::anyhow!("Agent {} channel disconnected", to)
187 }
188 })?;
189 Ok(())
190 } else {
191 Err(anyhow::anyhow!("Agent not found: {}", to))
192 }
193 }
194
195 pub fn broadcast(&self, _from: AgentId, message: BroadcastMessage) -> Result<()> {
199 self.broadcast_sender
200 .try_send(message)
201 .map_err(|e| match e {
202 crossbeam_channel::TrySendError::Full(_) => {
203 anyhow::anyhow!("Broadcast channel is full (backpressure)")
204 }
205 crossbeam_channel::TrySendError::Disconnected(_) => {
206 anyhow::anyhow!("Broadcast channel disconnected")
207 }
208 })?;
209 Ok(())
210 }
211
212 pub fn get_receiver(&self, agent_id: &AgentId) -> Option<Receiver<Message>> {
214 self.channels.get(agent_id).map(|entry| entry.1.clone())
215 }
216
217 pub fn subscribe_all(&self) -> Receiver<AgentMessage> {
219 self.all_messages_receiver.clone()
220 }
221
222 pub async fn publish_to_agent(&self, agent_id: &AgentId, message: AgentMessage) -> Result<()> {
227 if let Some(channel) = self.agent_channels.get(agent_id) {
229 channel.0.try_send(message.clone()).map_err(|e| match e {
230 crossbeam_channel::TrySendError::Full(_) => {
231 anyhow::anyhow!("Agent {} channel is full (backpressure)", agent_id)
232 }
233 crossbeam_channel::TrySendError::Disconnected(_) => {
234 anyhow::anyhow!("Agent {} channel disconnected", agent_id)
235 }
236 })?;
237 } else {
238 return Err(anyhow::anyhow!("Agent not found: {}", agent_id));
239 }
240
241 let _ = self.all_messages_sender.try_send(message);
244
245 Ok(())
246 }
247
248 pub fn get_agent_receiver(&self, agent_id: &AgentId) -> Option<Receiver<AgentMessage>> {
250 self.agent_channels
251 .get(agent_id)
252 .map(|entry| entry.1.clone())
253 }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum MessageContent {
267 Registration {
269 agent_id: AgentId,
270 capabilities: Vec<String>,
271 metadata: serde_json::Value,
272 },
273 TaskAssignment {
275 task_id: TaskId,
276 agent_id: AgentId,
277 task_data: serde_json::Value,
278 },
279 TaskCompleted {
281 agent_id: AgentId,
282 task_id: TaskId,
283 result: serde_json::Value,
284 },
285 TaskProgress {
287 agent_id: AgentId,
288 task_id: TaskId,
289 progress: f32,
290 message: String,
291 },
292 HelpRequest {
294 agent_id: AgentId,
295 context: String,
296 priority: MessagePriority,
297 },
298 StatusUpdate {
300 agent_id: AgentId,
301 status: String,
302 metrics: serde_json::Value,
303 },
304 DataShare { data: serde_json::Value },
306 CoordinationRequest {
308 request_type: String,
309 data: serde_json::Value,
310 },
311 Response {
313 in_reply_to: Uuid,
314 data: serde_json::Value,
315 },
316 Custom {
318 message_type: String,
319 data: serde_json::Value,
320 },
321}
322
323#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct UnifiedMessage {
330 pub id: Uuid,
332 pub from: AgentId,
334 pub content: MessageContent,
336 pub timestamp: chrono::DateTime<chrono::Utc>,
338}
339
340impl UnifiedMessage {
341 pub fn new(from: AgentId, content: MessageContent) -> Self {
343 Self {
344 id: Uuid::new_v4(),
345 from,
346 content,
347 timestamp: chrono::Utc::now(),
348 }
349 }
350
351 pub fn from_legacy_message(msg: Message) -> Self {
353 let content = match msg.message_type {
354 MessageType::TaskAssignment => MessageContent::Custom {
355 message_type: "task_assignment".to_string(),
356 data: msg.payload,
357 },
358 MessageType::StatusUpdate => MessageContent::Custom {
359 message_type: "status_update".to_string(),
360 data: msg.payload,
361 },
362 MessageType::DataShare => MessageContent::DataShare { data: msg.payload },
363 MessageType::CoordinationRequest => MessageContent::CoordinationRequest {
364 request_type: "legacy".to_string(),
365 data: msg.payload,
366 },
367 MessageType::Response => MessageContent::Response {
368 in_reply_to: Uuid::nil(),
369 data: msg.payload,
370 },
371 MessageType::Custom(t) => MessageContent::Custom {
372 message_type: t,
373 data: msg.payload,
374 },
375 };
376
377 Self {
378 id: msg.id,
379 from: msg.from,
380 content,
381 timestamp: msg.timestamp,
382 }
383 }
384
385 pub fn from_agent_message(from: AgentId, msg: AgentMessage) -> Self {
387 let content = match msg {
388 AgentMessage::Registration {
389 agent_id,
390 capabilities,
391 metadata,
392 } => MessageContent::Registration {
393 agent_id,
394 capabilities,
395 metadata,
396 },
397 AgentMessage::TaskAssignment {
398 task_id,
399 agent_id,
400 task_data,
401 } => MessageContent::TaskAssignment {
402 task_id,
403 agent_id,
404 task_data,
405 },
406 AgentMessage::TaskCompleted {
407 agent_id,
408 task_id,
409 result,
410 } => MessageContent::TaskCompleted {
411 agent_id,
412 task_id,
413 result,
414 },
415 AgentMessage::TaskProgress {
416 agent_id,
417 task_id,
418 progress,
419 message,
420 } => MessageContent::TaskProgress {
421 agent_id,
422 task_id,
423 progress,
424 message,
425 },
426 AgentMessage::HelpRequest {
427 agent_id,
428 context,
429 priority,
430 } => MessageContent::HelpRequest {
431 agent_id,
432 context,
433 priority,
434 },
435 AgentMessage::StatusUpdate {
436 agent_id,
437 status,
438 metrics,
439 } => MessageContent::StatusUpdate {
440 agent_id,
441 status,
442 metrics,
443 },
444 AgentMessage::Custom { message_type, data } => {
445 MessageContent::Custom { message_type, data }
446 }
447 };
448
449 Self {
450 id: Uuid::new_v4(),
451 from,
452 content,
453 timestamp: chrono::Utc::now(),
454 }
455 }
456}
457
458#[derive(Debug, Clone, Serialize, Deserialize)]
467pub struct Message {
468 pub id: Uuid,
470 pub from: AgentId,
472 pub message_type: MessageType,
474 pub payload: serde_json::Value,
476 pub timestamp: chrono::DateTime<chrono::Utc>,
478}
479
480#[derive(Debug, Clone, Serialize, Deserialize)]
484pub enum MessageType {
485 TaskAssignment,
487 StatusUpdate,
489 DataShare,
491 CoordinationRequest,
493 Response,
495 Custom(String),
497}
498
499#[derive(Debug, Clone, Serialize, Deserialize)]
504pub enum AgentMessage {
505 Registration {
507 agent_id: AgentId,
508 capabilities: Vec<String>,
509 metadata: serde_json::Value,
510 },
511 TaskAssignment {
513 task_id: TaskId,
514 agent_id: AgentId,
515 task_data: serde_json::Value,
516 },
517 TaskCompleted {
519 agent_id: AgentId,
520 task_id: TaskId,
521 result: serde_json::Value,
522 },
523 TaskProgress {
525 agent_id: AgentId,
526 task_id: TaskId,
527 progress: f32,
528 message: String,
529 },
530 HelpRequest {
532 agent_id: AgentId,
533 context: String,
534 priority: MessagePriority,
535 },
536 StatusUpdate {
538 agent_id: AgentId,
539 status: String,
540 metrics: serde_json::Value,
541 },
542 Custom {
544 message_type: String,
545 data: serde_json::Value,
546 },
547}
548
549#[derive(Debug, Clone, Serialize, Deserialize)]
551pub struct BroadcastMessage {
552 pub id: Uuid,
554 pub from: AgentId,
556 pub content: String,
558 pub priority: MessagePriority,
560 pub timestamp: chrono::DateTime<chrono::Utc>,
562}
563
564#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
566pub enum MessagePriority {
567 Low,
568 Normal,
569 High,
570 Critical,
571}
572
573pub struct TaskDistributor {
575 task_queue: Arc<RwLock<Vec<Task>>>,
577 agent_capabilities: Arc<DashMap<AgentId, Vec<String>>>,
579 assignments: Arc<DashMap<TaskId, AgentId>>,
581}
582
583impl Default for TaskDistributor {
584 fn default() -> Self {
585 Self::new()
586 }
587}
588
589impl TaskDistributor {
590 pub fn new() -> Self {
592 Self {
593 task_queue: Arc::new(RwLock::new(Vec::new())),
594 agent_capabilities: Arc::new(DashMap::new()),
595 assignments: Arc::new(DashMap::new()),
596 }
597 }
598
599 pub fn register_capabilities(&self, agent_id: AgentId, capabilities: Vec<String>) {
601 self.agent_capabilities.insert(agent_id, capabilities);
602 }
603
604 pub async fn submit_task(&self, task: Task) -> Result<()> {
606 self.task_queue.write().await.push(task);
607 Ok(())
608 }
609
610 pub async fn distribute_tasks(&self) -> Result<Vec<(TaskId, AgentId)>> {
612 let mut assignments = Vec::new();
613 let mut queue = self.task_queue.write().await;
614
615 let agents: Vec<AgentId> = self
618 .agent_capabilities
619 .iter()
620 .map(|entry| entry.key().clone())
621 .collect();
622
623 if agents.is_empty() {
624 return Ok(assignments);
625 }
626
627 let mut agent_index = 0;
628 while let Some(task) = queue.pop() {
629 let agent_id = &agents[agent_index % agents.len()];
630 self.assignments.insert(task.id.clone(), agent_id.clone());
631 assignments.push((task.id, agent_id.clone()));
632 agent_index += 1;
633 }
634
635 Ok(assignments)
636 }
637}
638
639#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
641pub struct TaskId(Uuid);
642
643impl Default for TaskId {
644 fn default() -> Self {
645 Self::new()
646 }
647}
648
649impl TaskId {
650 pub fn new() -> Self {
652 Self(Uuid::new_v4())
653 }
654}
655
656impl std::fmt::Display for TaskId {
657 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
658 write!(f, "{}", self.0)
659 }
660}
661
662#[derive(Debug, Clone, Serialize, Deserialize)]
664pub struct Task {
665 pub id: TaskId,
667 pub name: String,
669 pub required_capabilities: Vec<String>,
671 pub payload: serde_json::Value,
673 pub priority: TaskPriority,
675 pub created_at: chrono::DateTime<chrono::Utc>,
677}
678
679#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
681pub enum TaskPriority {
682 Low,
683 Normal,
684 High,
685 Critical,
686}
687
688pub struct ResourceManager {
690 file_locks: Arc<DashMap<String, AgentId>>,
692 rate_limits: Arc<DashMap<String, RateLimit>>,
694 shared_memory: Arc<DashMap<String, Vec<u8>>>,
696}
697
698impl Default for ResourceManager {
699 fn default() -> Self {
700 Self::new()
701 }
702}
703
704impl ResourceManager {
705 pub fn new() -> Self {
707 Self {
708 file_locks: Arc::new(DashMap::new()),
709 rate_limits: Arc::new(DashMap::new()),
710 shared_memory: Arc::new(DashMap::new()),
711 }
712 }
713
714 pub fn acquire_file_lock(&self, path: &str, agent_id: AgentId) -> Result<()> {
716 match self.file_locks.entry(path.to_string()) {
717 dashmap::mapref::entry::Entry::Occupied(_) => {
718 Err(anyhow::anyhow!("File already locked: {}", path))
719 }
720 dashmap::mapref::entry::Entry::Vacant(entry) => {
721 entry.insert(agent_id);
722 Ok(())
723 }
724 }
725 }
726
727 pub fn release_file_lock(&self, path: &str, agent_id: &AgentId) -> Result<()> {
729 if let Some((_, owner)) = self.file_locks.remove(path)
730 && owner != *agent_id
731 {
732 return Err(anyhow::anyhow!("Not the lock owner"));
733 }
734 Ok(())
735 }
736
737 pub fn check_rate_limit(&self, resource: &str) -> bool {
742 if let Some(limit) = self.rate_limits.get(resource) {
743 limit.can_proceed()
744 } else {
745 true
746 }
747 }
748
749 pub fn set_rate_limit(
756 &self,
757 resource: &str,
758 max_requests: usize,
759 interval: std::time::Duration,
760 ) {
761 self.rate_limits
762 .insert(resource.to_string(), RateLimit::new(max_requests, interval));
763 }
764
765 pub fn rate_limit_remaining(&self, resource: &str) -> Option<usize> {
767 self.rate_limits
768 .get(resource)
769 .map(|limit| limit.remaining())
770 }
771
772 pub fn write_shared_memory(&self, key: &str, data: Vec<u8>) {
774 self.shared_memory.insert(key.to_string(), data);
775 }
776
777 pub fn read_shared_memory(&self, key: &str) -> Option<Vec<u8>> {
779 self.shared_memory.get(key).map(|entry| entry.clone())
780 }
781}
782
783#[derive(Debug, Clone)]
788pub struct RateLimit {
789 pub max_requests: usize,
791 pub interval: std::time::Duration,
793 current_count: Arc<std::sync::atomic::AtomicUsize>,
795 last_reset_nanos: Arc<std::sync::atomic::AtomicU64>,
797}
798
799impl RateLimit {
800 pub fn new(max_requests: usize, interval: std::time::Duration) -> Self {
806 let now = std::time::SystemTime::now()
807 .duration_since(std::time::UNIX_EPOCH)
808 .map(|d| d.as_nanos() as u64)
809 .unwrap_or(0);
810
811 Self {
812 max_requests,
813 interval,
814 current_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
815 last_reset_nanos: Arc::new(std::sync::atomic::AtomicU64::new(now)),
816 }
817 }
818
819 pub fn can_proceed(&self) -> bool {
824 use std::sync::atomic::Ordering;
825
826 let now_nanos = std::time::SystemTime::now()
827 .duration_since(std::time::UNIX_EPOCH)
828 .map(|d| d.as_nanos() as u64)
829 .unwrap_or(0);
830
831 let last_reset = self.last_reset_nanos.load(Ordering::Acquire);
832 let interval_nanos = self.interval.as_nanos() as u64;
833
834 if now_nanos.saturating_sub(last_reset) >= interval_nanos {
836 if self
838 .last_reset_nanos
839 .compare_exchange(last_reset, now_nanos, Ordering::AcqRel, Ordering::Relaxed)
840 .is_ok()
841 {
842 self.current_count.store(0, Ordering::Release);
844 }
845 }
846
847 let current = self.current_count.fetch_add(1, Ordering::AcqRel);
849
850 if current < self.max_requests {
851 true
852 } else {
853 self.current_count.fetch_sub(1, Ordering::AcqRel);
855 false
856 }
857 }
858
859 pub fn current_count(&self) -> usize {
861 self.current_count
862 .load(std::sync::atomic::Ordering::Acquire)
863 }
864
865 pub fn remaining(&self) -> usize {
867 let current = self.current_count();
868 self.max_requests.saturating_sub(current)
869 }
870
871 pub fn reset(&self) {
873 use std::sync::atomic::Ordering;
874
875 let now_nanos = std::time::SystemTime::now()
876 .duration_since(std::time::UNIX_EPOCH)
877 .map(|d| d.as_nanos() as u64)
878 .unwrap_or(0);
879
880 self.current_count.store(0, Ordering::Release);
881 self.last_reset_nanos.store(now_nanos, Ordering::Release);
882 }
883}
884
885#[cfg(test)]
886mod tests {
887 use super::*;
888
889 #[test]
890 fn test_multi_agent_session() {
891 let multi_session = MultiAgentSession::new();
892 let _agent_id = AgentId::new();
893
894 assert_eq!(multi_session.list_agents().len(), 0);
896 }
897
898 #[test]
899 fn test_message_bus() {
900 let bus = MessageBus::new();
901 let agent1 = AgentId::new();
902 let agent2 = AgentId::new();
903
904 bus.register_agent(agent1.clone()).unwrap();
905 bus.register_agent(agent2.clone()).unwrap();
906
907 let message = Message {
908 id: Uuid::new_v4(),
909 from: agent1.clone(),
910 message_type: MessageType::StatusUpdate,
911 payload: serde_json::json!({"status": "ready"}),
912 timestamp: chrono::Utc::now(),
913 };
914
915 bus.send_message(agent1, agent2.clone(), message).unwrap();
916
917 if let Some(receiver) = bus.get_receiver(&agent2) {
918 assert!(receiver.try_recv().is_ok());
919 }
920 }
921
922 #[tokio::test]
923 async fn test_agent_message_publish() {
924 let bus = MessageBus::new();
925 let agent1 = AgentId::new();
926 let agent2 = AgentId::new();
927
928 bus.register_agent(agent1.clone()).unwrap();
929 bus.register_agent(agent2.clone()).unwrap();
930
931 let all_receiver = bus.subscribe_all();
933
934 let registration_msg = AgentMessage::Registration {
936 agent_id: agent1.clone(),
937 capabilities: vec!["frontend".to_string(), "react".to_string()],
938 metadata: serde_json::json!({"version": "1.0"}),
939 };
940
941 bus.publish_to_agent(&agent2, registration_msg.clone())
943 .await
944 .unwrap();
945
946 if let Some(receiver) = bus.get_agent_receiver(&agent2) {
948 let received = receiver.try_recv().unwrap();
949 match received {
950 AgentMessage::Registration { agent_id, .. } => {
951 assert_eq!(agent_id, agent1);
952 }
953 _ => panic!("Unexpected message type"),
954 }
955 }
956
957 let all_msg = all_receiver.try_recv().unwrap();
959 match all_msg {
960 AgentMessage::Registration { agent_id, .. } => {
961 assert_eq!(agent_id, agent1);
962 }
963 _ => panic!("Unexpected message type"),
964 }
965 }
966
967 #[tokio::test]
968 async fn test_all_agent_message_variants() {
969 let bus = MessageBus::new();
970 let agent1 = AgentId::new();
971 bus.register_agent(agent1.clone()).unwrap();
972
973 let messages = vec![
975 AgentMessage::Registration {
976 agent_id: agent1.clone(),
977 capabilities: vec!["test".to_string()],
978 metadata: serde_json::json!({}),
979 },
980 AgentMessage::TaskAssignment {
981 task_id: TaskId::new(),
982 agent_id: agent1.clone(),
983 task_data: serde_json::json!({"task": "test"}),
984 },
985 AgentMessage::TaskCompleted {
986 agent_id: agent1.clone(),
987 task_id: TaskId::new(),
988 result: serde_json::json!({"success": true}),
989 },
990 AgentMessage::TaskProgress {
991 agent_id: agent1.clone(),
992 task_id: TaskId::new(),
993 progress: 0.5,
994 message: "Halfway done".to_string(),
995 },
996 AgentMessage::HelpRequest {
997 agent_id: agent1.clone(),
998 context: "Need help with React".to_string(),
999 priority: MessagePriority::High,
1000 },
1001 AgentMessage::StatusUpdate {
1002 agent_id: agent1.clone(),
1003 status: "active".to_string(),
1004 metrics: serde_json::json!({"cpu": 50, "memory": 1024}),
1005 },
1006 AgentMessage::Custom {
1007 message_type: "test_message".to_string(),
1008 data: serde_json::json!({"foo": "bar"}),
1009 },
1010 ];
1011
1012 for msg in messages {
1013 bus.publish_to_agent(&agent1, msg).await.unwrap();
1014 }
1015
1016 if let Some(receiver) = bus.get_agent_receiver(&agent1) {
1018 let mut count = 0;
1019 while receiver.try_recv().is_ok() {
1020 count += 1;
1021 }
1022 assert_eq!(count, 7); }
1024 }
1025
1026 #[test]
1027 fn test_rate_limit_basic() {
1028 let limit = RateLimit::new(3, std::time::Duration::from_secs(60));
1029
1030 assert!(limit.can_proceed());
1032 assert!(limit.can_proceed());
1033 assert!(limit.can_proceed());
1034
1035 assert!(!limit.can_proceed());
1037
1038 assert_eq!(limit.current_count(), 3);
1040 assert_eq!(limit.remaining(), 0);
1041 }
1042
1043 #[test]
1044 fn test_rate_limit_reset() {
1045 let limit = RateLimit::new(2, std::time::Duration::from_secs(60));
1046
1047 assert!(limit.can_proceed());
1048 assert!(limit.can_proceed());
1049 assert!(!limit.can_proceed());
1050
1051 limit.reset();
1053 assert!(limit.can_proceed());
1054 assert_eq!(limit.current_count(), 1);
1055 }
1056
1057 #[test]
1058 fn test_resource_manager_rate_limit() {
1059 let manager = ResourceManager::new();
1060
1061 assert!(manager.check_rate_limit("api"));
1063
1064 manager.set_rate_limit("api", 2, std::time::Duration::from_secs(60));
1066
1067 assert!(manager.check_rate_limit("api"));
1069 assert!(manager.check_rate_limit("api"));
1070 assert!(!manager.check_rate_limit("api"));
1071
1072 assert_eq!(manager.rate_limit_remaining("api"), Some(0));
1074 }
1075}