Skip to main content

brainwires_agents/
communication.rs

1//! Inter-agent communication hub and message types
2//!
3//! Provides a broadcast-based messaging system for agent coordination,
4//! including status updates, help requests, task results, and conflict notifications.
5
6use anyhow::Result;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock, mpsc};
12
/// Types of messages agents can send to each other
///
/// Serde encodes every variant as an internally tagged object: the variant
/// name, converted to `snake_case` (e.g. `TaskRequest` -> `"task_request"`),
/// is stored under the `"type"` key alongside the variant's own fields.
/// Renaming or reordering variants/fields therefore changes the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentMessage {
    /// Request to execute a task
    TaskRequest {
        /// Unique task identifier.
        task_id: String,
        /// Task description.
        description: String,
        /// Task priority (lower = higher priority).
        priority: u8,
    },
    /// Result of task execution
    TaskResult {
        /// Unique task identifier.
        task_id: String,
        /// Whether the task succeeded.
        success: bool,
        /// Result summary.
        result: String,
    },
    /// Status update
    StatusUpdate {
        /// Agent reporting the status.
        agent_id: String,
        /// Current status string.
        status: String,
        /// Optional additional details.
        details: Option<String>,
    },
    /// Request for help/collaboration
    HelpRequest {
        /// Unique request identifier.
        request_id: String,
        /// Help topic.
        topic: String,
        /// Detailed description of what help is needed.
        details: String,
    },
    /// Response to help request
    HelpResponse {
        /// ID of the original help request.
        request_id: String,
        /// Help response content.
        response: String,
    },
    /// Broadcast message to all agents
    Broadcast {
        /// ID of the sending agent.
        sender: String,
        /// Broadcast message content.
        message: String,
    },
    /// Custom message with arbitrary data
    Custom {
        /// Custom message type identifier.
        message_type: String,
        /// Arbitrary JSON data payload.
        data: serde_json::Value,
    },
    /// Notification that an agent was spawned
    AgentSpawned {
        /// ID of the spawned agent.
        agent_id: String,
        /// ID of the task assigned to the agent.
        task_id: String,
    },
    /// Progress update from an agent
    AgentProgress {
        /// ID of the reporting agent.
        agent_id: String,
        /// Completion percentage (0-100).
        ///
        /// The range is a convention only; the `u8` type accepts up to 255.
        progress_percent: u8,
        /// Progress description.
        message: String,
    },
    /// Notification that an agent completed
    AgentCompleted {
        /// ID of the completed agent.
        agent_id: String,
        /// ID of the completed task.
        task_id: String,
        /// Completion summary.
        summary: String,
    },
    /// Notification about lock contention
    LockContention {
        /// ID of the waiting agent.
        agent_id: String,
        /// Path being contended.
        path: String,
        /// ID of the agent holding the lock.
        waiting_for: String,
    },
    /// Request for approval (dangerous operation)
    ApprovalRequest {
        /// Unique request identifier.
        request_id: String,
        /// ID of the requesting agent.
        agent_id: String,
        /// Operation requiring approval.
        operation: String,
        /// Operation details.
        details: String,
    },
    /// Response to approval request
    ApprovalResponse {
        /// ID of the original approval request.
        request_id: String,
        /// Whether the operation was approved.
        approved: bool,
        /// Reason for approval or rejection.
        reason: Option<String>,
    },

    // === New messages for agent coordination ===
    /// Notification that an exclusive operation has started
    OperationStarted {
        /// ID of the agent performing the operation.
        agent_id: String,
        /// Type of operation.
        operation_type: OperationType,
        /// Scope of the operation (e.g., project path).
        scope: String,
        /// Estimated duration in milliseconds.
        estimated_duration_ms: Option<u64>,
        /// Human-readable description.
        description: String,
    },
    /// Notification that an exclusive operation has completed
    OperationCompleted {
        /// ID of the agent that completed the operation.
        agent_id: String,
        /// Type of operation.
        operation_type: OperationType,
        /// Scope of the operation.
        scope: String,
        /// Whether the operation succeeded.
        success: bool,
        /// Actual duration in milliseconds.
        duration_ms: u64,
        /// Completion summary.
        summary: String,
    },
    /// Notification that a lock has become available
    LockAvailable {
        /// Type of operation the lock was for.
        operation_type: OperationType,
        /// Scope of the released lock.
        scope: String,
        /// ID of the agent that released the lock.
        released_by: String,
    },
    /// Update on wait queue position
    WaitQueuePosition {
        /// ID of the waiting agent.
        agent_id: String,
        /// Type of operation being waited on.
        operation_type: OperationType,
        /// Scope of the operation.
        scope: String,
        /// Current position in the wait queue.
        position: usize,
        /// Estimated wait time in milliseconds.
        estimated_wait_ms: Option<u64>,
    },
    /// Git operation started
    GitOperationStarted {
        /// ID of the agent performing the git operation.
        agent_id: String,
        /// Type of git operation.
        git_op: GitOperationType,
        /// Target branch (if applicable).
        branch: Option<String>,
        /// Human-readable description.
        description: String,
    },
    /// Git operation completed
    GitOperationCompleted {
        /// ID of the agent that completed the operation.
        agent_id: String,
        /// Type of git operation.
        git_op: GitOperationType,
        /// Whether the operation succeeded.
        success: bool,
        /// Completion summary.
        summary: String,
    },
    /// Build blocked due to conflicts
    BuildBlocked {
        /// ID of the blocked agent.
        agent_id: String,
        /// Reason the build is blocked.
        reason: String,
        /// List of conflicts causing the block.
        conflicts: Vec<ConflictInfo>,
        /// Estimated wait time in milliseconds.
        estimated_wait_ms: Option<u64>,
    },
    /// File write blocked due to conflicts
    FileWriteBlocked {
        /// ID of the blocked agent.
        agent_id: String,
        /// Path that cannot be written.
        path: String,
        /// Reason the write is blocked.
        reason: String,
        /// List of conflicts causing the block.
        conflicts: Vec<ConflictInfo>,
    },
    /// Resource conflict resolved - agent can proceed
    ConflictResolved {
        /// ID of the agent that can now proceed.
        agent_id: String,
        /// Type of operation that was unblocked.
        operation_type: OperationType,
        /// Scope of the resolved conflict.
        scope: String,
    },

    // === Saga Protocol Messages ===
    /// A saga (multi-step transaction) has started
    SagaStarted {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the agent executing the saga.
        agent_id: String,
        /// Saga description.
        description: String,
        /// Total number of steps in the saga.
        total_steps: usize,
    },
    /// A saga step has completed
    SagaStepCompleted {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the executing agent.
        agent_id: String,
        /// Index of the completed step.
        step_index: usize,
        /// Name of the completed step.
        step_name: String,
        /// Whether the step succeeded.
        success: bool,
    },
    /// A saga has completed (successfully or with compensation)
    SagaCompleted {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the executing agent.
        agent_id: String,
        /// Whether the saga completed successfully.
        success: bool,
        /// Whether compensation was applied.
        compensated: bool,
        /// Completion summary.
        summary: String,
    },
    /// A saga is being compensated (rolling back)
    SagaCompensating {
        /// Unique saga identifier.
        saga_id: String,
        /// ID of the executing agent.
        agent_id: String,
        /// Reason for compensation.
        reason: String,
        /// Number of steps to compensate.
        steps_to_compensate: usize,
    },

    // === Contract-Net Protocol Messages ===
    /// A task has been announced for bidding
    TaskAnnounced {
        /// Unique task identifier.
        task_id: String,
        /// ID of the announcing agent.
        announcer: String,
        /// Task description.
        description: String,
        /// Deadline for bids in milliseconds.
        ///
        /// NOTE(review): not clear from this file whether this is a relative
        /// duration or an absolute timestamp — confirm against the producer.
        bid_deadline_ms: u64,
    },
    /// An agent has submitted a bid
    BidSubmitted {
        /// Task being bid on.
        task_id: String,
        /// ID of the bidding agent.
        agent_id: String,
        /// Self-assessed capability score (0.0-1.0).
        capability_score: f32,
        /// Current workload (0.0-1.0).
        current_load: f32,
    },
    /// A task has been awarded to an agent
    TaskAwarded {
        /// Task that was awarded.
        task_id: String,
        /// ID of the winning agent.
        winner: String,
        /// ID of the announcing agent.
        announcer: String,
    },
    /// An agent has accepted an awarded task
    TaskAccepted {
        /// Task that was accepted.
        task_id: String,
        /// ID of the accepting agent.
        agent_id: String,
    },
    /// An agent has declined an awarded task
    TaskDeclined {
        /// Task that was declined.
        task_id: String,
        /// ID of the declining agent.
        agent_id: String,
        /// Reason for declining.
        reason: String,
    },

    // === Market Allocation Messages ===
    /// A resource is available for bidding
    ResourceAvailable {
        /// Unique resource identifier.
        resource_id: String,
        /// Type of resource.
        resource_type: String,
    },
    /// A resource bid has been submitted
    ResourceBidSubmitted {
        /// Resource being bid on.
        resource_id: String,
        /// ID of the bidding agent.
        agent_id: String,
        /// Bid priority.
        priority: u8,
        /// Bid urgency (0.0-1.0).
        urgency: f32,
    },
    /// A resource has been allocated to an agent
    ResourceAllocated {
        /// Allocated resource identifier.
        resource_id: String,
        /// ID of the agent receiving the resource.
        agent_id: String,
        /// Allocation price in credits.
        price: u32,
    },
    /// A resource has been released
    ResourceReleased {
        /// Released resource identifier.
        resource_id: String,
        /// ID of the releasing agent.
        agent_id: String,
    },

    // === Worktree Messages ===
    /// A worktree has been created for an agent
    WorktreeCreated {
        /// ID of the agent that owns the worktree.
        agent_id: String,
        /// Filesystem path to the worktree.
        worktree_path: String,
        /// Git branch for the worktree.
        branch: String,
    },
    /// A worktree has been removed
    WorktreeRemoved {
        /// ID of the agent whose worktree was removed.
        agent_id: String,
        /// Path of the removed worktree.
        worktree_path: String,
    },
    /// An agent is switching worktrees
    WorktreeSwitched {
        /// ID of the switching agent.
        agent_id: String,
        /// Previous worktree path (if any).
        from_path: Option<String>,
        /// New worktree path.
        to_path: String,
    },

    // === Validation Messages ===
    /// A validation check has failed
    ValidationFailed {
        /// ID of the agent that failed validation.
        agent_id: String,
        /// Operation that was being validated.
        operation: String,
        /// Name of the failed validation rule.
        rule_name: String,
        /// Failure description.
        message: String,
    },
    /// A validation warning was raised
    ValidationWarning {
        /// ID of the agent that triggered the warning.
        agent_id: String,
        /// Operation that was being validated.
        operation: String,
        /// Name of the warning rule.
        rule_name: String,
        /// Warning description.
        message: String,
    },

    // === Cycle Orchestration Messages ===
    /// A Plan→Work→Judge cycle has started
    CycleStarted {
        /// Current cycle number (0-indexed).
        cycle_number: u32,
        /// The high-level goal being pursued.
        goal: String,
    },
    /// A Plan→Work→Judge cycle has completed
    CycleCompleted {
        /// Completed cycle number.
        cycle_number: u32,
        /// Type of verdict reached (complete, continue, fresh_restart, abort).
        ///
        /// Carried as a free-form string; nothing in this type restricts it
        /// to the listed values.
        verdict_type: String,
    },
    /// A planner has produced a task plan
    PlanCreated {
        /// Cycle number the plan belongs to.
        cycle_number: u32,
        /// Number of tasks in the plan.
        task_count: usize,
        /// Planner's rationale summary.
        rationale: String,
    },
    /// A worker's branch has been merged
    WorkerBranchMerged {
        /// ID of the worker agent.
        agent_id: String,
        /// Name of the merged branch.
        branch: String,
        /// Merge status description.
        status: String,
    },

    // === Optimistic Concurrency Messages ===
    /// A version conflict was detected
    VersionConflict {
        /// Resource with the version conflict.
        resource_id: String,
        /// ID of the agent that encountered the conflict.
        agent_id: String,
        /// Version the agent expected.
        expected_version: u64,
        /// Actual current version.
        actual_version: u64,
    },
    /// A conflict has been resolved
    ConflictResolutionApplied {
        /// Resource where the conflict was resolved.
        resource_id: String,
        /// Type of resolution applied.
        resolution_type: String,
        /// Agent whose changes won (if applicable).
        winning_agent: Option<String>,
    },
}
476
/// Types of operations that require coordination
///
/// Serialized by serde as the `snake_case` variant name (e.g. `BuildTest`
/// becomes `"build_test"`). This differs from the `Display` output, which
/// prints the variant name unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationType {
    /// Build operation (cargo build, npm build, etc.)
    Build,
    /// Test operation (cargo test, npm test, etc.)
    Test,
    /// Combined build and test
    BuildTest,
    /// Git index/staging operations
    GitIndex,
    /// Git commit operations
    GitCommit,
    /// Git push operations
    GitPush,
    /// Git pull operations
    GitPull,
    /// Git branch operations
    GitBranch,
    /// File write operation
    FileWrite,
}
500
501impl std::fmt::Display for OperationType {
502    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
503        match self {
504            OperationType::Build => write!(f, "Build"),
505            OperationType::Test => write!(f, "Test"),
506            OperationType::BuildTest => write!(f, "BuildTest"),
507            OperationType::GitIndex => write!(f, "GitIndex"),
508            OperationType::GitCommit => write!(f, "GitCommit"),
509            OperationType::GitPush => write!(f, "GitPush"),
510            OperationType::GitPull => write!(f, "GitPull"),
511            OperationType::GitBranch => write!(f, "GitBranch"),
512            OperationType::FileWrite => write!(f, "FileWrite"),
513        }
514    }
515}
516
/// Git-specific operation types for finer-grained control
///
/// Serialized by serde as the `snake_case` variant name (e.g. `ReadOnly`
/// becomes `"read_only"`). This differs from the `Display` output, which
/// prints the variant name unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum GitOperationType {
    /// Read-only operations (status, diff, log, fetch)
    ReadOnly,
    /// Staging operations (stage, unstage)
    Staging,
    /// Commit operations
    Commit,
    /// Remote write operations (push)
    RemoteWrite,
    /// Remote read/merge operations (pull)
    RemoteMerge,
    /// Branch operations (create, switch, delete)
    Branch,
    /// Destructive operations (discard)
    Destructive,
}
536
537impl std::fmt::Display for GitOperationType {
538    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
539        match self {
540            GitOperationType::ReadOnly => write!(f, "ReadOnly"),
541            GitOperationType::Staging => write!(f, "Staging"),
542            GitOperationType::Commit => write!(f, "Commit"),
543            GitOperationType::RemoteWrite => write!(f, "RemoteWrite"),
544            GitOperationType::RemoteMerge => write!(f, "RemoteMerge"),
545            GitOperationType::Branch => write!(f, "Branch"),
546            GitOperationType::Destructive => write!(f, "Destructive"),
547        }
548    }
549}
550
/// Information about a conflict blocking an operation
///
/// Carried inside the `conflicts` lists of `AgentMessage::BuildBlocked` and
/// `AgentMessage::FileWriteBlocked`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// Type of conflict
    pub conflict_type: ConflictType,
    /// Agent holding the conflicting resource
    pub holder_agent: String,
    /// Resource identifier (path or scope)
    pub resource: String,
    /// How long the conflict has been active (seconds)
    pub duration_secs: u64,
    /// Current status of the blocking operation
    pub status: String,
}
565
/// Types of conflicts that can block operations
///
/// Serde encodes each variant as an internally tagged object: the
/// `snake_case` variant name is stored under the `"type"` key, next to the
/// `path` field for the variants that carry one.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ConflictType {
    /// File write lock blocks build
    FileWriteBlocksBuild {
        /// Path of the file causing the block.
        path: PathBuf,
    },
    /// Build in progress blocks file write
    BuildBlocksFileWrite,
    /// Test in progress blocks file write
    TestBlocksFileWrite,
    /// Git operation blocks file write
    GitBlocksFileWrite,
    /// File write blocks git operation
    FileWriteBlocksGit {
        /// Path of the file causing the block.
        path: PathBuf,
    },
    /// Build blocks git operation
    BuildBlocksGit,
}
589
/// Envelope containing message metadata
///
/// Wraps an [`AgentMessage`] with routing information and a creation time.
/// Unlike the payload it derives only `Debug` and `Clone`, so the envelope
/// itself is not serde-serializable.
#[derive(Debug, Clone)]
pub struct MessageEnvelope {
    /// Sender agent ID.
    pub from: String,
    /// Recipient agent ID.
    pub to: String,
    /// The message payload.
    pub message: AgentMessage,
    /// When the message was created.
    pub timestamp: std::time::SystemTime,
}
602
603impl MessageEnvelope {
604    /// Create a new message envelope with the current timestamp.
605    pub fn new(from: String, to: String, message: AgentMessage) -> Self {
606        Self {
607            from,
608            to,
609            message,
610            timestamp: std::time::SystemTime::now(),
611        }
612    }
613}
614
/// Agent communication channel
///
/// Pairs the two halves of one unbounded Tokio mpsc channel so a single value
/// can both enqueue and dequeue envelopes.
pub struct AgentChannel {
    // Sending half; unbounded, so `send` never waits for capacity.
    sender: mpsc::UnboundedSender<MessageEnvelope>,
    // Receiving half behind an async mutex so it can be polled through `&self`.
    receiver: Arc<Mutex<mpsc::UnboundedReceiver<MessageEnvelope>>>,
}
620
621impl AgentChannel {
622    /// Create a new agent channel
623    pub fn new() -> Self {
624        let (sender, receiver) = mpsc::unbounded_channel();
625        Self {
626            sender,
627            receiver: Arc::new(Mutex::new(receiver)),
628        }
629    }
630
631    /// Send a message on this channel
632    pub fn send(&self, envelope: MessageEnvelope) -> Result<()> {
633        self.sender
634            .send(envelope)
635            .map_err(|e| anyhow::anyhow!("Failed to send message: {}", e))
636    }
637
638    /// Receive a message from this channel (async, blocking)
639    pub async fn receive(&self) -> Option<MessageEnvelope> {
640        self.receiver.lock().await.recv().await
641    }
642
643    /// Try to receive a message without blocking
644    pub async fn try_receive(&self) -> Option<MessageEnvelope> {
645        self.receiver.lock().await.try_recv().ok()
646    }
647}
648
649impl Default for AgentChannel {
650    fn default() -> Self {
651        Self::new()
652    }
653}
654
/// Communication hub for managing multiple agent channels
///
/// Holds one [`AgentChannel`] per registered agent, keyed by agent ID, behind
/// an async read-write lock.
pub struct CommunicationHub {
    // Per-agent inboxes, keyed by agent ID.
    channels: Arc<RwLock<HashMap<String, AgentChannel>>>,
    // Created in `new()` but not read anywhere in the code visible here;
    // broadcasts are delivered through the per-agent channels instead.
    _broadcast_channel: AgentChannel,
}
660
661impl CommunicationHub {
662    /// Create a new communication hub
663    pub fn new() -> Self {
664        Self {
665            channels: Arc::new(RwLock::new(HashMap::new())),
666            _broadcast_channel: AgentChannel::new(),
667        }
668    }
669
670    /// Register an agent with the hub
671    #[tracing::instrument(name = "agent.register", skip(self))]
672    pub async fn register_agent(&self, agent_id: String) -> Result<()> {
673        let mut channels = self.channels.write().await;
674        if channels.contains_key(&agent_id) {
675            anyhow::bail!("Agent {} is already registered", agent_id);
676        }
677        channels.insert(agent_id.clone(), AgentChannel::new());
678        Ok(())
679    }
680
681    /// Unregister an agent from the hub
682    #[tracing::instrument(name = "agent.unregister", skip(self))]
683    pub async fn unregister_agent(&self, agent_id: &str) -> Result<()> {
684        let mut channels = self.channels.write().await;
685        if channels.remove(agent_id).is_none() {
686            anyhow::bail!("Agent {} is not registered", agent_id);
687        }
688        Ok(())
689    }
690
691    /// Send a message from one agent to another
692    #[tracing::instrument(name = "agent.send_message", skip(self, message))]
693    pub async fn send_message(
694        &self,
695        from: String,
696        to: String,
697        message: AgentMessage,
698    ) -> Result<()> {
699        let channels = self.channels.read().await;
700        let channel = channels
701            .get(&to)
702            .ok_or_else(|| anyhow::anyhow!("Agent {} is not registered", to))?;
703
704        let envelope = MessageEnvelope::new(from, to, message);
705        channel.send(envelope)
706    }
707
708    /// Broadcast a message to all agents
709    #[tracing::instrument(name = "agent.broadcast", skip(self, message))]
710    pub async fn broadcast(&self, from: String, message: AgentMessage) -> Result<()> {
711        let channels = self.channels.read().await;
712        for (agent_id, channel) in channels.iter() {
713            let envelope = MessageEnvelope::new(from.clone(), agent_id.clone(), message.clone());
714            channel.send(envelope)?;
715        }
716        Ok(())
717    }
718
719    /// Receive a message for a specific agent
720    pub async fn receive_message(&self, agent_id: &str) -> Option<MessageEnvelope> {
721        let channels = self.channels.read().await;
722        if let Some(channel) = channels.get(agent_id) {
723            channel.receive().await
724        } else {
725            None
726        }
727    }
728
729    /// Try to receive a message without blocking
730    pub async fn try_receive_message(&self, agent_id: &str) -> Option<MessageEnvelope> {
731        let channels = self.channels.read().await;
732        if let Some(channel) = channels.get(agent_id) {
733            channel.try_receive().await
734        } else {
735            None
736        }
737    }
738
739    /// Get the number of registered agents
740    pub async fn agent_count(&self) -> usize {
741        self.channels.read().await.len()
742    }
743
744    /// Get list of registered agent IDs
745    pub async fn list_agents(&self) -> Vec<String> {
746        self.channels.read().await.keys().cloned().collect()
747    }
748
749    /// Check if an agent is registered
750    pub async fn is_registered(&self, agent_id: &str) -> bool {
751        self.channels.read().await.contains_key(agent_id)
752    }
753}
754
755impl Default for CommunicationHub {
756    fn default() -> Self {
757        Self::new()
758    }
759}
760
#[cfg(test)]
mod tests {
    use super::*;

