brainwires_agents/communication.rs
1//! Inter-agent communication hub and message types
2//!
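//! Provides a hub-based messaging system for agent coordination: every
//! registered agent gets its own unbounded channel, and the hub delivers
//! messages either to a single agent or to all of them. Message types cover
//! status updates, help requests, task results, and conflict notifications.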
5
6use anyhow::Result;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock, mpsc};
12
/// Types of messages agents can send to each other.
///
/// Serialized by serde as an internally tagged enum: the variant name,
/// converted to `snake_case`, is stored under a `type` key next to the
/// variant's own fields (for example `TaskRequest` is written with
/// `"type": "task_request"`).
///
/// Variants are grouped by the protocol they belong to; the section comments
/// inside the enum mark the groups (coordination, saga, contract-net, market
/// allocation, worktree, validation, cycle orchestration, optimistic
/// concurrency).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentMessage {
    /// Request to execute a task.
    TaskRequest {
        /// Unique task identifier.
        task_id: String,
        /// Task description.
        description: String,
        /// Task priority (lower = higher priority).
        priority: u8,
    },
    /// Result of task execution.
    TaskResult {
        /// Unique task identifier.
        task_id: String,
        /// Whether the task succeeded.
        success: bool,
        /// Result summary.
        result: String,
    },
    /// Status update from an agent.
    StatusUpdate {
        /// Agent reporting the status.
        agent_id: String,
        /// Current status string.
        status: String,
        /// Optional additional details.
        details: Option<String>,
    },
    /// Request for help/collaboration.
    HelpRequest {
        /// Unique request identifier.
        request_id: String,
        /// Help topic.
        topic: String,
        /// Detailed description of what help is needed.
        details: String,
    },
    /// Response to a help request.
    HelpResponse {
        /// ID of the original help request.
        request_id: String,
        /// Help response content.
        response: String,
    },
    /// Broadcast message to all agents.
    Broadcast {
        /// ID of the sending agent.
        sender: String,
        /// Broadcast message content.
        message: String,
    },
    /// Custom message with arbitrary data.
    Custom {
        /// Custom message type identifier.
        message_type: String,
        /// Arbitrary JSON data payload.
        data: serde_json::Value,
    },
    /// Notification that an agent was spawned.
    AgentSpawned {
        /// ID of the spawned agent.
        agent_id: String,
        /// ID of the task assigned to the agent.
        task_id: String,
    },
    /// Progress update from an agent.
    AgentProgress {
        /// ID of the reporting agent.
        agent_id: String,
        /// Completion percentage (0-100).
        progress_percent: u8,
        /// Progress description.
        message: String,
    },
    /// Notification that an agent completed.
    AgentCompleted {
        /// ID of the completed agent.
        agent_id: String,
        /// ID of the completed task.
        task_id: String,
        /// Completion summary.
        summary: String,
    },
    /// Notification about lock contention.
    LockContention {
        /// ID of the waiting agent.
        agent_id: String,
        /// Path being contended.
        path: String,
        /// ID of the agent holding the lock.
        waiting_for: String,
    },
    /// Request for approval (dangerous operation).
    ApprovalRequest {
        /// Unique request identifier.
        request_id: String,
        /// ID of the requesting agent.
        agent_id: String,
        /// Operation requiring approval.
        operation: String,
        /// Operation details.
        details: String,
    },
    /// Response to an approval request.
    ApprovalResponse {
        /// ID of the original approval request.
        request_id: String,
        /// Whether the operation was approved.
        approved: bool,
        /// Reason for approval or rejection.
        reason: Option<String>,
    },

    // === New messages for agent coordination ===
    /// Notification that an exclusive operation has started.
    OperationStarted {
        /// ID of the agent performing the operation.
        agent_id: String,
        /// Type of operation.
        operation_type: OperationType,
        /// Scope of the operation (e.g., project path).
        scope: String,
        /// Estimated duration in milliseconds.
        estimated_duration_ms: Option<u64>,
        /// Human-readable description.
        description: String,
    },
    /// Notification that an exclusive operation has completed.
    OperationCompleted {
        /// ID of the agent that completed the operation.
        agent_id: String,
        /// Type of operation.
        operation_type: OperationType,
        /// Scope of the operation.
        scope: String,
        /// Whether the operation succeeded.
        success: bool,
        /// Actual duration in milliseconds.
        duration_ms: u64,
        /// Completion summary.
        summary: String,
    },
    /// Notification that a lock has become available.
    LockAvailable {
        /// Type of operation the lock was for.
        operation_type: OperationType,
        /// Scope of the released lock.
        scope: String,
        /// ID of the agent that released the lock.
        released_by: String,
    },
    /// Update on wait queue position.
    WaitQueuePosition {
        /// ID of the waiting agent.
        agent_id: String,
        /// Type of operation being waited on.
        operation_type: OperationType,
        /// Scope of the operation.
        scope: String,
        /// Current position in the wait queue.
        position: usize,
        /// Estimated wait time in milliseconds.
        estimated_wait_ms: Option<u64>,
    },
    /// Git operation started.
    GitOperationStarted {
        /// ID of the agent performing the git operation.
        agent_id: String,
        /// Type of git operation.
        git_op: GitOperationType,
        /// Target branch (if applicable).
        branch: Option<String>,
        /// Human-readable description.
        description: String,
    },
    /// Git operation completed.
    GitOperationCompleted {
        /// ID of the agent that completed the operation.
        agent_id: String,
        /// Type of git operation.
        git_op: GitOperationType,
        /// Whether the operation succeeded.
        success: bool,
        /// Completion summary.
        summary: String,
    },
    /// Build blocked due to conflicts.
    BuildBlocked {
        /// ID of the blocked agent.
        agent_id: String,
        /// Reason the build is blocked.
        reason: String,
        /// List of conflicts causing the block.
        conflicts: Vec<ConflictInfo>,
        /// Estimated wait time in milliseconds.
        estimated_wait_ms: Option<u64>,
    },
    /// File write blocked due to conflicts.
    FileWriteBlocked {
        /// ID of the blocked agent.
        agent_id: String,
        /// Path that cannot be written.
        path: String,
        /// Reason the write is blocked.
        reason: String,
        /// List of conflicts causing the block.
        conflicts: Vec<ConflictInfo>,
    },
    /// Resource conflict resolved - agent can proceed.
    ConflictResolved {
        /// ID of the agent that can now proceed.
        agent_id: String,
        /// Type of operation that was unblocked.
        operation_type: OperationType,
        /// Scope of the resolved conflict.
        scope: String,
    },

    // === Saga Protocol Messages ===
    /// A saga (multi-step transaction) has started.
    SagaStarted {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the agent executing the saga.
        agent_id: String,
        /// Saga description.
        description: String,
        /// Total number of steps in the saga.
        total_steps: usize,
    },
    /// A saga step has completed.
    SagaStepCompleted {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the executing agent.
        agent_id: String,
        /// Index of the completed step.
        step_index: usize,
        /// Name of the completed step.
        step_name: String,
        /// Whether the step succeeded.
        success: bool,
    },
    /// A saga has completed (successfully or with compensation).
    SagaCompleted {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the executing agent.
        agent_id: String,
        /// Whether the saga completed successfully.
        success: bool,
        /// Whether compensation was applied.
        compensated: bool,
        /// Completion summary.
        summary: String,
    },
    /// A saga is being compensated (rolling back).
    SagaCompensating {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the executing agent.
        agent_id: String,
        /// Reason for compensation.
        reason: String,
        /// Number of steps to compensate.
        steps_to_compensate: usize,
    },

    // === Contract-Net Protocol Messages ===
    /// A task has been announced for bidding.
    TaskAnnounced {
        /// Unique task identifier.
        task_id: String,
        /// ID of the announcing agent.
        announcer: String,
        /// Task description.
        description: String,
        /// Deadline for bids in milliseconds.
        bid_deadline_ms: u64,
    },
    /// An agent has submitted a bid.
    BidSubmitted {
        /// Task being bid on.
        task_id: String,
        /// ID of the bidding agent.
        agent_id: String,
        /// Self-assessed capability score (0.0-1.0).
        capability_score: f32,
        /// Current workload (0.0-1.0).
        current_load: f32,
    },
    /// A task has been awarded to an agent.
    TaskAwarded {
        /// Task that was awarded.
        task_id: String,
        /// ID of the winning agent.
        winner: String,
        /// ID of the announcing agent.
        announcer: String,
    },
    /// An agent has accepted an awarded task.
    TaskAccepted {
        /// Task that was accepted.
        task_id: String,
        /// ID of the accepting agent.
        agent_id: String,
    },
    /// An agent has declined an awarded task.
    TaskDeclined {
        /// Task that was declined.
        task_id: String,
        /// ID of the declining agent.
        agent_id: String,
        /// Reason for declining.
        reason: String,
    },

    // === Market Allocation Messages ===
    /// A resource is available for bidding.
    ResourceAvailable {
        /// Unique resource identifier.
        resource_id: String,
        /// Type of resource.
        resource_type: String,
    },
    /// A resource bid has been submitted.
    ResourceBidSubmitted {
        /// Resource being bid on.
        resource_id: String,
        /// ID of the bidding agent.
        agent_id: String,
        /// Bid priority.
        priority: u8,
        /// Bid urgency (0.0-1.0).
        urgency: f32,
    },
    /// A resource has been allocated to an agent.
    ResourceAllocated {
        /// Allocated resource identifier.
        resource_id: String,
        /// ID of the agent receiving the resource.
        agent_id: String,
        /// Allocation price in credits.
        price: u32,
    },
    /// A resource has been released.
    ResourceReleased {
        /// Released resource identifier.
        resource_id: String,
        /// ID of the releasing agent.
        agent_id: String,
    },

    // === Worktree Messages ===
    /// A worktree has been created for an agent.
    WorktreeCreated {
        /// ID of the agent that owns the worktree.
        agent_id: String,
        /// Filesystem path to the worktree.
        worktree_path: String,
        /// Git branch for the worktree.
        branch: String,
    },
    /// A worktree has been removed.
    WorktreeRemoved {
        /// ID of the agent whose worktree was removed.
        agent_id: String,
        /// Path of the removed worktree.
        worktree_path: String,
    },
    /// An agent is switching worktrees.
    WorktreeSwitched {
        /// ID of the switching agent.
        agent_id: String,
        /// Previous worktree path (if any).
        from_path: Option<String>,
        /// New worktree path.
        to_path: String,
    },

    // === Validation Messages ===
    /// A validation check has failed.
    ValidationFailed {
        /// ID of the agent that failed validation.
        agent_id: String,
        /// Operation that was being validated.
        operation: String,
        /// Name of the failed validation rule.
        rule_name: String,
        /// Failure description.
        message: String,
    },
    /// A validation warning was raised.
    ValidationWarning {
        /// ID of the agent that triggered the warning.
        agent_id: String,
        /// Operation that was being validated.
        operation: String,
        /// Name of the warning rule.
        rule_name: String,
        /// Warning description.
        message: String,
    },

    // === Cycle Orchestration Messages ===
    /// A Plan→Work→Judge cycle has started.
    CycleStarted {
        /// Current cycle number (0-indexed).
        cycle_number: u32,
        /// The high-level goal being pursued.
        goal: String,
    },
    /// A Plan→Work→Judge cycle has completed.
    CycleCompleted {
        /// Completed cycle number.
        cycle_number: u32,
        /// Type of verdict reached (complete, continue, fresh_restart, abort).
        verdict_type: String,
    },
    /// A planner has produced a task plan.
    PlanCreated {
        /// Cycle number the plan belongs to.
        cycle_number: u32,
        /// Number of tasks in the plan.
        task_count: usize,
        /// Planner's rationale summary.
        rationale: String,
    },
    /// A worker's branch has been merged.
    WorkerBranchMerged {
        /// ID of the worker agent.
        agent_id: String,
        /// Name of the merged branch.
        branch: String,
        /// Merge status description.
        status: String,
    },

    // === Optimistic Concurrency Messages ===
    /// A version conflict was detected.
    VersionConflict {
        /// Resource with the version conflict.
        resource_id: String,
        /// ID of the agent that encountered the conflict.
        agent_id: String,
        /// Version the agent expected.
        expected_version: u64,
        /// Actual current version.
        actual_version: u64,
    },
    /// A conflict has been resolved.
    ConflictResolutionApplied {
        /// Resource where the conflict was resolved.
        resource_id: String,
        /// Type of resolution applied.
        resolution_type: String,
        /// Agent whose changes won (if applicable).
        winning_agent: Option<String>,
    },
}
476
/// Types of operations that require coordination.
///
/// Serde writes each variant under its `snake_case` name (for example
/// `BuildTest` becomes `"build_test"`). The `Display` implementation in this
/// file prints the Rust variant name instead (`BuildTest`), so the two textual
/// forms are not interchangeable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationType {
    /// Build operation (cargo build, npm build, etc.)
    Build,
    /// Test operation (cargo test, npm test, etc.)
    Test,
    /// Combined build and test
    BuildTest,
    /// Git index/staging operations
    GitIndex,
    /// Git commit operations
    GitCommit,
    /// Git push operations
    GitPush,
    /// Git pull operations
    GitPull,
    /// Git branch operations
    GitBranch,
    /// File write operation
    FileWrite,
}
500
501impl std::fmt::Display for OperationType {
502 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
503 match self {
504 OperationType::Build => write!(f, "Build"),
505 OperationType::Test => write!(f, "Test"),
506 OperationType::BuildTest => write!(f, "BuildTest"),
507 OperationType::GitIndex => write!(f, "GitIndex"),
508 OperationType::GitCommit => write!(f, "GitCommit"),
509 OperationType::GitPush => write!(f, "GitPush"),
510 OperationType::GitPull => write!(f, "GitPull"),
511 OperationType::GitBranch => write!(f, "GitBranch"),
512 OperationType::FileWrite => write!(f, "FileWrite"),
513 }
514 }
515}
516
/// Git-specific operation types for finer-grained control.
///
/// Serde writes each variant under its `snake_case` name (for example
/// `RemoteWrite` becomes `"remote_write"`). The `Display` implementation in
/// this file prints the Rust variant name instead (`RemoteWrite`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum GitOperationType {
    /// Read-only operations (status, diff, log, fetch)
    ReadOnly,
    /// Staging operations (stage, unstage)
    Staging,
    /// Commit operations
    Commit,
    /// Remote write operations (push)
    RemoteWrite,
    /// Remote read/merge operations (pull)
    RemoteMerge,
    /// Branch operations (create, switch, delete)
    Branch,
    /// Destructive operations (discard)
    Destructive,
}
536
537impl std::fmt::Display for GitOperationType {
538 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
539 match self {
540 GitOperationType::ReadOnly => write!(f, "ReadOnly"),
541 GitOperationType::Staging => write!(f, "Staging"),
542 GitOperationType::Commit => write!(f, "Commit"),
543 GitOperationType::RemoteWrite => write!(f, "RemoteWrite"),
544 GitOperationType::RemoteMerge => write!(f, "RemoteMerge"),
545 GitOperationType::Branch => write!(f, "Branch"),
546 GitOperationType::Destructive => write!(f, "Destructive"),
547 }
548 }
549}
550
/// Information about a conflict blocking an operation.
///
/// Carried in the `conflicts` lists of `AgentMessage::BuildBlocked` and
/// `AgentMessage::FileWriteBlocked`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// Type of conflict
    pub conflict_type: ConflictType,
    /// Agent holding the conflicting resource
    pub holder_agent: String,
    /// Resource identifier (path or scope)
    pub resource: String,
    /// How long the conflict has been active (seconds)
    pub duration_secs: u64,
    /// Current status of the blocking operation
    pub status: String,
}
565
/// Types of conflicts that can block operations.
///
/// Serialized by serde as an internally tagged enum: the `snake_case` variant
/// name is stored under a `type` key, and variants that carry a `path` write
/// it alongside that key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ConflictType {
    /// File write lock blocks build
    FileWriteBlocksBuild {
        /// Path of the file causing the block.
        path: PathBuf,
    },
    /// Build in progress blocks file write
    BuildBlocksFileWrite,
    /// Test in progress blocks file write
    TestBlocksFileWrite,
    /// Git operation blocks file write
    GitBlocksFileWrite,
    /// File write blocks git operation
    FileWriteBlocksGit {
        /// Path of the file causing the block.
        path: PathBuf,
    },
    /// Build blocks git operation
    BuildBlocksGit,
}
589
/// Envelope containing message metadata.
///
/// Wraps an [`AgentMessage`] with its routing information and creation time.
/// Unlike the payload it derives only `Debug` and `Clone`, so the envelope
/// itself is not serde-serializable.
#[derive(Debug, Clone)]
pub struct MessageEnvelope {
    /// Sender agent ID.
    pub from: String,
    /// Recipient agent ID.
    pub to: String,
    /// The message payload.
    pub message: AgentMessage,
    /// When the message was created.
    pub timestamp: std::time::SystemTime,
}
602
603impl MessageEnvelope {
604 /// Create a new message envelope with the current timestamp.
605 pub fn new(from: String, to: String, message: AgentMessage) -> Self {
606 Self {
607 from,
608 to,
609 message,
610 timestamp: std::time::SystemTime::now(),
611 }
612 }
613}
614
/// Agent communication channel.
///
/// Holds both halves of one unbounded Tokio `mpsc` channel that carries
/// [`MessageEnvelope`] values.
pub struct AgentChannel {
    // Producer half; `send` pushes envelopes through it.
    sender: mpsc::UnboundedSender<MessageEnvelope>,
    // Consumer half. Kept behind `Arc<Mutex<..>>` so the receive methods,
    // which only take `&self`, can lock it to pull messages out.
    receiver: Arc<Mutex<mpsc::UnboundedReceiver<MessageEnvelope>>>,
}
620
621impl AgentChannel {
622 /// Create a new agent channel
623 pub fn new() -> Self {
624 let (sender, receiver) = mpsc::unbounded_channel();
625 Self {
626 sender,
627 receiver: Arc::new(Mutex::new(receiver)),
628 }
629 }
630
631 /// Send a message on this channel
632 pub fn send(&self, envelope: MessageEnvelope) -> Result<()> {
633 self.sender
634 .send(envelope)
635 .map_err(|e| anyhow::anyhow!("Failed to send message: {}", e))
636 }
637
638 /// Receive a message from this channel (async, blocking)
639 pub async fn receive(&self) -> Option<MessageEnvelope> {
640 self.receiver.lock().await.recv().await
641 }
642
643 /// Try to receive a message without blocking
644 pub async fn try_receive(&self) -> Option<MessageEnvelope> {
645 self.receiver.lock().await.try_recv().ok()
646 }
647}
648
649impl Default for AgentChannel {
650 fn default() -> Self {
651 Self::new()
652 }
653}
654
/// Communication hub for managing multiple agent channels.
///
/// Each registered agent ID maps to its own [`AgentChannel`]; the map sits
/// behind an async `RwLock` so the hub can be used through `&self`.
pub struct CommunicationHub {
    // Per-agent inboxes, keyed by agent ID.
    channels: Arc<RwLock<HashMap<String, AgentChannel>>>,
    // Created in `new` but not read anywhere in the visible code; the leading
    // underscore silences the unused-field warning.
    _broadcast_channel: AgentChannel,
}
660
661impl CommunicationHub {
662 /// Create a new communication hub
663 pub fn new() -> Self {
664 Self {
665 channels: Arc::new(RwLock::new(HashMap::new())),
666 _broadcast_channel: AgentChannel::new(),
667 }
668 }
669
670 /// Register an agent with the hub
671 #[tracing::instrument(name = "agent.register", skip(self))]
672 pub async fn register_agent(&self, agent_id: String) -> Result<()> {
673 let mut channels = self.channels.write().await;
674 if channels.contains_key(&agent_id) {
675 anyhow::bail!("Agent {} is already registered", agent_id);
676 }
677 channels.insert(agent_id.clone(), AgentChannel::new());
678 Ok(())
679 }
680
681 /// Unregister an agent from the hub
682 #[tracing::instrument(name = "agent.unregister", skip(self))]
683 pub async fn unregister_agent(&self, agent_id: &str) -> Result<()> {
684 let mut channels = self.channels.write().await;
685 if channels.remove(agent_id).is_none() {
686 anyhow::bail!("Agent {} is not registered", agent_id);
687 }
688 Ok(())
689 }
690
691 /// Send a message from one agent to another
692 #[tracing::instrument(name = "agent.send_message", skip(self, message))]
693 pub async fn send_message(
694 &self,
695 from: String,
696 to: String,
697 message: AgentMessage,
698 ) -> Result<()> {
699 let channels = self.channels.read().await;
700 let channel = channels
701 .get(&to)
702 .ok_or_else(|| anyhow::anyhow!("Agent {} is not registered", to))?;
703
704 let envelope = MessageEnvelope::new(from, to, message);
705 channel.send(envelope)
706 }
707
708 /// Broadcast a message to all agents
709 #[tracing::instrument(name = "agent.broadcast", skip(self, message))]
710 pub async fn broadcast(&self, from: String, message: AgentMessage) -> Result<()> {
711 let channels = self.channels.read().await;
712 for (agent_id, channel) in channels.iter() {
713 let envelope = MessageEnvelope::new(from.clone(), agent_id.clone(), message.clone());
714 channel.send(envelope)?;
715 }
716 Ok(())
717 }
718
719 /// Receive a message for a specific agent
720 pub async fn receive_message(&self, agent_id: &str) -> Option<MessageEnvelope> {
721 let channels = self.channels.read().await;
722 if let Some(channel) = channels.get(agent_id) {
723 channel.receive().await
724 } else {
725 None
726 }
727 }
728
729 /// Try to receive a message without blocking
730 pub async fn try_receive_message(&self, agent_id: &str) -> Option<MessageEnvelope> {
731 let channels = self.channels.read().await;
732 if let Some(channel) = channels.get(agent_id) {
733 channel.try_receive().await
734 } else {
735 None
736 }
737 }
738
739 /// Get the number of registered agents
740 pub async fn agent_count(&self) -> usize {
741 self.channels.read().await.len()
742 }
743
744 /// Get list of registered agent IDs
745 pub async fn list_agents(&self) -> Vec<String> {
746 self.channels.read().await.keys().cloned().collect()
747 }
748
749 /// Check if an agent is registered
750 pub async fn is_registered(&self, agent_id: &str) -> bool {
751 self.channels.read().await.contains_key(agent_id)
752 }
753}
754
755impl Default for CommunicationHub {
756 fn default() -> Self {
757 Self::new()
758 }
759}
760
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare channel hands back the envelope that was pushed into it.
    #[tokio::test]
    async fn test_agent_channel() {
        let status = AgentMessage::StatusUpdate {
            agent_id: String::from("agent-1"),
            status: String::from("working"),
            details: None,
        };
        let outgoing =
            MessageEnvelope::new(String::from("agent-1"), String::from("agent-2"), status);
        let channel = AgentChannel::new();

        channel.send(outgoing.clone()).unwrap();

        let incoming = channel.receive().await;
        assert!(incoming.is_some());
        assert_eq!(incoming.unwrap().from, "agent-1");
    }

    /// Registering works once and is rejected for a duplicate ID.
    #[tokio::test]
    async fn test_communication_hub_register() {
        let hub = CommunicationHub::new();

        hub.register_agent(String::from("agent-1")).await.unwrap();
        assert_eq!(hub.agent_count().await, 1);
        assert!(hub.is_registered("agent-1").await);

        // A second registration under the same ID must be refused.
        let duplicate = hub.register_agent(String::from("agent-1")).await;
        assert!(duplicate.is_err());
    }

    /// A direct message reaches its recipient with routing fields intact.
    #[tokio::test]
    async fn test_send_receive_message() {
        let hub = CommunicationHub::new();
        for id in ["agent-1", "agent-2"] {
            hub.register_agent(id.to_string()).await.unwrap();
        }

        let request = AgentMessage::TaskRequest {
            task_id: String::from("task-1"),
            description: String::from("Do something"),
            priority: 5,
        };
        hub.send_message(String::from("agent-1"), String::from("agent-2"), request)
            .await
            .unwrap();

        let delivered = hub.receive_message("agent-2").await;
        assert!(delivered.is_some());

        let envelope = delivered.unwrap();
        assert_eq!(envelope.from, "agent-1");
        assert_eq!(envelope.to, "agent-2");
    }

    /// A broadcast lands in every registered agent's inbox.
    #[tokio::test]
    async fn test_broadcast() {
        let hub = CommunicationHub::new();
        let agents = ["agent-1", "agent-2", "agent-3"];
        for id in agents {
            hub.register_agent(id.to_string()).await.unwrap();
        }

        let announcement = AgentMessage::Broadcast {
            sender: String::from("orchestrator"),
            message: String::from("Hello all!"),
        };
        hub.broadcast(String::from("orchestrator"), announcement)
            .await
            .unwrap();

        // All agents should receive the message
        for id in agents {
            assert!(hub.try_receive_message(id).await.is_some());
        }
    }

    /// Unregistering removes the agent from the hub's bookkeeping.
    #[tokio::test]
    async fn test_unregister() {
        let hub = CommunicationHub::new();

        hub.register_agent(String::from("agent-1")).await.unwrap();
        assert_eq!(hub.agent_count().await, 1);

        hub.unregister_agent("agent-1").await.unwrap();
        assert_eq!(hub.agent_count().await, 0);
        assert!(!hub.is_registered("agent-1").await);
    }
}
856}