1pub mod circuit_breaker;
8
9pub use circuit_breaker::{A2ACircuitBreaker, CircuitState};
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use anyhow::Result;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20use crate::event_bus::{EventBus, KernelEvent};
21use crate::types::{AgentId, AgentStatus};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "type", rename_all = "snake_case")]
26pub enum A2AMessage {
27 TaskDelegation {
29 task_id: Uuid,
31 description: String,
33 payload: serde_json::Value,
35 priority: TaskPriority,
37 },
38 StatusUpdate {
40 task_id: Uuid,
42 progress: u8,
44 message: String,
46 },
47 ResultSharing {
49 task_id: Uuid,
51 result: serde_json::Value,
53 summary: String,
55 },
56 CapabilityQuery {
58 query: String,
60 required_capabilities: Vec<String>,
62 },
63 Handshake {
65 agent_id: AgentId,
67 name: String,
69 capabilities: Vec<String>,
71 },
72}
73
74impl A2AMessage {
75 pub fn type_name(&self) -> &'static str {
77 match self {
78 A2AMessage::TaskDelegation { .. } => "task_delegation",
79 A2AMessage::StatusUpdate { .. } => "status_update",
80 A2AMessage::ResultSharing { .. } => "result_sharing",
81 A2AMessage::CapabilityQuery { .. } => "capability_query",
82 A2AMessage::Handshake { .. } => "handshake",
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
89pub enum TaskPriority {
90 Low,
92 #[default]
94 Normal,
95 High,
97 Critical,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct TaskSpec {
104 pub task_id: Uuid,
106 pub description: String,
108 pub payload: serde_json::Value,
110 pub priority: TaskPriority,
112 pub deadline: Option<DateTime<Utc>>,
114}
115
116impl TaskSpec {
117 pub fn new(description: impl Into<String>, payload: serde_json::Value) -> Self {
119 Self {
120 task_id: Uuid::new_v4(),
121 description: description.into(),
122 payload,
123 priority: TaskPriority::default(),
124 deadline: None,
125 }
126 }
127
128 pub fn with_priority(mut self, priority: TaskPriority) -> Self {
130 self.priority = priority;
131 self
132 }
133
134 pub fn with_deadline(mut self, deadline: DateTime<Utc>) -> Self {
136 self.deadline = Some(deadline);
137 self
138 }
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct A2ARequest {
144 pub request_id: Uuid,
146 pub from: AgentId,
148 pub to: AgentId,
150 pub message: A2AMessage,
152 pub timestamp: DateTime<Utc>,
154}
155
156impl A2ARequest {
157 pub fn new(from: AgentId, to: AgentId, message: A2AMessage) -> Self {
159 Self {
160 request_id: Uuid::new_v4(),
161 from,
162 to,
163 message,
164 timestamp: Utc::now(),
165 }
166 }
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct A2AResponse {
172 pub response_id: Uuid,
174 pub request_id: Uuid,
176 pub from: AgentId,
178 pub to: AgentId,
180 pub accepted: bool,
182 pub payload: serde_json::Value,
184 pub timestamp: DateTime<Utc>,
186}
187
188impl A2AResponse {
189 pub fn success(
191 request_id: Uuid,
192 from: AgentId,
193 to: AgentId,
194 payload: serde_json::Value,
195 ) -> Self {
196 Self {
197 response_id: Uuid::new_v4(),
198 request_id,
199 from,
200 to,
201 accepted: true,
202 payload,
203 timestamp: Utc::now(),
204 }
205 }
206
207 pub fn error(request_id: Uuid, from: AgentId, to: AgentId, error: impl Into<String>) -> Self {
209 Self {
210 response_id: Uuid::new_v4(),
211 request_id,
212 from,
213 to,
214 accepted: false,
215 payload: serde_json::json!({ "error": error.into() }),
216 timestamp: Utc::now(),
217 }
218 }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct PendingMessage {
224 pub request: A2ARequest,
226 pub queued_at: DateTime<Utc>,
228}
229
230impl PendingMessage {
231 fn new(request: A2ARequest) -> Self {
232 Self {
233 request,
234 queued_at: Utc::now(),
235 }
236 }
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct AgentCard {
245 pub agent_id: AgentId,
247 pub name: String,
249 pub description: String,
251 pub capabilities: Vec<String>,
253 pub skills: Vec<String>,
255 pub endpoint: String,
257 pub status: AgentStatus,
259}
260
261impl AgentCard {
262 pub fn new(agent_id: AgentId, name: impl Into<String>, description: impl Into<String>) -> Self {
264 Self {
265 agent_id,
266 name: name.into(),
267 description: description.into(),
268 capabilities: Vec::new(),
269 skills: Vec::new(),
270 endpoint: "local".into(),
271 status: AgentStatus::Starting,
272 }
273 }
274
275 pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
277 self.capabilities.push(capability.into());
278 self
279 }
280
281 pub fn with_skill(mut self, skill: impl Into<String>) -> Self {
283 self.skills.push(skill.into());
284 self
285 }
286
287 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
289 self.endpoint = endpoint.into();
290 self
291 }
292
293 pub fn with_status(mut self, status: AgentStatus) -> Self {
295 self.status = status;
296 self
297 }
298
299 pub fn has_capability(&self, capability: &str) -> bool {
301 self.capabilities.iter().any(|c| c == capability)
302 }
303
304 pub fn has_skill(&self, skill: &str) -> bool {
306 self.skills.iter().any(|s| s == skill)
307 }
308}
309
310#[derive(Clone)]
315pub struct AgentCardRegistry {
316 cards: Arc<RwLock<HashMap<AgentId, AgentCard>>>,
318 event_bus: EventBus,
320}
321
322impl AgentCardRegistry {
323 pub fn new(event_bus: EventBus) -> Self {
325 Self {
326 cards: Arc::new(RwLock::new(HashMap::new())),
327 event_bus,
328 }
329 }
330
331 pub async fn register_agent(&self, card: AgentCard) -> Result<()> {
333 let agent_id = card.agent_id;
334 let mut cards = self.cards.write().await;
335 cards.insert(agent_id, card.clone());
336 drop(cards);
337
338 self.event_bus.publish(KernelEvent::AgentCreated {
339 id: agent_id,
340 name: card.name.clone(),
341 })?;
342
343 tracing::info!(agent_id = %agent_id, name = %card.name, "Agent registered in A2A registry");
344 Ok(())
345 }
346
347 pub async fn unregister_agent(&self, agent_id: AgentId) -> Result<()> {
349 let mut cards = self.cards.write().await;
350 if let Some(card) = cards.remove(&agent_id) {
351 tracing::info!(agent_id = %agent_id, name = %card.name, "Agent unregistered from A2A registry");
352 drop(cards);
353
354 self.event_bus.publish(KernelEvent::AgentStopped {
355 id: agent_id,
356 success: false,
357 })?;
358 }
359 Ok(())
360 }
361
362 pub async fn find_agents_by_capability(&self, capability: &str) -> Result<Vec<AgentCard>> {
364 let cards = self.cards.read().await;
365 let matches: Vec<AgentCard> = cards
366 .values()
367 .filter(|card| card.has_capability(capability))
368 .cloned()
369 .collect();
370 Ok(matches)
371 }
372
373 pub async fn find_agents_by_skill(&self, skill: &str) -> Result<Vec<AgentCard>> {
375 let cards = self.cards.read().await;
376 let matches: Vec<AgentCard> = cards
377 .values()
378 .filter(|card| card.has_skill(skill))
379 .cloned()
380 .collect();
381 Ok(matches)
382 }
383
384 pub async fn get_agent(&self, agent_id: AgentId) -> Option<AgentCard> {
386 let cards = self.cards.read().await;
387 cards.get(&agent_id).cloned()
388 }
389
390 pub async fn list_agents(&self) -> Vec<AgentCard> {
392 let cards = self.cards.read().await;
393 cards.values().cloned().collect()
394 }
395
396 pub async fn agent_count(&self) -> usize {
398 let cards = self.cards.read().await;
399 cards.len()
400 }
401
402 pub async fn update_status(&self, agent_id: AgentId, status: AgentStatus) -> Result<()> {
404 let mut cards = self.cards.write().await;
405 if let Some(card) = cards.get_mut(&agent_id) {
406 card.status = status;
407 }
408 Ok(())
409 }
410}
411
412impl std::fmt::Debug for AgentCardRegistry {
413 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414 f.debug_struct("AgentCardRegistry").finish()
415 }
416}
417
418struct AgentQueue {
423 messages: parking_lot::Mutex<Vec<PendingMessage>>,
425 notify: tokio::sync::Notify,
427}
428
429impl AgentQueue {
430 fn new() -> Self {
431 Self {
432 messages: parking_lot::Mutex::new(Vec::new()),
433 notify: tokio::sync::Notify::new(),
434 }
435 }
436}
437
438pub type DelegationHandler = Arc<
443 dyn Fn(
444 AgentId,
445 AgentId,
446 TaskSpec,
447 )
448 -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<serde_json::Value>> + Send>>
449 + Send
450 + Sync,
451>;
452
453#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct A2AMessageLogEntry {
460 pub from: AgentId,
462 pub to: AgentId,
464 pub message_type: String,
466 pub timestamp: DateTime<Utc>,
468 pub content: String,
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize)]
476pub struct TopologyNode {
477 pub id: String,
479 pub label: String,
481 pub status: String,
483 pub capabilities: Vec<String>,
485 pub skills: Vec<String>,
487 pub last_seen: Option<String>,
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize)]
498pub struct TopologyEdge {
499 pub from: String,
501 pub to: String,
503 pub message_count_5m: u32,
505 pub last_kind: String,
507}
508
509#[derive(Debug, Clone, Serialize, Deserialize)]
511pub struct TopologyResponse {
512 pub nodes: Vec<TopologyNode>,
514 pub edges: Vec<TopologyEdge>,
516}
517
518#[derive(Clone)]
520pub struct A2AProtocol {
521 registry: AgentCardRegistry,
523 queues: Arc<RwLock<HashMap<AgentId, Arc<AgentQueue>>>>,
525 event_bus: EventBus,
527 delegation_handler: Arc<RwLock<Option<DelegationHandler>>>,
529 message_log: Arc<parking_lot::RwLock<Vec<A2AMessageLogEntry>>>,
531}
532
533impl A2AProtocol {
534 pub const MAX_LOG_ENTRIES: usize = 10_000;
536
537 pub fn new(event_bus: EventBus) -> Self {
539 let registry = AgentCardRegistry::new(event_bus.clone());
540 Self {
541 registry,
542 queues: Arc::new(RwLock::new(HashMap::new())),
543 event_bus,
544 delegation_handler: Arc::new(RwLock::new(None)),
545 message_log: Arc::new(parking_lot::RwLock::new(Vec::with_capacity(256))),
546 }
547 }
548
549 pub async fn set_delegation_handler(&self, handler: DelegationHandler) {
555 let mut h = self.delegation_handler.write().await;
556 *h = Some(handler);
557 }
558
559 fn append_log(&self, entry: A2AMessageLogEntry) {
561 let mut log = self.message_log.write();
562 log.push(entry);
563 if log.len() > Self::MAX_LOG_ENTRIES {
564 let excess = log.len() - Self::MAX_LOG_ENTRIES;
565 log.drain(..excess);
566 }
567 }
568
569 pub fn get_message_log(&self, limit: Option<usize>) -> Vec<A2AMessageLogEntry> {
573 let log = self.message_log.read();
574 match limit {
575 Some(n) => log
576 .iter()
577 .rev()
578 .take(n)
579 .cloned()
580 .collect::<Vec<_>>()
581 .into_iter()
582 .rev()
583 .collect(),
584 None => log.clone(),
585 }
586 }
587
588 pub fn recent_messages(&self, secs: u64) -> Vec<A2AMessageLogEntry> {
594 let now = Utc::now();
595 let cutoff = now - chrono::Duration::seconds(secs as i64);
596 let log = self.message_log.read();
597 log.iter()
598 .filter(|entry| entry.timestamp >= cutoff)
599 .cloned()
600 .collect()
601 }
602
603 async fn get_or_create_queue(&self, agent_id: AgentId) -> Arc<AgentQueue> {
605 let mut queues = self.queues.write().await;
606 queues
607 .entry(agent_id)
608 .or_insert_with(|| Arc::new(AgentQueue::new()))
609 .clone()
610 }
611
612 pub fn registry(&self) -> &AgentCardRegistry {
614 &self.registry
615 }
616
617 pub async fn execute_delegation(
624 &self,
625 from: AgentId,
626 to: AgentId,
627 task: TaskSpec,
628 ) -> Option<Result<serde_json::Value>> {
629 let handler = self.delegation_handler.read().await;
630 let handler_ref = handler.as_ref()?;
631
632 let _ = self.event_bus.publish(KernelEvent::MessageReceived {
634 from,
635 content: format!("[task_delegation] {:?}", task.task_id),
636 });
637
638 self.append_log(A2AMessageLogEntry {
640 from,
641 to,
642 message_type: "task_delegation".to_string(),
643 timestamp: Utc::now(),
644 content: task.description.clone(),
645 });
646
647 tracing::info!(
648 from = %from,
649 to = %to,
650 task_id = %task.task_id,
651 "A2A execute_delegation: starting"
652 );
653
654 let result = handler_ref(from, to, task).await;
655
656 tracing::info!(
657 from = %from,
658 to = %to,
659 success = result.is_ok(),
660 "A2A execute_delegation: completed"
661 );
662
663 Some(result)
664 }
665
666 pub async fn send_message(
668 &self,
669 from: AgentId,
670 to: AgentId,
671 message: A2AMessage,
672 ) -> Result<Uuid> {
673 let msg_type = message.type_name();
674 let request = A2ARequest::new(from, to, message.clone());
675 let request_id = request.request_id;
676
677 let content_summary = match &request.message {
679 A2AMessage::TaskDelegation { description, .. } => description.clone(),
680 A2AMessage::StatusUpdate { message, .. } => message.clone(),
681 A2AMessage::ResultSharing { summary, .. } => summary.clone(),
682 A2AMessage::CapabilityQuery { query, .. } => query.clone(),
683 A2AMessage::Handshake { name, .. } => format!("handshake from {name}"),
684 };
685 self.append_log(A2AMessageLogEntry {
686 from,
687 to,
688 message_type: msg_type.to_string(),
689 timestamp: Utc::now(),
690 content: content_summary,
691 });
692
693 let queue = self.get_or_create_queue(to).await;
695 queue
696 .messages
697 .lock()
698 .push(PendingMessage::new(request.clone()));
699 queue.notify.notify_one();
700
701 if let Err(e) = self.event_bus.publish(KernelEvent::MessageReceived {
707 from,
708 content: format!("[{msg_type}] {request_id:?}"),
709 }) {
710 tracing::warn!(
711 error = %e,
712 from = %from,
713 to = %to,
714 request_id = %request_id,
715 "a2a: failed to publish MessageReceived event (message was still delivered)"
716 );
717 }
718
719 tracing::debug!(
720 from = %from,
721 to = %to,
722 request_id = %request_id,
723 msg_type,
724 "A2A message sent"
725 );
726
727 Ok(request_id)
728 }
729
730 pub async fn delegate_task(&self, from: AgentId, to: AgentId, task: TaskSpec) -> Result<Uuid> {
732 let message = A2AMessage::TaskDelegation {
733 task_id: task.task_id,
734 description: task.description.clone(),
735 payload: task.payload.clone(),
736 priority: task.priority,
737 };
738
739 self.send_message(from, to, message).await
740 }
741
742 pub async fn send_status_update(
744 &self,
745 from: AgentId,
746 to: AgentId,
747 task_id: Uuid,
748 progress: u8,
749 message: String,
750 ) -> Result<Uuid> {
751 let message = A2AMessage::StatusUpdate {
752 task_id,
753 progress,
754 message,
755 };
756
757 self.send_message(from, to, message).await
758 }
759
760 pub async fn share_result(
762 &self,
763 from: AgentId,
764 to: AgentId,
765 task_id: Uuid,
766 result: serde_json::Value,
767 summary: String,
768 ) -> Result<Uuid> {
769 let message = A2AMessage::ResultSharing {
770 task_id,
771 result,
772 summary,
773 };
774
775 self.send_message(from, to, message).await
776 }
777
778 pub async fn query_capabilities(&self, capability: &str) -> Result<Vec<AgentCard>> {
780 self.registry.find_agents_by_capability(capability).await
781 }
782
783 pub async fn send_handshake(&self, from: AgentId, to: AgentId) -> Result<Uuid> {
785 let card = self.registry.get_agent(from).await;
786
787 let (name, capabilities) = if let Some(card) = card {
788 (card.name, card.capabilities.clone())
789 } else {
790 ("unknown".into(), Vec::new())
791 };
792
793 let message = A2AMessage::Handshake {
794 agent_id: from,
795 name,
796 capabilities,
797 };
798
799 self.send_message(from, to, message).await
800 }
801
802 pub async fn receive_messages(&self, agent_id: AgentId) -> Vec<A2ARequest> {
804 let queues = self.queues.read().await;
805 if let Some(queue) = queues.get(&agent_id) {
806 let drained: Vec<PendingMessage> = queue.messages.lock().drain(..).collect();
807 drained.into_iter().map(|m| m.request).collect()
808 } else {
809 Vec::new()
810 }
811 }
812
813 pub async fn pending_count(&self, agent_id: AgentId) -> usize {
815 let queues = self.queues.read().await;
816 queues
817 .get(&agent_id)
818 .map(|q| q.messages.lock().len())
819 .unwrap_or(0)
820 }
821
822 pub async fn has_messages(&self, agent_id: AgentId) -> bool {
824 self.pending_count(agent_id).await > 0
825 }
826
827 pub async fn deliver_pending_messages(&self, agent_id: AgentId) -> Result<Vec<A2ARequest>> {
833 Ok(self.receive_messages(agent_id).await)
834 }
835
836 pub async fn send_and_wait(
844 &self,
845 from: AgentId,
846 to: AgentId,
847 message: A2AMessage,
848 timeout: std::time::Duration,
849 ) -> Result<A2AResponse> {
850 let wait_task_id = match &message {
852 A2AMessage::TaskDelegation { task_id, .. } => Some(*task_id),
853 _ => None,
854 };
855
856 let request_id = self.send_message(from, to, message).await?;
857 let queue = self.get_or_create_queue(from).await;
858 let deadline = tokio::time::Instant::now() + timeout;
859
860 loop {
861 {
863 let mut msgs = queue.messages.lock();
864 let match_idx = msgs.iter().position(|p| {
865 match (&p.request.message, wait_task_id) {
866 (A2AMessage::ResultSharing { task_id, .. }, Some(wait_id)) => {
868 *task_id == wait_id
869 }
870 (A2AMessage::ResultSharing { result, .. }, None) => {
872 result.get("request_id").and_then(|v| v.as_str())
873 == Some(&request_id.to_string())
874 }
875 _ => false,
876 }
877 });
878 if let Some(idx) = match_idx {
879 let matched = msgs.remove(idx);
880 if let A2AMessage::ResultSharing { result, .. } = matched.request.message {
881 return Ok(A2AResponse::success(request_id, to, from, result));
882 }
883 }
884 }
885
886 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
888 if remaining.is_zero() {
889 anyhow::bail!("A2A response timeout after {timeout:?}");
890 }
891
892 tokio::select! {
893 _ = queue.notify.notified() => {
894 }
896 _ = tokio::time::sleep(remaining) => {
897 anyhow::bail!("A2A response timeout after {timeout:?}");
898 }
899 }
900 }
901 }
902}
903
904impl std::fmt::Debug for A2AProtocol {
905 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
906 f.debug_struct("A2AProtocol")
907 .field("registry", &self.registry)
908 .finish()
909 }
910}
911
#[cfg(test)]
mod tests {
    use super::*;

