Skip to main content

codetether_agent/tool/
relay_autochat.rs

1//! Relay AutoChat Tool - Autonomous relay communication between agents
2//!
3//! Enables task delegation and result aggregation between agents using
4//! the protocol-first relay runtime. This tool allows LLMs to trigger
5//! agent handoffs and coordinate multi-agent workflows.
6
7use super::{Tool, ToolResult};
8use crate::bus::AgentBus;
9use crate::bus::relay::ProtocolRelayRuntime;
10use anyhow::Result;
11use async_trait::async_trait;
12use parking_lot::RwLock;
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::OnceCell;
17use uuid::Uuid;
18
19#[derive(Clone)]
20struct RelayRuntimeState {
21    runtime: Arc<ProtocolRelayRuntime>,
22    participants: Vec<String>,
23}
24
25impl RelayRuntimeState {
26    fn new(runtime: Arc<ProtocolRelayRuntime>) -> Self {
27        Self {
28            runtime,
29            participants: Vec::new(),
30        }
31    }
32
33    fn remember_participant(&mut self, participant: &str) {
34        if !self.participants.iter().any(|name| name == participant) {
35            self.participants.push(participant.to_string());
36        }
37    }
38}
39
40lazy_static::lazy_static! {
41    static ref RELAY_STORE: RwLock<HashMap<String, RelayRuntimeState>> = RwLock::new(HashMap::new());
42    static ref AGENT_BUS: OnceCell<Arc<AgentBus>> = OnceCell::const_new();
43}
44
45async fn get_agent_bus() -> Result<Arc<AgentBus>> {
46    let bus = AGENT_BUS
47        .get_or_try_init(|| async {
48            let bus = AgentBus::new().into_arc();
49            Ok::<_, anyhow::Error>(bus)
50        })
51        .await?;
52    Ok(bus.clone())
53}
54
55pub struct RelayAutoChatTool;
56
57impl Default for RelayAutoChatTool {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl RelayAutoChatTool {
64    pub fn new() -> Self {
65        Self
66    }
67
68    async fn get_or_create_runtime(&self, relay_id: &str) -> Result<Arc<ProtocolRelayRuntime>> {
69        if let Some(runtime) = {
70            let store = RELAY_STORE.read();
71            store.get(relay_id).map(|state| Arc::clone(&state.runtime))
72        } {
73            return Ok(runtime);
74        }
75
76        let bus = get_agent_bus().await?;
77        let runtime = Arc::new(ProtocolRelayRuntime::with_relay_id(
78            bus,
79            relay_id.to_string(),
80        ));
81        let mut store = RELAY_STORE.write();
82        let state = store
83            .entry(relay_id.to_string())
84            .or_insert_with(|| RelayRuntimeState::new(Arc::clone(&runtime)));
85        Ok(Arc::clone(&state.runtime))
86    }
87
88    fn remember_participant(relay_id: &str, participant: &str) {
89        let mut store = RELAY_STORE.write();
90        if let Some(state) = store.get_mut(relay_id) {
91            state.remember_participant(participant);
92        }
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::RelayAutoChatTool;
    use serde_json::{Value, json};
    use uuid::Uuid;

    /// Builds a relay ID of the form `<prefix>-<random uuid>` so that tests
    /// sharing the module-level store do not collide.
    fn relay_id(prefix: &str) -> String {
        let suffix = Uuid::new_v4().simple();
        format!("{prefix}-{suffix}")
    }

    /// Parses a tool's textual output as JSON, panicking on malformed output.
    fn parse_output_json(output: &str) -> Value {
        serde_json::from_str(output).expect("tool output should be valid json")
    }

    /// Participants delegated to in one relay must not show up in another.
    #[tokio::test]
    async fn list_agents_is_scoped_to_single_relay() {
        let tool = RelayAutoChatTool::new();
        let relay_a = relay_id("relay-a");
        let relay_b = relay_id("relay-b");

        let inits = [
            (&relay_a, "task a", "init relay a"),
            (&relay_b, "task b", "init relay b"),
        ];
        for (relay, task, failure) in inits {
            tool.init_relay(Some(relay.clone()), Some(task.to_string()), None, None)
                .await
                .expect(failure);
        }

        let delegations = [
            (&relay_a, "agent-alpha", "do a", "delegate relay a"),
            (&relay_b, "agent-beta", "do b", "delegate relay b"),
        ];
        for (relay, agent, message, failure) in delegations {
            tool.delegate_task(
                Some(relay.clone()),
                Some(agent.to_string()),
                Some(message.to_string()),
                None,
                None,
            )
            .await
            .expect(failure);
        }

        let result = tool
            .list_agents(Some(relay_a))
            .await
            .expect("list agents for relay a");
        assert!(result.success);

        let payload = parse_output_json(&result.output);
        let agents = payload["agents"].as_array().expect("agents array");
        let names: Vec<&str> = agents
            .iter()
            .filter_map(|agent| agent["name"].as_str())
            .collect();

        assert_eq!(names, vec!["agent-alpha"]);
        assert_eq!(payload["count"], json!(1));
    }

    /// Delegating to the same agent twice must count it once on completion.
    #[tokio::test]
    async fn complete_relay_reports_unique_participant_count() {
        let tool = RelayAutoChatTool::new();
        let relay = relay_id("relay-complete");

        tool.init_relay(Some(relay.clone()), Some("task".to_string()), None, None)
            .await
            .expect("init relay");

        let targets = ["agent-a", "agent-b", "agent-a"];
        for target in targets {
            let delegated = tool
                .delegate_task(
                    Some(relay.clone()),
                    Some(target.to_string()),
                    Some("work".to_string()),
                    None,
                    None,
                )
                .await;
            delegated.expect("delegate to participant");
        }

        let result = tool
            .complete_relay(Some(relay))
            .await
            .expect("complete relay");
        assert!(result.success);

        let payload = parse_output_json(&result.output);
        assert_eq!(payload["aggregated_results"]["total_agents"], json!(2));
    }
}
199
200#[async_trait]
201impl Tool for RelayAutoChatTool {
202    fn id(&self) -> &str {
203        "relay_autochat"
204    }
205
206    fn name(&self) -> &str {
207        "Relay AutoChat"
208    }
209
210    fn description(&self) -> &str {
211        "Autonomous relay communication between agents for task delegation and result aggregation. \
212         Actions: delegate (send task to target agent), handoff (pass context between agents), \
213         status (check relay status), list_agents (show available agents in relay), \
214         init (initialize a new relay with task), complete (finish relay and aggregate results)."
215    }
216
217    fn parameters(&self) -> Value {
218        json!({
219            "type": "object",
220            "properties": {
221                "action": {
222                    "type": "string",
223                    "enum": ["delegate", "handoff", "status", "list_agents", "init", "complete"],
224                    "description": "Action to perform"
225                },
226                "target_agent": {
227                    "type": "string",
228                    "description": "Target agent name for delegation/handoff"
229                },
230                "message": {
231                    "type": "string",
232                    "description": "Message to send to the target agent"
233                },
234                "context": {
235                    "type": "object",
236                    "description": "Additional context to pass along (JSON object)"
237                },
238                "relay_id": {
239                    "type": "string",
240                    "description": "Relay ID to use (auto-generated if not provided)"
241                },
242                "okr_id": {
243                    "type": "string",
244                    "description": "Optional OKR ID to associate with this relay"
245                },
246                "task": {
247                    "type": "string",
248                    "description": "Task description for initializing a new relay"
249                }
250            },
251            "required": ["action"]
252        })
253    }
254
255    async fn execute(&self, params: Value) -> Result<ToolResult> {
256        let action = match params.get("action").and_then(|v| v.as_str()) {
257            Some(s) if !s.is_empty() => s.to_string(),
258            _ => {
259                return Ok(ToolResult::structured_error(
260                    "MISSING_FIELD",
261                    "relay_autochat",
262                    "action is required. Valid actions: init, delegate, handoff, status, list_agents, complete",
263                    Some(vec!["action"]),
264                    Some(json!({
265                        "action": "init",
266                        "task": "description of the relay task"
267                    })),
268                ));
269            }
270        };
271
272        let relay_id = params
273            .get("relay_id")
274            .and_then(|v| v.as_str())
275            .map(String::from);
276        let target_agent = params
277            .get("target_agent")
278            .and_then(|v| v.as_str())
279            .map(String::from);
280        let message = params
281            .get("message")
282            .and_then(|v| v.as_str())
283            .map(String::from);
284        let context = params.get("context").cloned();
285        let okr_id = params
286            .get("okr_id")
287            .and_then(|v| v.as_str())
288            .map(String::from);
289        let task = params
290            .get("task")
291            .and_then(|v| v.as_str())
292            .map(String::from);
293
294        match action.as_str() {
295            "init" => self.init_relay(relay_id, task, context, okr_id).await,
296            "delegate" => {
297                self.delegate_task(relay_id, target_agent, message, context, okr_id)
298                    .await
299            }
300            "handoff" => {
301                self.handoff_context(relay_id, target_agent, message, context)
302                    .await
303            }
304            "status" => self.get_status(relay_id).await,
305            "list_agents" => self.list_agents(relay_id).await,
306            "complete" => self.complete_relay(relay_id).await,
307            _ => Ok(ToolResult::structured_error(
308                "INVALID_ACTION",
309                "relay_autochat",
310                &format!(
311                    "Unknown action: '{action}'. Valid actions: init, delegate, handoff, status, list_agents, complete"
312                ),
313                None,
314                Some(json!({
315                    "action": "init",
316                    "task": "description of the relay task"
317                })),
318            )),
319        }
320    }
321}
322
323impl RelayAutoChatTool {
324    /// Initialize a new relay with a task
325    async fn init_relay(
326        &self,
327        relay_id: Option<String>,
328        task: Option<String>,
329        _context: Option<Value>,
330        okr_id: Option<String>,
331    ) -> Result<ToolResult> {
332        let task = task.unwrap_or_else(|| "Unspecified task".to_string());
333        let relay_id =
334            relay_id.unwrap_or_else(|| format!("relay-{}", &Uuid::new_v4().to_string()[..8]));
335
336        let bus = get_agent_bus().await?;
337        let runtime = Arc::new(ProtocolRelayRuntime::with_relay_id(bus, relay_id.clone()));
338
339        // Store the runtime
340        {
341            let mut store = RELAY_STORE.write();
342            store.insert(relay_id.clone(), RelayRuntimeState::new(runtime));
343        }
344
345        let response = json!({
346            "status": "initialized",
347            "relay_id": relay_id,
348            "task": task,
349            "okr_id": okr_id,
350            "message": "Relay initialized. Use 'delegate' to assign tasks to agents, or 'list_agents' to see available agents."
351        });
352
353        let mut result = ToolResult::success(
354            serde_json::to_string_pretty(&response).unwrap_or_else(|_| format!("{:?}", response)),
355        )
356        .with_metadata("relay_id", json!(relay_id));
357        if let Some(okr_id) = response.get("okr_id").and_then(|v| v.as_str()) {
358            result = result.with_metadata("okr_id", json!(okr_id));
359        }
360
361        Ok(result)
362    }
363
364    /// Delegate a task to a target agent
365    async fn delegate_task(
366        &self,
367        relay_id: Option<String>,
368        target_agent: Option<String>,
369        message: Option<String>,
370        context: Option<Value>,
371        okr_id: Option<String>,
372    ) -> Result<ToolResult> {
373        let relay_id = match relay_id {
374            Some(id) => id,
375            None => {
376                return Ok(ToolResult::structured_error(
377                    "MISSING_FIELD",
378                    "relay_autochat",
379                    "relay_id is required for delegate action",
380                    Some(vec!["relay_id"]),
381                    Some(
382                        json!({"action": "delegate", "relay_id": "relay-xxx", "target_agent": "agent-name", "message": "task description"}),
383                    ),
384                ));
385            }
386        };
387        let target_agent = match target_agent {
388            Some(a) => a,
389            None => {
390                return Ok(ToolResult::structured_error(
391                    "MISSING_FIELD",
392                    "relay_autochat",
393                    "target_agent is required for delegate action",
394                    Some(vec!["target_agent"]),
395                    Some(
396                        json!({"action": "delegate", "relay_id": relay_id, "target_agent": "agent-name", "message": "task description"}),
397                    ),
398                ));
399            }
400        };
401        let message = message.unwrap_or_else(|| "New task assigned".to_string());
402
403        let runtime = self.get_or_create_runtime(&relay_id).await?;
404
405        // Build context payload if provided
406        let context_msg = if let Some(ref ctx) = context {
407            format!(
408                "{}\n\nContext: {}",
409                message,
410                serde_json::to_string_pretty(ctx).unwrap_or_default()
411            )
412        } else {
413            message.clone()
414        };
415
416        // Send the delegation message
417        runtime.send_handoff("system", &target_agent, &context_msg);
418        Self::remember_participant(&relay_id, &target_agent);
419
420        let response = json!({
421            "status": "delegated",
422            "relay_id": relay_id,
423            "target_agent": target_agent,
424            "okr_id": okr_id,
425            "message": message,
426            "initial_results": {
427                "task_assigned": true,
428                "agent_notified": true
429            }
430        });
431
432        let mut result = ToolResult::success(
433            serde_json::to_string_pretty(&response).unwrap_or_else(|_| format!("{:?}", response)),
434        )
435        .with_metadata("relay_id", json!(relay_id))
436        .with_metadata("target_agent", json!(target_agent));
437        if let Some(okr_id) = response.get("okr_id").and_then(|v| v.as_str()) {
438            result = result.with_metadata("okr_id", json!(okr_id));
439        }
440
441        Ok(result)
442    }
443
444    /// Hand off context between agents
445    async fn handoff_context(
446        &self,
447        relay_id: Option<String>,
448        target_agent: Option<String>,
449        message: Option<String>,
450        context: Option<Value>,
451    ) -> Result<ToolResult> {
452        let relay_id = match relay_id {
453            Some(id) => id,
454            None => {
455                return Ok(ToolResult::structured_error(
456                    "MISSING_FIELD",
457                    "relay_autochat",
458                    "relay_id is required for handoff action",
459                    Some(vec!["relay_id"]),
460                    Some(
461                        json!({"action": "handoff", "relay_id": "relay-xxx", "target_agent": "agent-name"}),
462                    ),
463                ));
464            }
465        };
466        let target_agent = match target_agent {
467            Some(a) => a,
468            None => {
469                return Ok(ToolResult::structured_error(
470                    "MISSING_FIELD",
471                    "relay_autochat",
472                    "target_agent is required for handoff action",
473                    Some(vec!["target_agent"]),
474                    Some(
475                        json!({"action": "handoff", "relay_id": relay_id, "target_agent": "agent-name"}),
476                    ),
477                ));
478            }
479        };
480        let message = message.unwrap_or_else(|| "Context handoff".to_string());
481
482        let store = RELAY_STORE.read();
483        let runtime = match store.get(&relay_id) {
484            Some(state) => Arc::clone(&state.runtime),
485            None => {
486                return Ok(ToolResult::structured_error(
487                    "NOT_FOUND",
488                    "relay_autochat",
489                    &format!(
490                        "Relay not found: {relay_id}. Use 'init' action to create a relay first."
491                    ),
492                    None,
493                    Some(json!({"action": "init", "task": "description of the relay task"})),
494                ));
495            }
496        };
497        // need to drop the lock before await
498        drop(store);
499
500        // Build context payload
501        let context_msg = if let Some(ref ctx) = context {
502            format!(
503                "{}\n\nContext: {}",
504                message,
505                serde_json::to_string_pretty(ctx).unwrap_or_default()
506            )
507        } else {
508            message
509        };
510
511        // Send handoff
512        runtime.send_handoff("previous_agent", &target_agent, &context_msg);
513        Self::remember_participant(&relay_id, &target_agent);
514
515        let response = json!({
516            "status": "handoff_complete",
517            "relay_id": relay_id,
518            "target_agent": target_agent,
519            "message": "Context successfully handed off to target agent"
520        });
521
522        Ok(ToolResult::success(
523            serde_json::to_string_pretty(&response).unwrap_or_else(|_| format!("{:?}", response)),
524        ))
525    }
526
527    /// Get status of a relay
528    async fn get_status(&self, relay_id: Option<String>) -> Result<ToolResult> {
529        let relay_id = match relay_id {
530            Some(id) => id,
531            None => {
532                return Ok(ToolResult::structured_error(
533                    "MISSING_FIELD",
534                    "relay_autochat",
535                    "relay_id is required for status action",
536                    Some(vec!["relay_id"]),
537                    Some(json!({"action": "status", "relay_id": "relay-xxx"})),
538                ));
539            }
540        };
541
542        let store = RELAY_STORE.read();
543
544        if store.contains_key(&relay_id) {
545            let response = json!({
546                "status": "active",
547                "relay_id": relay_id,
548                "message": "Relay is active"
549            });
550
551            Ok(ToolResult::success(
552                serde_json::to_string_pretty(&response)
553                    .unwrap_or_else(|_| format!("{:?}", response)),
554            ))
555        } else {
556            Ok(ToolResult::error(format!("Relay not found: {}", relay_id)))
557        }
558    }
559
560    /// List agents in a relay
561    async fn list_agents(&self, relay_id: Option<String>) -> Result<ToolResult> {
562        let relay_id = match relay_id {
563            Some(id) => id,
564            None => {
565                return Ok(ToolResult::structured_error(
566                    "MISSING_FIELD",
567                    "relay_autochat",
568                    "relay_id is required for list_agents action",
569                    Some(vec!["relay_id"]),
570                    Some(json!({"action": "list_agents", "relay_id": "relay-xxx"})),
571                ));
572            }
573        };
574
575        let participants = {
576            let store = RELAY_STORE.read();
577            store.get(&relay_id).map(|state| state.participants.clone())
578        };
579
580        if let Some(participants) = participants {
581            let agents: Vec<Value> = participants
582                .iter()
583                .map(|name| json!({ "name": name }))
584                .collect();
585
586            let response = json!({
587                "relay_id": relay_id,
588                "agents": agents,
589                "count": agents.len()
590            });
591
592            Ok(ToolResult::success(
593                serde_json::to_string_pretty(&response)
594                    .unwrap_or_else(|_| format!("{:?}", response)),
595            ))
596        } else {
597            Ok(ToolResult::error(format!("Relay not found: {}", relay_id)))
598        }
599    }
600
601    /// Complete a relay and aggregate results
602    async fn complete_relay(&self, relay_id: Option<String>) -> Result<ToolResult> {
603        let relay_id = match relay_id {
604            Some(id) => id,
605            None => {
606                return Ok(ToolResult::structured_error(
607                    "MISSING_FIELD",
608                    "relay_autochat",
609                    "relay_id is required for complete action",
610                    Some(vec!["relay_id"]),
611                    Some(json!({"action": "complete", "relay_id": "relay-xxx"})),
612                ));
613            }
614        };
615
616        // Get the runtime and shutdown agents
617        let relay_state = {
618            let mut store = RELAY_STORE.write();
619            store.remove(&relay_id)
620        };
621
622        let total_agents = relay_state
623            .as_ref()
624            .map(|state| state.participants.len())
625            .unwrap_or(0);
626        if let Some(state) = relay_state {
627            state.runtime.shutdown_agents(&state.participants);
628        }
629
630        let response = json!({
631            "status": "completed",
632            "relay_id": relay_id,
633            "message": "Relay completed successfully. Results aggregated.",
634            "aggregated_results": {
635                "completed": true,
636                "total_agents": total_agents
637            }
638        });
639
640        Ok(ToolResult::success(
641            serde_json::to_string_pretty(&response).unwrap_or_else(|_| format!("{:?}", response)),
642        ))
643    }
644}