use async_trait::async_trait;
use echo_core::tools::permission::{PermissionMode, ToolPermission};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use echo_core::error::Result;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum RiskLevel {
#[default]
Low,
Medium,
High,
Critical,
}
impl RiskLevel {
pub fn from_permissions(permissions: &[ToolPermission]) -> Self {
if permissions.contains(&ToolPermission::Sensitive) {
return RiskLevel::Critical;
}
if permissions.contains(&ToolPermission::Execute) {
return RiskLevel::High;
}
if permissions.contains(&ToolPermission::Write)
|| permissions.contains(&ToolPermission::Network)
{
return RiskLevel::Medium;
}
RiskLevel::Low
}
pub fn requires_confirmation(&self) -> bool {
matches!(self, RiskLevel::High | RiskLevel::Critical)
}
pub fn color(&self) -> &'static str {
match self {
RiskLevel::Low => "green",
RiskLevel::Medium => "yellow",
RiskLevel::High => "orange",
RiskLevel::Critical => "red",
}
}
pub fn icon(&self) -> &'static str {
match self {
RiskLevel::Low => "✓",
RiskLevel::Medium => "⚠",
RiskLevel::High => "⚡",
RiskLevel::Critical => "🔴",
}
}
}
impl std::fmt::Display for RiskLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RiskLevel::Low => write!(f, "低风险"),
RiskLevel::Medium => write!(f, "中等风险"),
RiskLevel::High => write!(f, "高风险"),
RiskLevel::Critical => write!(f, "危险"),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum SuggestedAction {
AllowOnce,
AllowForSession,
AllowAlways,
DenyOnce,
DenyAlways,
ModifyInput { updated_input: Value },
Custom { response: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Suggestion {
pub label: String,
pub description: String,
pub action: SuggestedAction,
#[serde(default)]
pub recommended: bool,
#[serde(default)]
pub shortcut: Option<String>,
}
impl Suggestion {
pub fn allow_once() -> Self {
Self {
label: "允许".to_string(),
description: "允许本次执行".to_string(),
action: SuggestedAction::AllowOnce,
recommended: false,
shortcut: Some("y".to_string()),
}
}
pub fn allow_for_session() -> Self {
Self {
label: "始终允许(本次会话)".to_string(),
description: "在本次会话中允许所有相同操作".to_string(),
action: SuggestedAction::AllowForSession,
recommended: false,
shortcut: Some("s".to_string()),
}
}
pub fn allow_always() -> Self {
Self {
label: "始终允许".to_string(),
description: "添加持久化规则,始终允许此操作".to_string(),
action: SuggestedAction::AllowAlways,
recommended: false,
shortcut: Some("a".to_string()),
}
}
pub fn deny_once(reason: impl Into<String>) -> Self {
Self {
label: "拒绝".to_string(),
description: reason.into(),
action: SuggestedAction::DenyOnce,
recommended: false,
shortcut: Some("n".to_string()),
}
}
pub fn deny_always(reason: impl Into<String>) -> Self {
Self {
label: "始终拒绝".to_string(),
description: reason.into(),
action: SuggestedAction::DenyAlways,
recommended: false,
shortcut: Some("d".to_string()),
}
}
pub fn modify_input(label: impl Into<String>, updated_input: Value) -> Self {
Self {
label: label.into(),
description: "修改参数后执行".to_string(),
action: SuggestedAction::ModifyInput { updated_input },
recommended: false,
shortcut: None,
}
}
pub fn recommended(mut self) -> Self {
self.recommended = true;
self
}
pub fn with_shortcut(mut self, shortcut: impl Into<String>) -> Self {
self.shortcut = Some(shortcut.into());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PermissionRequest {
pub tool_name: String,
pub tool_input: Value,
pub required_permissions: Vec<ToolPermission>,
pub risk_level: RiskLevel,
pub prompt: String,
pub suggestions: Vec<Suggestion>,
#[serde(default)]
pub context: PermissionContext,
#[serde(default)]
pub request_id: Option<String>,
#[serde(default)]
pub session_id: Option<String>,
#[serde(default)]
pub agent_name: Option<String>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PermissionContext {
#[serde(default)]
pub working_directory: Option<String>,
#[serde(default)]
pub affected_files: Vec<String>,
#[serde(default)]
pub estimated_impact: Option<String>,
#[serde(default)]
pub metadata: serde_json::Map<String, Value>,
}
impl PermissionRequest {
pub fn new(tool_name: impl Into<String>, tool_input: Value) -> Self {
let tool_name = tool_name.into();
let prompt = format!("工具 '{}' 需要执行,是否允许?", tool_name);
Self {
tool_name,
tool_input,
required_permissions: Vec::new(),
risk_level: RiskLevel::Low,
prompt,
suggestions: Vec::new(),
context: PermissionContext::default(),
request_id: None,
session_id: None,
agent_name: None,
}
}
pub fn with_permissions(mut self, permissions: Vec<ToolPermission>) -> Self {
self.risk_level = RiskLevel::from_permissions(&permissions);
self.required_permissions = permissions;
self
}
pub fn with_risk_level(mut self, level: RiskLevel) -> Self {
self.risk_level = level;
self
}
pub fn with_prompt(mut self, prompt: impl Into<String>) -> Self {
self.prompt = prompt.into();
self
}
pub fn with_suggestion(mut self, suggestion: Suggestion) -> Self {
self.suggestions.push(suggestion);
self
}
pub fn with_suggestions(mut self, suggestions: Vec<Suggestion>) -> Self {
self.suggestions = suggestions;
self
}
pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
self.request_id = Some(id.into());
self
}
pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
self.session_id = Some(id.into());
self
}
pub fn with_agent_name(mut self, name: impl Into<String>) -> Self {
self.agent_name = Some(name.into());
self
}
pub fn with_context(mut self, context: PermissionContext) -> Self {
self.context = context;
self
}
pub fn with_default_suggestions(mut self) -> Self {
self.suggestions = vec![Suggestion::allow_once(), Suggestion::deny_once("取消执行")];
self
}
pub fn with_risk_based_suggestions(mut self) -> Self {
self.suggestions = match self.risk_level {
RiskLevel::Low => {
vec![
Suggestion::allow_once().recommended(),
Suggestion::allow_for_session(),
]
}
RiskLevel::Medium => {
vec![
Suggestion::allow_once(),
Suggestion::allow_for_session(),
Suggestion::deny_once("取消执行"),
]
}
RiskLevel::High => {
vec![
Suggestion::allow_once(),
Suggestion::deny_once("拒绝执行").recommended(),
Suggestion::deny_always("始终拒绝此操作"),
]
}
RiskLevel::Critical => {
vec![
Suggestion::deny_once("拒绝执行").recommended(),
Suggestion::allow_once(),
Suggestion::deny_always("始终拒绝此危险操作"),
]
}
};
self
}
pub fn requires_confirmation(&self) -> bool {
self.risk_level.requires_confirmation() || !self.suggestions.is_empty()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PermissionResponse {
pub decision: PermissionResponseDecision,
#[serde(default)]
pub rule_updates: Vec<PermissionUpdate>,
#[serde(default)]
pub feedback: Option<String>,
#[serde(default)]
pub updated_input: Option<Value>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PermissionResponseDecision {
Allowed,
Denied { reason: Option<String> },
NeedMoreInfo { question: String },
}
impl PermissionResponse {
pub fn allowed() -> Self {
Self {
decision: PermissionResponseDecision::Allowed,
rule_updates: Vec::new(),
feedback: None,
updated_input: None,
}
}
pub fn denied(reason: Option<String>) -> Self {
Self {
decision: PermissionResponseDecision::Denied { reason },
rule_updates: Vec::new(),
feedback: None,
updated_input: None,
}
}
pub fn from_suggestion(suggestion: &Suggestion, tool_name: &str, args: &Value) -> Self {
let matcher = Self::build_matcher(tool_name, args);
match &suggestion.action {
SuggestedAction::AllowOnce => Self::allowed(),
SuggestedAction::AllowForSession => Self {
decision: PermissionResponseDecision::Allowed,
rule_updates: vec![PermissionUpdate::add_session_rule(matcher)],
feedback: None,
updated_input: None,
},
SuggestedAction::AllowAlways => Self {
decision: PermissionResponseDecision::Allowed,
rule_updates: vec![PermissionUpdate::add_permanent_rule(matcher)],
feedback: None,
updated_input: None,
},
SuggestedAction::DenyOnce => Self::denied(Some(suggestion.description.clone())),
SuggestedAction::DenyAlways => Self {
decision: PermissionResponseDecision::Denied {
reason: Some(suggestion.description.clone()),
},
rule_updates: vec![PermissionUpdate::add_deny_rule(matcher)],
feedback: None,
updated_input: None,
},
SuggestedAction::ModifyInput { updated_input } => Self {
decision: PermissionResponseDecision::Allowed,
rule_updates: Vec::new(),
feedback: None,
updated_input: Some(updated_input.clone()),
},
SuggestedAction::Custom { response } => Self {
decision: PermissionResponseDecision::Allowed,
rule_updates: Vec::new(),
feedback: Some(response.clone()),
updated_input: None,
},
}
}
fn build_matcher(tool_name: &str, args: &Value) -> String {
if matches!(args, Value::Null) || matches!(args, Value::Object(map) if map.is_empty()) {
return tool_name.to_string();
}
format!("{tool_name}(*)")
}
pub fn with_feedback(mut self, feedback: impl Into<String>) -> Self {
self.feedback = Some(feedback.into());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PermissionUpdate {
AddRule {
matcher: String,
behavior: String,
source: String,
},
RemoveRule { matcher: String },
SetMode { mode: PermissionMode },
}
impl PermissionUpdate {
pub fn add_session_rule(matcher: String) -> Self {
Self::AddRule {
matcher,
behavior: "allow".to_string(),
source: "session".to_string(),
}
}
pub fn add_permanent_rule(matcher: String) -> Self {
Self::AddRule {
matcher,
behavior: "allow".to_string(),
source: "userSettings".to_string(),
}
}
pub fn add_deny_rule(matcher: String) -> Self {
Self::AddRule {
matcher,
behavior: "deny".to_string(),
source: "userSettings".to_string(),
}
}
}
#[async_trait]
pub trait PermissionRequestHandler: Send + Sync {
async fn handle(&self, request: PermissionRequest) -> Result<PermissionResponse>;
async fn handle_batch(
&self,
requests: Vec<PermissionRequest>,
) -> Result<Vec<PermissionResponse>> {
let mut responses = Vec::with_capacity(requests.len());
for request in requests {
responses.push(self.handle(request).await?);
}
Ok(responses)
}
fn is_null_handler(&self) -> bool {
false
}
}
pub struct DefaultPermissionRequestHandler<P: super::HumanLoopProvider> {
provider: Arc<P>,
}
impl<P: super::HumanLoopProvider> DefaultPermissionRequestHandler<P> {
pub fn new(provider: Arc<P>) -> Self {
Self { provider }
}
}
#[async_trait]
impl<P: super::HumanLoopProvider + 'static> PermissionRequestHandler
for DefaultPermissionRequestHandler<P>
{
async fn handle(&self, request: PermissionRequest) -> Result<PermissionResponse> {
use super::{HumanLoopRequest, HumanLoopResponse};
let req = HumanLoopRequest::approval(&request.tool_name, request.tool_input.clone());
match self.provider.request(req).await? {
HumanLoopResponse::Approved => Ok(PermissionResponse::allowed()),
HumanLoopResponse::ApprovedWithScope { scope: _ } => Ok(PermissionResponse::allowed()),
HumanLoopResponse::ModifiedArgs { args, scope: _ } => {
Ok(PermissionResponse {
decision: PermissionResponseDecision::Allowed,
rule_updates: Vec::new(),
feedback: None,
updated_input: Some(args),
})
}
HumanLoopResponse::Rejected { reason } => Ok(PermissionResponse::denied(reason)),
HumanLoopResponse::Text(text) => Ok(PermissionResponse::allowed().with_feedback(text)),
HumanLoopResponse::Timeout => {
Ok(PermissionResponse::denied(Some("请求超时".to_string())))
}
HumanLoopResponse::Deferred => {
Ok(PermissionResponse::denied(Some("审批被推迟".to_string())))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_risk_level_from_permissions() {
assert_eq!(
RiskLevel::from_permissions(&[ToolPermission::Read]),
RiskLevel::Low
);
assert_eq!(
RiskLevel::from_permissions(&[ToolPermission::Write]),
RiskLevel::Medium
);
assert_eq!(
RiskLevel::from_permissions(&[ToolPermission::Execute]),
RiskLevel::High
);
assert_eq!(
RiskLevel::from_permissions(&[ToolPermission::Sensitive]),
RiskLevel::Critical
);
}
#[test]
fn test_risk_level_requires_confirmation() {
assert!(!RiskLevel::Low.requires_confirmation());
assert!(!RiskLevel::Medium.requires_confirmation());
assert!(RiskLevel::High.requires_confirmation());
assert!(RiskLevel::Critical.requires_confirmation());
}
#[test]
fn test_suggestion_create() {
let s = Suggestion::allow_once();
assert_eq!(s.label, "允许");
assert_eq!(s.action, SuggestedAction::AllowOnce);
assert_eq!(s.shortcut, Some("y".to_string()));
}
#[test]
fn test_permission_request_new() {
let req = PermissionRequest::new("Bash", serde_json::json!({"command": "ls"}));
assert_eq!(req.tool_name, "Bash");
assert_eq!(req.risk_level, RiskLevel::Low);
}
#[test]
fn test_permission_request_with_permissions() {
let req = PermissionRequest::new("Bash", serde_json::json!({"command": "rm"}))
.with_permissions(vec![ToolPermission::Execute]);
assert_eq!(req.risk_level, RiskLevel::High);
assert_eq!(req.required_permissions.len(), 1);
}
#[test]
fn test_permission_request_risk_based_suggestions() {
let req = PermissionRequest::new("Bash", serde_json::json!({}))
.with_risk_level(RiskLevel::Critical)
.with_risk_based_suggestions();
assert!(!req.suggestions.is_empty());
assert!(matches!(
&req.suggestions[0].action,
SuggestedAction::DenyOnce
));
}
#[test]
fn test_permission_response_allowed() {
let resp = PermissionResponse::allowed();
assert!(matches!(resp.decision, PermissionResponseDecision::Allowed));
}
#[test]
fn test_permission_response_denied() {
let resp = PermissionResponse::denied(Some("test reason".to_string()));
assert!(matches!(
resp.decision,
PermissionResponseDecision::Denied { .. }
));
}
#[test]
fn test_permission_response_from_suggestion() {
let s = Suggestion::allow_once();
let resp = PermissionResponse::from_suggestion(&s, "Bash", &serde_json::json!({}));
assert!(matches!(resp.decision, PermissionResponseDecision::Allowed));
let s = Suggestion::deny_once("test");
let resp = PermissionResponse::from_suggestion(&s, "Bash", &serde_json::json!({}));
assert!(matches!(
resp.decision,
PermissionResponseDecision::Denied { .. }
));
}
#[test]
fn test_permission_response_from_suggestion_matcher() {
let s = Suggestion::allow_for_session();
let resp = PermissionResponse::from_suggestion(
&s,
"DangerousTool",
&serde_json::json!({"cmd": "x"}),
);
match &resp.rule_updates[0] {
PermissionUpdate::AddRule { matcher, .. } => {
assert_eq!(matcher, "DangerousTool(*)");
}
_ => panic!("Expected AddRule"),
}
}
#[test]
fn test_permission_update_create() {
let update = PermissionUpdate::add_session_rule("Bash".to_string());
assert!(matches!(update, PermissionUpdate::AddRule { .. }));
let update = PermissionUpdate::add_deny_rule("Bash(rm:*)".to_string());
assert!(matches!(update, PermissionUpdate::AddRule { behavior, .. } if behavior == "deny"));
}
#[test]
fn test_permission_request_serialization() {
let req = PermissionRequest::new("Bash", serde_json::json!({"cmd": "ls"}))
.with_risk_level(RiskLevel::High)
.with_default_suggestions();
let json = serde_json::to_string(&req).unwrap();
let parsed: PermissionRequest = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.tool_name, "Bash");
assert_eq!(parsed.risk_level, RiskLevel::High);
}
}