    /// Event bus shared by the tests below.
    fn create_test_event_bus() -> EventBus {
        EventBus::new(256)
    }

    /// Fresh random agent id.
    fn create_test_agent_id() -> AgentId {
        Uuid::new_v4()
    }

    /// Builds a log entry from its parts.
    fn log_entry(
        from: AgentId,
        to: AgentId,
        message_type: &str,
        timestamp: DateTime<Utc>,
        content: &str,
    ) -> A2AMessageLogEntry {
        A2AMessageLogEntry {
            from,
            to,
            message_type: message_type.into(),
            timestamp,
            content: content.into(),
        }
    }

    /// The builder methods populate the card and the lookups are exact-match.
    #[tokio::test]
    async fn test_agent_card_creation() {
        let id = create_test_agent_id();
        let card = AgentCard::new(id, "test-agent", "A test agent")
            .with_capability("code-review")
            .with_capability("lint")
            .with_skill("rust")
            .with_endpoint("local");

        assert_eq!(card.agent_id, id);
        assert_eq!(card.name, "test-agent");
        assert!(card.has_capability("code-review"));
        assert!(card.has_capability("lint"));
        assert!(!card.has_capability("refactor"));
        assert!(card.has_skill("rust"));
        assert!(!card.has_skill("python"));
    }

