1use super::traits::{Tool, ToolResult};
9use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
10use crate::security::SecurityPolicy;
11use crate::security::policy::ToolOperation;
12use crate::tools::ask_user::ChannelMapHandle;
13use async_trait::async_trait;
14use parking_lot::RwLock;
15use serde_json::json;
16use std::collections::HashMap;
17use std::path::PathBuf;
18use std::sync::Arc;
19
/// Endpoint that Pushover notifications are POSTed to.
const PUSHOVER_API_URL: &str = "https://api.pushover.net/1/messages.json";

/// Timeout, in seconds, handed to the HTTP client used for Pushover requests.
const PUSHOVER_REQUEST_TIMEOUT_SECS: u64 = 15;

/// How long to wait for a human reply when the caller gives no `timeout_secs`.
const DEFAULT_TIMEOUT_SECS: u64 = 10 * 60;

/// Urgency values accepted by the tool.
const VALID_URGENCY_LEVELS: &[&str] = &["low", "medium", "high", "critical"];
25
26pub struct EscalateToHumanTool {
28 security: Arc<SecurityPolicy>,
29 channel_map: ChannelMapHandle,
30 workspace_dir: PathBuf,
31}
32
33impl EscalateToHumanTool {
34 pub fn new(security: Arc<SecurityPolicy>, workspace_dir: PathBuf) -> Self {
35 Self {
36 security,
37 channel_map: Arc::new(RwLock::new(HashMap::new())),
38 workspace_dir,
39 }
40 }
41
42 pub fn channel_map_handle(&self) -> ChannelMapHandle {
44 Arc::clone(&self.channel_map)
45 }
46
47 fn format_message(urgency: &str, summary: &str, context: Option<&str>) -> String {
49 let prefix = match urgency {
50 "low" => "\u{2139}\u{fe0f} [LOW]",
51 "high" => "\u{1f534} [HIGH]",
52 "critical" => "\u{1f6a8} [CRITICAL]",
53 _ => "\u{26a0}\u{fe0f} [MEDIUM]",
55 };
56
57 let mut lines = vec![
58 format!("{prefix} Agent Escalation"),
59 format!("Summary: {summary}"),
60 ];
61
62 if let Some(ctx) = context {
63 lines.push(format!("Context: {ctx}"));
64 }
65
66 lines.push("---".to_string());
67 lines.push("Reply to this message to respond.".to_string());
68
69 lines.join("\n")
70 }
71
72 async fn get_pushover_credentials(&self) -> Option<(String, String)> {
74 let env_path = self.workspace_dir.join(".env");
75 let content = tokio::fs::read_to_string(&env_path).await.ok()?;
76
77 let mut token = None;
78 let mut user_key = None;
79
80 for line in content.lines() {
81 let line = line.trim();
82 if line.starts_with('#') || line.is_empty() {
83 continue;
84 }
85 let line = line.strip_prefix("export ").map(str::trim).unwrap_or(line);
86 if let Some((key, value)) = line.split_once('=') {
87 let key = key.trim();
88 let value = Self::parse_env_value(value);
89
90 if key.eq_ignore_ascii_case("PUSHOVER_TOKEN") {
91 token = Some(value);
92 } else if key.eq_ignore_ascii_case("PUSHOVER_USER_KEY") {
93 user_key = Some(value);
94 }
95 }
96 }
97
98 match (token, user_key) {
99 (Some(t), Some(u)) if !t.is_empty() && !u.is_empty() => Some((t, u)),
100 _ => None,
101 }
102 }
103
104 fn parse_env_value(raw: &str) -> String {
105 let raw = raw.trim();
106 let unquoted = if raw.len() >= 2
107 && ((raw.starts_with('"') && raw.ends_with('"'))
108 || (raw.starts_with('\'') && raw.ends_with('\'')))
109 {
110 &raw[1..raw.len() - 1]
111 } else {
112 raw
113 };
114 unquoted.split_once(" #").map_or_else(
115 || unquoted.trim().to_string(),
116 |(value, _)| value.trim().to_string(),
117 )
118 }
119
120 async fn send_pushover(&self, urgency: &str, summary: &str) {
122 let creds = match self.get_pushover_credentials().await {
123 Some(c) => c,
124 None => {
125 tracing::debug!(
126 "escalate_to_human: Pushover credentials not available, skipping push notification"
127 );
128 return;
129 }
130 };
131
132 let priority = match urgency {
133 "critical" => 1,
134 "high" => 0,
135 _ => return,
136 };
137
138 let form = reqwest::multipart::Form::new()
139 .text("token", creds.0)
140 .text("user", creds.1)
141 .text("message", summary.to_string())
142 .text("title", "Agent Escalation")
143 .text("priority", priority.to_string());
144
145 let client = crate::config::build_runtime_proxy_client_with_timeouts(
146 "tool.escalate_to_human",
147 PUSHOVER_REQUEST_TIMEOUT_SECS,
148 10,
149 );
150
151 match client.post(PUSHOVER_API_URL).multipart(form).send().await {
152 Ok(resp) if resp.status().is_success() => {
153 tracing::info!("escalate_to_human: Pushover notification sent");
154 }
155 Ok(resp) => {
156 tracing::warn!(
157 "escalate_to_human: Pushover returned status {}",
158 resp.status()
159 );
160 }
161 Err(e) => {
162 tracing::warn!("escalate_to_human: Pushover request failed: {e}");
163 }
164 }
165 }
166}
167
168#[async_trait]
169impl Tool for EscalateToHumanTool {
170 fn name(&self) -> &str {
171 "escalate_to_human"
172 }
173
174 fn description(&self) -> &str {
175 "Escalate a situation to a human operator with urgency routing. \
176 Sends a structured message to the active channel. High/critical urgency \
177 also triggers a Pushover mobile notification when configured. \
178 Optionally blocks to wait for a human response."
179 }
180
181 fn parameters_schema(&self) -> serde_json::Value {
182 json!({
183 "type": "object",
184 "properties": {
185 "summary": {
186 "type": "string",
187 "description": "One-line escalation summary"
188 },
189 "context": {
190 "type": "string",
191 "description": "Detailed context for the human"
192 },
193 "urgency": {
194 "type": "string",
195 "enum": ["low", "medium", "high", "critical"],
196 "description": "Urgency level (default: medium). high/critical triggers Pushover notification."
197 },
198 "wait_for_response": {
199 "type": "boolean",
200 "description": "Block and return the human's reply (default: false)"
201 },
202 "timeout_secs": {
203 "type": "integer",
204 "description": "Seconds to wait for a response when wait_for_response is true (default: 600)"
205 }
206 },
207 "required": ["summary"]
208 })
209 }
210
211 async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
212 if let Err(e) = self
214 .security
215 .enforce_tool_operation(ToolOperation::Act, "escalate_to_human")
216 {
217 return Ok(ToolResult {
218 success: false,
219 output: String::new(),
220 error: Some(format!("Action blocked: {e}")),
221 });
222 }
223
224 let summary = args
226 .get("summary")
227 .and_then(|v| v.as_str())
228 .map(|s| s.trim())
229 .filter(|s| !s.is_empty())
230 .ok_or_else(|| anyhow::anyhow!("Missing 'summary' parameter"))?
231 .to_string();
232
233 let context = args
234 .get("context")
235 .and_then(|v| v.as_str())
236 .map(|s| s.trim().to_string())
237 .filter(|s| !s.is_empty());
238
239 let urgency = args
240 .get("urgency")
241 .and_then(|v| v.as_str())
242 .unwrap_or("medium");
243
244 if !VALID_URGENCY_LEVELS.contains(&urgency) {
245 return Ok(ToolResult {
246 success: false,
247 output: String::new(),
248 error: Some(format!(
249 "Invalid urgency '{}'. Must be one of: {}",
250 urgency,
251 VALID_URGENCY_LEVELS.join(", ")
252 )),
253 });
254 }
255
256 let wait_for_response = args
257 .get("wait_for_response")
258 .and_then(|v| v.as_bool())
259 .unwrap_or(false);
260
261 let timeout_secs = args
262 .get("timeout_secs")
263 .and_then(|v| v.as_u64())
264 .unwrap_or(DEFAULT_TIMEOUT_SECS);
265
266 let text = Self::format_message(urgency, &summary, context.as_deref());
268
269 let (channel_name, channel): (String, Arc<dyn Channel>) = {
271 let channels = self.channel_map.read();
272 if channels.is_empty() {
273 return Ok(ToolResult {
274 success: false,
275 output: String::new(),
276 error: Some("No channels available yet (channels not initialized)".to_string()),
277 });
278 }
279 let (name, ch) = channels.iter().next().ok_or_else(|| {
280 anyhow::anyhow!("No channels available. Configure at least one channel.")
281 })?;
282 (name.clone(), ch.clone())
283 };
284
285 let msg = SendMessage::new(&text, "");
287 if let Err(e) = channel.send(&msg).await {
288 return Ok(ToolResult {
289 success: false,
290 output: String::new(),
291 error: Some(format!(
292 "Failed to send escalation to channel '{channel_name}': {e}"
293 )),
294 });
295 }
296
297 if urgency == "high" || urgency == "critical" {
299 self.send_pushover(urgency, &summary).await;
300 }
301
302 if wait_for_response {
303 let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(1);
305 let timeout = std::time::Duration::from_secs(timeout_secs);
306
307 let listen_channel = Arc::clone(&channel);
308 let listen_handle = tokio::spawn(async move { listen_channel.listen(tx).await });
309
310 let response = tokio::time::timeout(timeout, rx.recv()).await;
311 listen_handle.abort();
312
313 match response {
314 Ok(Some(msg)) => Ok(ToolResult {
315 success: true,
316 output: msg.content,
317 error: None,
318 }),
319 Ok(None) => Ok(ToolResult {
320 success: false,
321 output: "TIMEOUT".to_string(),
322 error: Some("Channel closed before receiving a response".to_string()),
323 }),
324 Err(_) => Ok(ToolResult {
325 success: false,
326 output: "TIMEOUT".to_string(),
327 error: Some(format!(
328 "No response received within {timeout_secs} seconds"
329 )),
330 }),
331 }
332 } else {
333 Ok(ToolResult {
335 success: true,
336 output: json!({
337 "status": "escalated",
338 "urgency": urgency,
339 "channel": channel_name,
340 })
341 .to_string(),
342 error: None,
343 })
344 }
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use super::*;
351
352 struct SilentChannel {
354 channel_name: String,
355 sent: Arc<RwLock<Vec<String>>>,
356 }
357
358 impl SilentChannel {
359 fn new(name: &str) -> Self {
360 Self {
361 channel_name: name.to_string(),
362 sent: Arc::new(RwLock::new(Vec::new())),
363 }
364 }
365 }
366
367 #[async_trait]
368 impl Channel for SilentChannel {
369 fn name(&self) -> &str {
370 &self.channel_name
371 }
372
373 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
374 self.sent.write().push(message.content.clone());
375 Ok(())
376 }
377
378 async fn listen(
379 &self,
380 _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
381 ) -> anyhow::Result<()> {
382 tokio::time::sleep(std::time::Duration::from_secs(600)).await;
384 Ok(())
385 }
386 }
387
388 struct RespondingChannel {
390 channel_name: String,
391 response: String,
392 sent: Arc<RwLock<Vec<String>>>,
393 }
394
395 impl RespondingChannel {
396 fn new(name: &str, response: &str) -> Self {
397 Self {
398 channel_name: name.to_string(),
399 response: response.to_string(),
400 sent: Arc::new(RwLock::new(Vec::new())),
401 }
402 }
403 }
404
405 #[async_trait]
406 impl Channel for RespondingChannel {
407 fn name(&self) -> &str {
408 &self.channel_name
409 }
410
411 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
412 self.sent.write().push(message.content.clone());
413 Ok(())
414 }
415
416 async fn listen(
417 &self,
418 tx: tokio::sync::mpsc::Sender<ChannelMessage>,
419 ) -> anyhow::Result<()> {
420 let msg = ChannelMessage {
421 id: "resp_1".to_string(),
422 sender: "human".to_string(),
423 reply_target: "human".to_string(),
424 content: self.response.clone(),
425 channel: self.channel_name.clone(),
426 timestamp: 1000,
427 thread_ts: None,
428 interruption_scope_id: None,
429 attachments: vec![],
430 };
431 let _ = tx.send(msg).await;
432 Ok(())
433 }
434 }
435
436 fn make_tool_with_channels(channels: Vec<(&str, Arc<dyn Channel>)>) -> EscalateToHumanTool {
437 let tool =
438 EscalateToHumanTool::new(Arc::new(SecurityPolicy::default()), PathBuf::from("/tmp"));
439 let map: HashMap<String, Arc<dyn Channel>> = channels
440 .into_iter()
441 .map(|(name, ch)| (name.to_string(), ch))
442 .collect();
443 *tool.channel_map.write() = map;
444 tool
445 }
446
447 #[test]
450 fn test_tool_metadata() {
451 let tool =
452 EscalateToHumanTool::new(Arc::new(SecurityPolicy::default()), PathBuf::from("/tmp"));
453 assert_eq!(tool.name(), "escalate_to_human");
454 assert!(!tool.description().is_empty());
455 assert!(tool.description().to_lowercase().contains("escalat"));
456 }
457
458 #[test]
461 fn test_parameters_schema() {
462 let tool =
463 EscalateToHumanTool::new(Arc::new(SecurityPolicy::default()), PathBuf::from("/tmp"));
464 let schema = tool.parameters_schema();
465 assert_eq!(schema["type"], "object");
466 assert!(schema["properties"]["summary"].is_object());
467 assert!(schema["properties"]["urgency"].is_object());
468 assert!(schema["properties"]["context"].is_object());
469 assert!(schema["properties"]["wait_for_response"].is_object());
470 assert!(schema["properties"]["timeout_secs"].is_object());
471 let required = schema["required"].as_array().unwrap();
472 assert!(required.iter().any(|v| v == "summary"));
473 assert!(!required.iter().any(|v| v == "urgency"));
475 assert!(!required.iter().any(|v| v == "context"));
476 assert!(!required.iter().any(|v| v == "wait_for_response"));
477 assert!(!required.iter().any(|v| v == "timeout_secs"));
478 }
479
480 #[tokio::test]
483 async fn test_default_urgency_is_medium() {
484 let channel = Arc::new(SilentChannel::new("test"));
485 let sent = Arc::clone(&channel.sent);
486 let tool = make_tool_with_channels(vec![("test", channel as Arc<dyn Channel>)]);
487
488 let result = tool
489 .execute(json!({ "summary": "Need help" }))
490 .await
491 .unwrap();
492
493 assert!(result.success, "error: {:?}", result.error);
494 assert!(result.output.contains("\"medium\""));
496 let messages = sent.read();
498 assert!(!messages.is_empty());
499 assert!(messages[0].contains("[MEDIUM]"));
500 }
501
502 #[test]
505 fn test_message_format_low() {
506 let msg = EscalateToHumanTool::format_message("low", "Disk space low", None);
507 assert!(msg.starts_with("\u{2139}\u{fe0f} [LOW]"));
508 assert!(msg.contains("Summary: Disk space low"));
509 assert!(msg.contains("Reply to this message to respond."));
510 }
511
512 #[test]
515 fn test_message_format_critical() {
516 let msg = EscalateToHumanTool::format_message(
517 "critical",
518 "Production down",
519 Some("Database unreachable for 5 minutes"),
520 );
521 assert!(msg.starts_with("\u{1f6a8} [CRITICAL]"));
522 assert!(msg.contains("Summary: Production down"));
523 assert!(msg.contains("Context: Database unreachable for 5 minutes"));
524 }
525
526 #[tokio::test]
529 async fn test_invalid_urgency_rejected() {
530 let tool = make_tool_with_channels(vec![(
531 "test",
532 Arc::new(SilentChannel::new("test")) as Arc<dyn Channel>,
533 )]);
534
535 let result = tool
536 .execute(json!({ "summary": "Help", "urgency": "extreme" }))
537 .await
538 .unwrap();
539
540 assert!(!result.success);
541 assert!(result.error.as_deref().unwrap().contains("Invalid urgency"));
542 assert!(result.error.as_deref().unwrap().contains("extreme"));
543 }
544
545 #[tokio::test]
548 async fn test_non_blocking_returns_status() {
549 let tool = make_tool_with_channels(vec![(
550 "slack",
551 Arc::new(SilentChannel::new("slack")) as Arc<dyn Channel>,
552 )]);
553
554 let result = tool
555 .execute(json!({
556 "summary": "Need approval",
557 "urgency": "low"
558 }))
559 .await
560 .unwrap();
561
562 assert!(result.success, "error: {:?}", result.error);
563 let parsed: serde_json::Value = serde_json::from_str(&result.output).unwrap();
564 assert_eq!(parsed["status"], "escalated");
565 assert_eq!(parsed["urgency"], "low");
566 assert_eq!(parsed["channel"], "slack");
567 }
568
569 #[tokio::test]
572 async fn test_blocking_mode_returns_response() {
573 let tool = make_tool_with_channels(vec![(
574 "test",
575 Arc::new(RespondingChannel::new("test", "Approved, go ahead")) as Arc<dyn Channel>,
576 )]);
577
578 let result = tool
579 .execute(json!({
580 "summary": "Need deployment approval",
581 "wait_for_response": true,
582 "timeout_secs": 5
583 }))
584 .await
585 .unwrap();
586
587 assert!(result.success, "error: {:?}", result.error);
588 assert_eq!(result.output, "Approved, go ahead");
589 }
590
591 #[tokio::test]
594 async fn test_blocking_mode_timeout() {
595 let tool = make_tool_with_channels(vec![(
596 "test",
597 Arc::new(SilentChannel::new("test")) as Arc<dyn Channel>,
598 )]);
599
600 let result = tool
601 .execute(json!({
602 "summary": "Waiting for response",
603 "wait_for_response": true,
604 "timeout_secs": 1
605 }))
606 .await
607 .unwrap();
608
609 assert!(!result.success);
610 assert_eq!(result.output, "TIMEOUT");
611 assert!(result.error.as_deref().unwrap().contains("1 seconds"));
612 }
613
614 #[tokio::test]
617 async fn test_pushover_not_required() {
618 let tool = make_tool_with_channels(vec![(
620 "test",
621 Arc::new(SilentChannel::new("test")) as Arc<dyn Channel>,
622 )]);
623
624 let result = tool
625 .execute(json!({
626 "summary": "Critical alert",
627 "urgency": "high"
628 }))
629 .await
630 .unwrap();
631
632 assert!(result.success, "error: {:?}", result.error);
633 let parsed: serde_json::Value = serde_json::from_str(&result.output).unwrap();
634 assert_eq!(parsed["status"], "escalated");
635 assert_eq!(parsed["urgency"], "high");
636 }
637}