Skip to main content

construct/tools/
escalate.rs

1//! Human escalation tool with urgency-aware routing.
2//!
3//! Exposes `escalate_to_human` as an agent-callable tool that sends a structured
4//! escalation message to a messaging channel. High/critical urgency escalations
5//! additionally fire a Pushover mobile notification when credentials are available.
6//! Supports optional blocking mode to wait for a human response.
7
8use super::traits::{Tool, ToolResult};
9use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
10use crate::security::SecurityPolicy;
11use crate::security::policy::ToolOperation;
12use crate::tools::ask_user::ChannelMapHandle;
13use async_trait::async_trait;
14use parking_lot::RwLock;
15use serde_json::json;
16use std::collections::HashMap;
17use std::path::PathBuf;
18use std::sync::Arc;
19
/// Endpoint that `send_pushover` POSTs the notification form to.
const PUSHOVER_API_URL: &str = "https://api.pushover.net/1/messages.json";
/// First timeout value handed to the HTTP client builder for Pushover requests
/// (presumably the overall request timeout in seconds; confirm against the builder).
const PUSHOVER_REQUEST_TIMEOUT_SECS: u64 = 15;
/// How long `execute` waits for a reply when the caller omits `timeout_secs`.
const DEFAULT_TIMEOUT_SECS: u64 = 600;