    /// Registering makes an agent discoverable; unregistering removes it again.
    #[tokio::test]
    async fn test_registry_register_unregister() {
        let registry = AgentCardRegistry::new(create_test_event_bus());

        let id = create_test_agent_id();
        let card = AgentCard::new(id, "register-test", "Test agent").with_capability("test");

        registry.register_agent(card.clone()).await.unwrap();
        assert_eq!(registry.agent_count().await, 1);

        let looked_up = registry.get_agent(id).await;
        assert!(looked_up.is_some());
        assert_eq!(looked_up.unwrap().name, "register-test");

        registry.unregister_agent(id).await.unwrap();
        assert_eq!(registry.agent_count().await, 0);

        let looked_up = registry.get_agent(id).await;
        assert!(looked_up.is_none());
    }

    /// A capability lookup returns every agent advertising that capability.
    #[tokio::test]
    async fn test_registry_find_by_capability() {
        let registry = AgentCardRegistry::new(create_test_event_bus());

        let first = AgentCard::new(Uuid::new_v4(), "agent-1", "First agent")
            .with_capability("code-review");
        let second = AgentCard::new(Uuid::new_v4(), "agent-2", "Second agent")
            .with_capability("code-review")
            .with_capability("refactor");

        registry.register_agent(first).await.unwrap();
        registry.register_agent(second).await.unwrap();

        let reviewers = registry
            .find_agents_by_capability("code-review")
            .await
            .unwrap();
        assert_eq!(reviewers.len(), 2);
    }

