use crate::escalation::{
Approver, Decision, EscalationNotifier, EscalationQueue, HeldAction, ResolveError, Surface,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use symbi_channel_adapter::traits::InboundCommandInterceptor;
use symbi_channel_adapter::types::{ChatPlatform, InboundMessage, OutboundMessage};
use symbi_channel_adapter::ChannelAdapterManager;
pub type ChannelApprovers = HashMap<(ChatPlatform, String), HashSet<String>>;
pub struct EscalationCommandInterceptor {
queue: Arc<EscalationQueue>,
channel_approvers: ChannelApprovers,
}
impl EscalationCommandInterceptor {
pub fn new(queue: Arc<EscalationQueue>, channel_approvers: ChannelApprovers) -> Self {
Self {
queue,
channel_approvers,
}
}
fn is_authorized(&self, msg: &InboundMessage) -> bool {
self.channel_approvers
.get(&(msg.platform, msg.channel_id.clone()))
.map(|approvers| approvers.contains(&msg.sender_id))
.unwrap_or(false)
}
fn parse(msg: &InboundMessage) -> Option<(String, String)> {
if let Some(cmd) = &msg.command {
if cmd.subcommand.as_deref() == Some("gate") && cmd.args.len() >= 2 {
return Some((cmd.args[0].to_lowercase(), cmd.args[1].clone()));
}
}
let parts: Vec<&str> = msg.content.split_whitespace().collect();
if parts.len() >= 4 && parts[1] == "gate" {
return Some((parts[2].to_lowercase(), parts[3].to_string()));
}
None
}
}
#[async_trait::async_trait]
impl InboundCommandInterceptor for EscalationCommandInterceptor {
async fn try_handle(&self, msg: &InboundMessage) -> Option<String> {
let (sub, id) = Self::parse(msg)?;
if sub != "approve" && sub != "deny" {
return Some("Usage: /symbi gate approve|deny <id> [reason]".to_string());
}
if !self.is_authorized(msg) {
return Some(format!(
"\u{26d4} {} is not authorized to resolve held actions in this channel.",
msg.sender_name
));
}
let approver = Approver {
surface: Surface::Chat,
id: msg.sender_id.clone(),
display: msg.sender_name.clone(),
};
let decision = if sub == "approve" {
Decision::Approve { reason: None }
} else {
Decision::Deny { reason: None }
};
match self.queue.resolve_async(&id, decision, approver).await {
Ok(()) => Some(format!(
"\u{2705} {} {}d held action {}.",
msg.sender_name, sub, id
)),
Err(ResolveError::NotFound) => Some(format!("Unknown held action {id}.")),
Err(ResolveError::AlreadyResolved) => {
Some(format!("Held action {id} was already resolved."))
}
}
}
}
pub struct ChatEscalationNotifier {
manager: Arc<ChannelAdapterManager>,
platform: ChatPlatform,
channel_id: String,
}
impl ChatEscalationNotifier {
pub fn new(
manager: Arc<ChannelAdapterManager>,
platform: ChatPlatform,
channel_id: String,
) -> Self {
Self {
manager,
platform,
channel_id,
}
}
}
#[async_trait::async_trait]
impl EscalationNotifier for ChatEscalationNotifier {
async fn notify(&self, action: &HeldAction) {
let content = format!(
"\u{26a0} Held for approval \u{2014} agent `{}` wants to {} ({}).\n\
Reply: `/symbi gate approve {}` or `/symbi gate deny {} [reason]`",
action.agent_id, action.summary, action.reason, action.id, action.id
);
let msg = OutboundMessage {
channel_id: self.channel_id.clone(),
thread_id: None,
content,
blocks: None,
ephemeral: false,
user_id: None,
metadata: None,
};
let _ = self.manager.send_to(self.platform, msg).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::escalation::{EscalationQueue, EscalationRequest, HeldActionKind};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use symbi_channel_adapter::traits::InboundCommandInterceptor;
use symbi_channel_adapter::types::{ChatPlatform, InboundMessage, SlashCommand};
fn approvers() -> ChannelApprovers {
let mut m: ChannelApprovers = HashMap::new();
m.insert(
(ChatPlatform::Slack, "C0APPROVERS".to_string()),
HashSet::from(["U0ALICE".to_string()]),
);
m
}
fn inbound_in(channel: &str, sender: &str, sub: &str, id: &str) -> InboundMessage {
InboundMessage {
id: "m".into(),
platform: ChatPlatform::Slack,
workspace_id: "w".into(),
channel_id: channel.into(),
thread_id: None,
sender_id: sender.into(),
sender_name: sender.into(),
content: format!("/symbi gate {sub} {id}"),
command: Some(SlashCommand {
name: "symbi".into(),
subcommand: Some("gate".into()),
args: vec![sub.into(), id.into()],
agent_name: None,
}),
timestamp: chrono::Utc::now(),
raw_payload: None,
}
}
fn inbound(sender: &str, sub: &str, id: &str) -> InboundMessage {
inbound_in("C0APPROVERS", sender, sub, id)
}
#[tokio::test]
async fn allowlisted_sender_can_approve() {
let q = Arc::new(EscalationQueue::new());
let icpt = EscalationCommandInterceptor::new(q.clone(), approvers());
let q2 = q.clone();
let h = tokio::spawn(async move {
q2.enqueue(
EscalationRequest {
agent_id: "a".into(),
kind: HeldActionKind::ToolCall,
summary: "s".into(),
reason: "r".into(),
context_snapshot: None,
},
Duration::from_secs(5),
)
.await
});
let id = loop {
if let Some(x) = q.list_pending_async().await.first() {
break x.id.clone();
}
tokio::time::sleep(Duration::from_millis(5)).await;
};
let reply = icpt.try_handle(&inbound("U0ALICE", "approve", &id)).await;
assert!(reply.unwrap().to_lowercase().contains("approved"));
assert!(matches!(
h.await.unwrap(),
crate::escalation::Decision::Approve { .. }
));
}
#[tokio::test]
async fn non_allowlisted_sender_is_rejected() {
let q = Arc::new(EscalationQueue::new());
let icpt = EscalationCommandInterceptor::new(q.clone(), approvers());
let reply = icpt
.try_handle(&inbound("U0MALLORY", "approve", "0000"))
.await;
assert!(reply.unwrap().to_lowercase().contains("not authorized"));
}
#[tokio::test]
async fn approver_rejected_from_unconfigured_channel() {
let q = Arc::new(EscalationQueue::new());
let icpt = EscalationCommandInterceptor::new(q.clone(), approvers());
let reply = icpt
.try_handle(&inbound_in("C0RANDOM", "U0ALICE", "approve", "0000"))
.await;
assert!(reply.unwrap().to_lowercase().contains("not authorized"));
}
#[tokio::test]
async fn non_gate_message_passes_through() {
let q = Arc::new(EscalationQueue::new());
let icpt = EscalationCommandInterceptor::new(q.clone(), approvers());
let mut m = inbound("U0ALICE", "approve", "0000");
m.command = None;
m.content = "hello".into();
assert!(icpt.try_handle(&m).await.is_none());
}
}