    /// An envelope pushed onto a bare channel comes back with its sender intact.
    #[tokio::test]
    async fn test_agent_channel() {
        let channel = AgentChannel::new();
        let status = AgentMessage::StatusUpdate {
            agent_id: String::from("agent-1"),
            status: String::from("working"),
            details: None,
        };
        let envelope =
            MessageEnvelope::new(String::from("agent-1"), String::from("agent-2"), status);

        channel.send(envelope.clone()).unwrap();

        let received = channel.receive().await;
        assert!(received.is_some());
        assert_eq!(received.unwrap().from, "agent-1");
    }

    /// Registration is visible through the query helpers and rejects duplicates.
    #[tokio::test]
    async fn test_communication_hub_register() {
        let hub = CommunicationHub::new();

        hub.register_agent(String::from("agent-1")).await.unwrap();
        assert_eq!(hub.agent_count().await, 1);
        assert!(hub.is_registered("agent-1").await);

        // Registering the same ID a second time must be rejected.
        let duplicate = hub.register_agent(String::from("agent-1")).await;
        assert!(duplicate.is_err());
    }

    /// A direct message reaches the recipient with both routing fields set.
    #[tokio::test]
    async fn test_send_receive_message() {
        let hub = CommunicationHub::new();
        for id in ["agent-1", "agent-2"] {
            hub.register_agent(id.to_string()).await.unwrap();
        }

        let request = AgentMessage::TaskRequest {
            task_id: String::from("task-1"),
            description: String::from("Do something"),
            priority: 5,
        };
        hub.send_message(String::from("agent-1"), String::from("agent-2"), request)
            .await
            .unwrap();

        let received = hub.receive_message("agent-2").await;
        assert!(received.is_some());

        let envelope = received.unwrap();
        assert_eq!(envelope.from, "agent-1");
        assert_eq!(envelope.to, "agent-2");
    }

    /// A broadcast lands in every registered agent's inbox.
    #[tokio::test]
    async fn test_broadcast() {
        let hub = CommunicationHub::new();
        let agents = ["agent-1", "agent-2", "agent-3"];
        for id in agents {
            hub.register_agent(id.to_string()).await.unwrap();
        }

        let announcement = AgentMessage::Broadcast {
            sender: String::from("orchestrator"),
            message: String::from("Hello all!"),
        };
        hub.broadcast(String::from("orchestrator"), announcement)
            .await
            .unwrap();

        // All agents should receive the message
        for id in agents {
            assert!(hub.try_receive_message(id).await.is_some());
        }
    }

    /// Unregistering removes the agent from the count and the lookup.
    #[tokio::test]
    async fn test_unregister() {
        let hub = CommunicationHub::new();

        hub.register_agent(String::from("agent-1")).await.unwrap();
        assert_eq!(hub.agent_count().await, 1);

        hub.unregister_agent("agent-1").await.unwrap();
        assert_eq!(hub.agent_count().await, 0);
        assert!(!hub.is_registered("agent-1").await);
    }
}