    /// A sent message waits in the receiver's inbox until it is received.
    #[tokio::test]
    async fn test_a2a_protocol_send_receive() {
        let a2a = A2AProtocol::new(create_test_event_bus());

        let sender = create_test_agent_id();
        let receiver = create_test_agent_id();

        let handshake = A2AMessage::Handshake {
            agent_id: sender,
            name: "sender".into(),
            capabilities: vec!["test".into()],
        };

        a2a.send_message(sender, receiver, handshake).await.unwrap();
        assert_eq!(a2a.pending_count(receiver).await, 1);

        let inbox = a2a.receive_messages(receiver).await;
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].from, sender);
        assert_eq!(inbox[0].to, receiver);
        assert_eq!(a2a.pending_count(receiver).await, 0);
    }

    /// Delegating a task queues exactly one message for the target agent.
    #[tokio::test]
    async fn test_delegate_task() {
        let a2a = A2AProtocol::new(create_test_event_bus());

        let sender = create_test_agent_id();
        let receiver = create_test_agent_id();

        let task = TaskSpec::new("Review PR", serde_json::json!({ "pr": 42 }));

        let request_id = a2a.delegate_task(sender, receiver, task).await.unwrap();
        assert!(request_id != Uuid::nil());

        let inbox = a2a.receive_messages(receiver).await;
        assert_eq!(inbox.len(), 1);
    }

    /// `recent_messages` keeps only entries newer than the requested window.
    #[test]
    fn test_recent_messages_filters_by_window() {
        let a2a = A2AProtocol::new(create_test_event_bus());

        // One entry stamped "now" ...
        let recent_ts = Utc::now();
        a2a.append_log(log_entry(
            Uuid::new_v4(),
            Uuid::new_v4(),
            "task_delegation",
            recent_ts,
            "recent",
        ));

        // ... and one stamped ten minutes ago.
        let old_ts = Utc::now() - chrono::Duration::seconds(600);
        a2a.append_log(log_entry(
            Uuid::new_v4(),
            Uuid::new_v4(),
            "handshake",
            old_ts,
            "old",
        ));

        let window = a2a.recent_messages(300);
        assert_eq!(window.len(), 1);
        assert_eq!(window[0].content, "recent");
        assert_eq!(window[0].message_type, "task_delegation");

        let wider = a2a.recent_messages(900);
        assert_eq!(wider.len(), 2);

        let narrow = a2a.recent_messages(1);
        assert_eq!(narrow.len(), 1);
        assert_eq!(narrow[0].content, "recent");
    }

    /// Recent entries aggregate into one edge per (from, to) pair, with a
    /// message count and the kind of the last message.
    #[tokio::test]
    async fn test_recent_messages_aggregates_fan_in_fan_out() {
        let a2a = A2AProtocol::new(create_test_event_bus());

        let orch = Uuid::new_v4();
        let worker_a = Uuid::new_v4();
        let worker_b = Uuid::new_v4();
        for (id, name) in [
            (orch, "orchestrator"),
            (worker_a, "worker-a"),
            (worker_b, "worker-b"),
        ] {
            let card = AgentCard::new(id, name, "test").with_status(AgentStatus::Running);
            a2a.registry().register_agent(card).await.unwrap();
        }

        // Fan-out: two delegations to worker A, one to worker B.
        for _ in 0..2 {
            a2a.append_log(log_entry(
                orch,
                worker_a,
                "task_delegation",
                Utc::now(),
                "do work",
            ));
        }
        a2a.append_log(log_entry(
            orch,
            worker_b,
            "task_delegation",
            Utc::now(),
            "do work b",
        ));

        // Fan-in: a status update from worker B, then a result from worker A.
        a2a.append_log(log_entry(
            worker_b,
            orch,
            "status_update",
            Utc::now(),
            "50%",
        ));
        a2a.append_log(log_entry(
            worker_a,
            orch,
            "result_sharing",
            Utc::now(),
            "done",
        ));

        let mut aggregates: HashMap<(AgentId, AgentId), (u32, String)> = HashMap::new();
        for entry in &a2a.recent_messages(300) {
            let slot = aggregates
                .entry((entry.from, entry.to))
                .or_insert((0, String::new()));
            slot.0 = slot.0.saturating_add(1);
            slot.1 = entry.message_type.clone();
        }

        let expected = [
            ((orch, worker_a), 2_u32, "task_delegation", "orch->worker_a"),
            ((orch, worker_b), 1, "task_delegation", "orch->worker_b"),
            ((worker_b, orch), 1, "status_update", "worker_b->orch"),
            ((worker_a, orch), 1, "result_sharing", "worker_a->orch"),
        ];
        for (index, (edge, count, kind, label)) in expected.into_iter().enumerate() {
            let found = aggregates
                .get(&edge)
                .unwrap_or_else(|| panic!("edge {} missing", index + 1));
            assert_eq!(found.0, count, "{label} count");
            assert_eq!(found.1, kind, "{label} last_kind");
        }

        assert_eq!(aggregates.len(), 4);
    }
}