/// Urgency strings accepted by `execute`; anything else is rejected.
const VALID_URGENCY_LEVELS: &[&str] = &["low", "medium", "high", "critical"];
25
26/// Agent-callable tool for escalating situations to a human operator with urgency routing.
27pub struct EscalateToHumanTool {
28    security: Arc<SecurityPolicy>,
29    channel_map: ChannelMapHandle,
30    workspace_dir: PathBuf,
31}
32
33impl EscalateToHumanTool {
34    pub fn new(security: Arc<SecurityPolicy>, workspace_dir: PathBuf) -> Self {
35        Self {
36            security,
37            channel_map: Arc::new(RwLock::new(HashMap::new())),
38            workspace_dir,
39        }
40    }
41
42    /// Return the shared handle so callers can populate it after channel init.
43    pub fn channel_map_handle(&self) -> ChannelMapHandle {
44        Arc::clone(&self.channel_map)
45    }
46
47    /// Format the escalation message with urgency prefix.
48    fn format_message(urgency: &str, summary: &str, context: Option<&str>) -> String {
49        let prefix = match urgency {
50            "low" => "\u{2139}\u{fe0f} [LOW]",
51            "high" => "\u{1f534} [HIGH]",
52            "critical" => "\u{1f6a8} [CRITICAL]",
53            // "medium" and any other value
54            _ => "\u{26a0}\u{fe0f} [MEDIUM]",
55        };
56
57        let mut lines = vec![
58            format!("{prefix} Agent Escalation"),
59            format!("Summary: {summary}"),
60        ];
61
62        if let Some(ctx) = context {
63            lines.push(format!("Context: {ctx}"));
64        }
65
66        lines.push("---".to_string());
67        lines.push("Reply to this message to respond.".to_string());
68
69        lines.join("\n")
70    }
71
72    /// Try to read Pushover credentials from .env file. Returns None if unavailable.
73    async fn get_pushover_credentials(&self) -> Option<(String, String)> {
74        let env_path = self.workspace_dir.join(".env");
75        let content = tokio::fs::read_to_string(&env_path).await.ok()?;
76
77        let mut token = None;
78        let mut user_key = None;
79
80        for line in content.lines() {
81            let line = line.trim();
82            if line.starts_with('#') || line.is_empty() {
83                continue;
84            }
85            let line = line.strip_prefix("export ").map(str::trim).unwrap_or(line);
86            if let Some((key, value)) = line.split_once('=') {
87                let key = key.trim();
88                let value = Self::parse_env_value(value);
89
90                if key.eq_ignore_ascii_case("PUSHOVER_TOKEN") {
91                    token = Some(value);
92                } else if key.eq_ignore_ascii_case("PUSHOVER_USER_KEY") {
93                    user_key = Some(value);
94                }
95            }
96        }
97
98        match (token, user_key) {
99            (Some(t), Some(u)) if !t.is_empty() && !u.is_empty() => Some((t, u)),
100            _ => None,
101        }
102    }
103
104    fn parse_env_value(raw: &str) -> String {
105        let raw = raw.trim();
106        let unquoted = if raw.len() >= 2
107            && ((raw.starts_with('"') && raw.ends_with('"'))
108                || (raw.starts_with('\'') && raw.ends_with('\'')))
109        {
110            &raw[1..raw.len() - 1]
111        } else {
112            raw
113        };
114        unquoted.split_once(" #").map_or_else(
115            || unquoted.trim().to_string(),
116            |(value, _)| value.trim().to_string(),
117        )
118    }
119
120    /// Send a Pushover notification. Logs but does not fail on error.
121    async fn send_pushover(&self, urgency: &str, summary: &str) {
122        let creds = match self.get_pushover_credentials().await {
123            Some(c) => c,
124            None => {
125                tracing::debug!(
126                    "escalate_to_human: Pushover credentials not available, skipping push notification"
127                );
128                return;
129            }
130        };
131
132        let priority = match urgency {
133            "critical" => 1,
134            "high" => 0,
135            _ => return,
136        };
137
138        let form = reqwest::multipart::Form::new()
139            .text("token", creds.0)
140            .text("user", creds.1)
141            .text("message", summary.to_string())
142            .text("title", "Agent Escalation")
143            .text("priority", priority.to_string());
144
145        let client = crate::config::build_runtime_proxy_client_with_timeouts(
146            "tool.escalate_to_human",
147            PUSHOVER_REQUEST_TIMEOUT_SECS,
148            10,
149        );
150
151        match client.post(PUSHOVER_API_URL).multipart(form).send().await {
152            Ok(resp) if resp.status().is_success() => {
153                tracing::info!("escalate_to_human: Pushover notification sent");
154            }
155            Ok(resp) => {
156                tracing::warn!(
157                    "escalate_to_human: Pushover returned status {}",
158                    resp.status()
159                );
160            }
161            Err(e) => {
162                tracing::warn!("escalate_to_human: Pushover request failed: {e}");
163            }
164        }
165    }
166}
167
168#[async_trait]
169impl Tool for EscalateToHumanTool {
170    fn name(&self) -> &str {
171        "escalate_to_human"
172    }
173
174    fn description(&self) -> &str {
175        "Escalate a situation to a human operator with urgency routing. \
176         Sends a structured message to the active channel. High/critical urgency \
177         also triggers a Pushover mobile notification when configured. \
178         Optionally blocks to wait for a human response."
179    }
180
181    fn parameters_schema(&self) -> serde_json::Value {
182        json!({
183            "type": "object",
184            "properties": {
185                "summary": {
186                    "type": "string",
187                    "description": "One-line escalation summary"
188                },
189                "context": {
190                    "type": "string",
191                    "description": "Detailed context for the human"
192                },
193                "urgency": {
194                    "type": "string",
195                    "enum": ["low", "medium", "high", "critical"],
196                    "description": "Urgency level (default: medium). high/critical triggers Pushover notification."
197                },
198                "wait_for_response": {
199                    "type": "boolean",
200                    "description": "Block and return the human's reply (default: false)"
201                },
202                "timeout_secs": {
203                    "type": "integer",
204                    "description": "Seconds to wait for a response when wait_for_response is true (default: 600)"
205                }
206            },
207            "required": ["summary"]
208        })
209    }
210
211    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
212        // Security gate
213        if let Err(e) = self
214            .security
215            .enforce_tool_operation(ToolOperation::Act, "escalate_to_human")
216        {
217            return Ok(ToolResult {
218                success: false,
219                output: String::new(),
220                error: Some(format!("Action blocked: {e}")),
221            });
222        }
223
224        // Parse required params
225        let summary = args
226            .get("summary")
227            .and_then(|v| v.as_str())
228            .map(|s| s.trim())
229            .filter(|s| !s.is_empty())
230            .ok_or_else(|| anyhow::anyhow!("Missing 'summary' parameter"))?
231            .to_string();
232
233        let context = args
234            .get("context")
235            .and_then(|v| v.as_str())
236            .map(|s| s.trim().to_string())
237            .filter(|s| !s.is_empty());
238
239        let urgency = args
240            .get("urgency")
241            .and_then(|v| v.as_str())
242            .unwrap_or("medium");
243
244        if !VALID_URGENCY_LEVELS.contains(&urgency) {
245            return Ok(ToolResult {
246                success: false,
247                output: String::new(),
248                error: Some(format!(
249                    "Invalid urgency '{}'. Must be one of: {}",
250                    urgency,
251                    VALID_URGENCY_LEVELS.join(", ")
252                )),
253            });
254        }
255
256        let wait_for_response = args
257            .get("wait_for_response")
258            .and_then(|v| v.as_bool())
259            .unwrap_or(false);
260
261        let timeout_secs = args
262            .get("timeout_secs")
263            .and_then(|v| v.as_u64())
264            .unwrap_or(DEFAULT_TIMEOUT_SECS);
265
266        // Format the message
267        let text = Self::format_message(urgency, &summary, context.as_deref());
268
269        // Resolve channel — block-scoped to drop the RwLock guard before any .await
270        let (channel_name, channel): (String, Arc<dyn Channel>) = {
271            let channels = self.channel_map.read();
272            if channels.is_empty() {
273                return Ok(ToolResult {
274                    success: false,
275                    output: String::new(),
276                    error: Some("No channels available yet (channels not initialized)".to_string()),
277                });
278            }
279            let (name, ch) = channels.iter().next().ok_or_else(|| {
280                anyhow::anyhow!("No channels available. Configure at least one channel.")
281            })?;
282            (name.clone(), ch.clone())
283        };
284
285        // Send the escalation message
286        let msg = SendMessage::new(&text, "");
287        if let Err(e) = channel.send(&msg).await {
288            return Ok(ToolResult {
289                success: false,
290                output: String::new(),
291                error: Some(format!(
292                    "Failed to send escalation to channel '{channel_name}': {e}"
293                )),
294            });
295        }
296
297        // Fire Pushover for high/critical urgency (non-blocking, best-effort)
298        if urgency == "high" || urgency == "critical" {
299            self.send_pushover(urgency, &summary).await;
300        }
301
302        if wait_for_response {
303            // Block and wait for human response (same pattern as ask_user)
304            let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(1);
305            let timeout = std::time::Duration::from_secs(timeout_secs);
306
307            let listen_channel = Arc::clone(&channel);
308            let listen_handle = tokio::spawn(async move { listen_channel.listen(tx).await });
309
310            let response = tokio::time::timeout(timeout, rx.recv()).await;
311            listen_handle.abort();
312
313            match response {
314                Ok(Some(msg)) => Ok(ToolResult {
315                    success: true,
316                    output: msg.content,
317                    error: None,
318                }),
319                Ok(None) => Ok(ToolResult {
320                    success: false,
321                    output: "TIMEOUT".to_string(),
322                    error: Some("Channel closed before receiving a response".to_string()),
323                }),
324                Err(_) => Ok(ToolResult {
325                    success: false,
326                    output: "TIMEOUT".to_string(),
327                    error: Some(format!(
328                        "No response received within {timeout_secs} seconds"
329                    )),
330                }),
331            }
332        } else {
333            // Non-blocking: return confirmation
334            Ok(ToolResult {
335                success: true,
336                output: json!({
337                    "status": "escalated",
338                    "urgency": urgency,
339                    "channel": channel_name,
340                })
341                .to_string(),
342                error: None,
343            })
344        }
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Parameters that the schema must declare but must not mark as required.
    const OPTIONAL_PARAMS: &[&str] = &["urgency", "context", "wait_for_response", "timeout_secs"];

    /// Test double that records outgoing text and never delivers a reply.
    struct SilentChannel {
        channel_name: String,
        sent: Arc<RwLock<Vec<String>>>,
    }

    impl SilentChannel {
        fn new(name: &str) -> Self {
            let sent = Arc::new(RwLock::new(Vec::new()));
            Self {
                channel_name: name.to_owned(),
                sent,
            }
        }
    }

    #[async_trait]
    impl Channel for SilentChannel {
        fn name(&self) -> &str {
            self.channel_name.as_str()
        }

        async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
            let recorded = message.content.clone();
            self.sent.write().push(recorded);
            Ok(())
        }

        async fn listen(
            &self,
            _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            // Stay quiet for the whole wait window: no human ever answers.
            let idle = std::time::Duration::from_secs(600);
            tokio::time::sleep(idle).await;
            Ok(())
        }
    }

    /// Test double that answers with a fixed message as soon as it is listened to.
    struct RespondingChannel {
        channel_name: String,
        response: String,
        sent: Arc<RwLock<Vec<String>>>,
    }

    impl RespondingChannel {
        fn new(name: &str, response: &str) -> Self {
            let sent = Arc::new(RwLock::new(Vec::new()));
            Self {
                channel_name: name.to_owned(),
                response: response.to_owned(),
                sent,
            }
        }
    }

    #[async_trait]
    impl Channel for RespondingChannel {
        fn name(&self) -> &str {
            self.channel_name.as_str()
        }

        async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
            let recorded = message.content.clone();
            self.sent.write().push(recorded);
            Ok(())
        }

        async fn listen(
            &self,
            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            let canned = ChannelMessage {
                id: "resp_1".to_string(),
                sender: "human".to_string(),
                reply_target: "human".to_string(),
                content: self.response.clone(),
                channel: self.channel_name.clone(),
                timestamp: 1000,
                thread_ts: None,
                interruption_scope_id: None,
                attachments: vec![],
            };
            // The receiver may already be gone; that is not an error here.
            let _ = tx.send(canned).await;
            Ok(())
        }
    }

    /// Tool with the default security policy and no channels registered.
    fn bare_tool() -> EscalateToHumanTool {
        EscalateToHumanTool::new(Arc::new(SecurityPolicy::default()), PathBuf::from("/tmp"))
    }

    /// Tool whose channel map contains exactly the given entries.
    fn make_tool_with_channels(channels: Vec<(&str, Arc<dyn Channel>)>) -> EscalateToHumanTool {
        let tool = bare_tool();
        tool.channel_map.write().extend(
            channels
                .into_iter()
                .map(|(name, channel)| (name.to_owned(), channel)),
        );
        tool
    }

    /// Tool with a single channel registered under `name`.
    fn single_channel_tool<C: Channel + 'static>(name: &str, channel: C) -> EscalateToHumanTool {
        make_tool_with_channels(vec![(name, Arc::new(channel) as Arc<dyn Channel>)])
    }

    // ── 1. test_tool_metadata ──

    #[test]
    fn test_tool_metadata() {
        let tool = bare_tool();
        assert_eq!(tool.name(), "escalate_to_human");

        let description = tool.description();
        assert!(!description.is_empty());
        assert!(description.to_lowercase().contains("escalat"));
    }

    // ── 2. test_parameters_schema ──

    #[test]
    fn test_parameters_schema() {
        let schema = bare_tool().parameters_schema();
        assert_eq!(schema["type"], "object");

        // Every parameter is declared as an object in `properties`.
        assert!(schema["properties"]["summary"].is_object());
        for field in OPTIONAL_PARAMS.iter().copied() {
            assert!(schema["properties"][field].is_object());
        }

        // Only `summary` is required.
        let required = schema["required"].as_array().unwrap();
        assert!(required.iter().any(|v| v == "summary"));
        for field in OPTIONAL_PARAMS.iter().copied() {
            assert!(!required.iter().any(|v| v == field));
        }
    }

    // ── 3. test_default_urgency_is_medium ──

    #[tokio::test]
    async fn test_default_urgency_is_medium() {
        let channel = Arc::new(SilentChannel::new("test"));
        let outbox = Arc::clone(&channel.sent);
        let tool = make_tool_with_channels(vec![("test", channel as Arc<dyn Channel>)]);

        let outcome = tool
            .execute(json!({ "summary": "Need help" }))
            .await
            .unwrap();

        assert!(outcome.success, "error: {:?}", outcome.error);
        // The confirmation JSON reports the defaulted urgency.
        assert!(outcome.output.contains("\"medium\""));
        // The delivered text carries the MEDIUM badge.
        let delivered = outbox.read();
        assert!(!delivered.is_empty());
        assert!(delivered[0].contains("[MEDIUM]"));
    }

    // ── 4. test_message_format_low ──

    #[test]
    fn test_message_format_low() {
        let text = EscalateToHumanTool::format_message("low", "Disk space low", None);
        assert!(text.starts_with("\u{2139}\u{fe0f} [LOW]"));
        assert!(text.contains("Summary: Disk space low"));
        assert!(text.contains("Reply to this message to respond."));
    }

    // ── 5. test_message_format_critical ──

    #[test]
    fn test_message_format_critical() {
        let text = EscalateToHumanTool::format_message(
            "critical",
            "Production down",
            Some("Database unreachable for 5 minutes"),
        );
        assert!(text.starts_with("\u{1f6a8} [CRITICAL]"));
        assert!(text.contains("Summary: Production down"));
        assert!(text.contains("Context: Database unreachable for 5 minutes"));
    }

    // ── 6. test_invalid_urgency_rejected ──

    #[tokio::test]
    async fn test_invalid_urgency_rejected() {
        let tool = single_channel_tool("test", SilentChannel::new("test"));

        let outcome = tool
            .execute(json!({ "summary": "Help", "urgency": "extreme" }))
            .await
            .unwrap();

        assert!(!outcome.success);
        let reason = outcome.error.as_deref().unwrap();
        assert!(reason.contains("Invalid urgency"));
        assert!(reason.contains("extreme"));
    }

    // ── 7. test_non_blocking_returns_status ──

    #[tokio::test]
    async fn test_non_blocking_returns_status() {
        let tool = single_channel_tool("slack", SilentChannel::new("slack"));

        let request = json!({
            "summary": "Need approval",
            "urgency": "low"
        });
        let outcome = tool.execute(request).await.unwrap();

        assert!(outcome.success, "error: {:?}", outcome.error);
        let receipt: serde_json::Value = serde_json::from_str(&outcome.output).unwrap();
        assert_eq!(receipt["status"], "escalated");
        assert_eq!(receipt["urgency"], "low");
        assert_eq!(receipt["channel"], "slack");
    }

    // ── 8. test_blocking_mode_returns_response ──

    #[tokio::test]
    async fn test_blocking_mode_returns_response() {
        let tool = single_channel_tool("test", RespondingChannel::new("test", "Approved, go ahead"));

        let request = json!({
            "summary": "Need deployment approval",
            "wait_for_response": true,
            "timeout_secs": 5
        });
        let outcome = tool.execute(request).await.unwrap();

        assert!(outcome.success, "error: {:?}", outcome.error);
        assert_eq!(outcome.output, "Approved, go ahead");
    }

    // ── 9. test_blocking_mode_timeout ──

    #[tokio::test]
    async fn test_blocking_mode_timeout() {
        let tool = single_channel_tool("test", SilentChannel::new("test"));

        let request = json!({
            "summary": "Waiting for response",
            "wait_for_response": true,
            "timeout_secs": 1
        });
        let outcome = tool.execute(request).await.unwrap();

        assert!(!outcome.success);
        assert_eq!(outcome.output, "TIMEOUT");
        assert!(outcome.error.as_deref().unwrap().contains("1 seconds"));
    }

    // ── 10. test_pushover_not_required ──

    #[tokio::test]
    async fn test_pushover_not_required() {
        // A high-urgency escalation must succeed through the channel alone
        // when no Pushover credentials can be found.
        let tool = single_channel_tool("test", SilentChannel::new("test"));

        let request = json!({
            "summary": "Critical alert",
            "urgency": "high"
        });
        let outcome = tool.execute(request).await.unwrap();

        assert!(outcome.success, "error: {:?}", outcome.error);
        let receipt: serde_json::Value = serde_json::from_str(&outcome.output).unwrap();
        assert_eq!(receipt["status"], "escalated");
        assert_eq!(receipt["urgency"], "high");
    }
}