pub mod adapter;
mod approval_cache;
mod audit;
mod batch;
mod classifier;
mod console;
mod pattern;
pub mod permission;
pub mod policy;
mod protected;
pub mod service;
mod webhook;
#[cfg(feature = "websocket")]
mod websocket;
pub use approval_cache::SessionApprovalCache;
pub use audit::{
CompositePermissionAuditSink, InMemoryPermissionAuditSink, LoggingPermissionAuditSink,
PermissionAuditEntry, PermissionAuditSink,
};
pub use batch::BatchApprovalProvider;
pub use classifier::{
Classifier, ClassifierContext, ClassifierResult, CompositeClassifier, DenialTracker,
LlmClassifier, RiskContext, RuleClassifier,
};
pub use console::ConsoleHumanLoopProvider;
pub use permission::{
DefaultPermissionRequestHandler, PermissionContext, PermissionRequest,
PermissionRequestHandler, PermissionResponse, PermissionResponseDecision, PermissionUpdate,
RiskLevel, SuggestedAction, Suggestion,
};
pub use policy::{ApprovalPolicy, ApprovalRule, ApprovalScope, PolicyDecision};
pub use protected::{ProtectedPathChecker, ProtectedPathResult};
pub use service::PermissionService;
pub use webhook::WebhookHumanLoopProvider;
#[cfg(feature = "websocket")]
pub use websocket::WebSocketHumanLoopProvider;
use std::sync::Arc;
use std::time::Duration;
use futures::future::BoxFuture;
use serde_json::Value;
use tokio::sync::{mpsc, oneshot};
use echo_core::error::{ReactError, Result};
/// Decision delivered through an [`ApprovalResponder`] in reply to an approval request.
#[derive(Debug, Clone, PartialEq)]
pub enum ApprovalDecision {
/// Approved, with no scope attached.
Approved,
/// Approved, carrying the [`ApprovalScope`] chosen by the responder.
ApprovedWithScope { scope: ApprovalScope },
/// Approved with a replacement `args` value and an accompanying scope.
// NOTE(review): presumably `args` replaces the tool arguments of the original request — confirm with callers.
Modified {
args: Value,
scope: ApprovalScope,
},
/// Rejected, optionally with a human-readable reason.
Rejected { reason: Option<String> },
/// The decision was deferred.
// NOTE(review): how a deferred decision is resolved later is not visible in this module.
Deferred,
}
/// One-shot handle used to answer a single approval request.
///
/// The sender is kept in an `Option` so that it can be taken exactly once: either by an
/// explicit response or by the `Drop` implementation.
pub struct ApprovalResponder {
/// Sending half of the reply channel; `None` once a decision has been sent.
sender: Option<oneshot::Sender<ApprovalDecision>>,
}
impl ApprovalResponder {
    /// Wraps the sending half of a oneshot channel.
    fn new(sender: oneshot::Sender<ApprovalDecision>) -> Self {
        let sender = Some(sender);
        Self { sender }
    }

    /// Consumes the responder and delivers `decision` to the waiting side.
    ///
    /// If the receiving half is already gone the decision is silently discarded.
    pub fn respond(mut self, decision: ApprovalDecision) {
        let Some(tx) = self.sender.take() else {
            return;
        };
        // A failed send only means nobody is listening any more.
        tx.send(decision).ok();
    }

    /// Answers with [`ApprovalDecision::Approved`].
    pub fn approve(self) {
        let decision = ApprovalDecision::Approved;
        self.respond(decision);
    }

    /// Answers with [`ApprovalDecision::ApprovedWithScope`] carrying `scope`.
    pub fn approve_with_scope(self, scope: ApprovalScope) {
        let decision = ApprovalDecision::ApprovedWithScope { scope };
        self.respond(decision);
    }

    /// Answers with [`ApprovalDecision::Modified`] carrying `args` and `scope`.
    pub fn approve_modified(self, args: Value, scope: ApprovalScope) {
        let decision = ApprovalDecision::Modified { args, scope };
        self.respond(decision);
    }

    /// Answers with [`ApprovalDecision::Rejected`] and the optional `reason`.
    pub fn reject(self, reason: Option<String>) {
        let decision = ApprovalDecision::Rejected { reason };
        self.respond(decision);
    }

    /// Answers with [`ApprovalDecision::Deferred`].
    pub fn defer(self) {
        let decision = ApprovalDecision::Deferred;
        self.respond(decision);
    }
}
impl Drop for ApprovalResponder {
    /// Sends a fallback rejection when the responder is dropped without an explicit answer.
    fn drop(&mut self) {
        let Some(tx) = self.sender.take() else {
            // A decision has already been sent; nothing left to do.
            return;
        };
        let reason = Some(String::from("No response provided"));
        tx.send(ApprovalDecision::Rejected { reason }).ok();
    }
}
impl std::fmt::Debug for ApprovalResponder {
    /// Prints only whether the sender is still present; the channel itself is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_sender = self.sender.is_some();
        let mut out = f.debug_struct("ApprovalResponder");
        out.field("has_sender", &has_sender);
        out.finish()
    }
}
/// One-shot handle used to answer a single text-input request.
///
/// The sender is kept in an `Option` so that it can be taken exactly once: either by
/// `respond` or by the `Drop` implementation.
pub struct InputResponder {
/// Sending half of the reply channel; `None` once a text has been sent.
sender: Option<oneshot::Sender<String>>,
}
impl InputResponder {
    /// Wraps the sending half of a oneshot channel.
    fn new(sender: oneshot::Sender<String>) -> Self {
        let sender = Some(sender);
        Self { sender }
    }

    /// Consumes the responder and delivers `text` to the waiting side.
    ///
    /// If the receiving half is already gone the text is silently discarded.
    pub fn respond(mut self, text: String) {
        let Some(tx) = self.sender.take() else {
            return;
        };
        // A failed send only means nobody is listening any more.
        tx.send(text).ok();
    }
}
impl Drop for InputResponder {
    /// Sends an empty string when the responder is dropped without an explicit answer.
    fn drop(&mut self) {
        let Some(tx) = self.sender.take() else {
            // A text has already been sent; nothing left to do.
            return;
        };
        tx.send(String::new()).ok();
    }
}
impl std::fmt::Debug for InputResponder {
    /// Prints only whether the sender is still present; the channel itself is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_sender = self.sender.is_some();
        let mut out = f.debug_struct("InputResponder");
        out.field("has_sender", &has_sender);
        out.finish()
    }
}
/// Event published by [`HumanLoopManager`] for each incoming request.
///
/// Every variant carries the responder through which the consumer can answer.
#[derive(Debug)]
pub enum HumanLoopEvent {
/// A tool invocation is awaiting approval.
ApprovalRequest {
/// Tool name; when built by `HumanLoopManager` this is empty if the request had none.
tool_name: String,
/// Tool arguments; when built by `HumanLoopManager` this is `Value::Null` if the request had none.
args: Value,
/// Prompt text taken from the request.
prompt: String,
/// Risk level; when built by `HumanLoopManager` this is `RiskLevel::Medium` if the request had none.
risk_level: RiskLevel,
/// Handle used to deliver the decision.
responder: ApprovalResponder,
},
/// Free-form text input is requested.
InputRequest {
/// Prompt text taken from the request.
prompt: String,
/// Handle used to deliver the entered text.
responder: InputResponder,
},
}
/// Channel-backed bridge that turns [`HumanLoopRequest`]s into [`HumanLoopEvent`]s.
///
/// Requests made through its `HumanLoopProvider` implementation are pushed onto an mpsc
/// channel; the receiving half is consumed via `recv_event`, `try_recv_event` or `serve`.
pub struct HumanLoopManager {
/// Sending half used by `request` to publish events.
event_tx: mpsc::Sender<HumanLoopEvent>,
/// Receiving half; becomes `None` once `serve` has taken ownership of it.
event_rx: tokio::sync::Mutex<Option<mpsc::Receiver<HumanLoopEvent>>>,
}
impl HumanLoopManager {
    /// Event-channel capacity used by [`HumanLoopManager::new`].
    const DEFAULT_BUFFER: usize = 16;

    /// Creates a manager whose event channel holds up to 16 pending events.
    pub fn new() -> Self {
        Self::with_buffer(Self::DEFAULT_BUFFER)
    }

    /// Creates a manager whose event channel holds up to `buffer_size` pending events.
    ///
    /// A `buffer_size` of `0` is raised to `1`, because `tokio::sync::mpsc::channel`
    /// panics when asked for a zero-capacity channel.
    pub fn with_buffer(buffer_size: usize) -> Self {
        let (event_tx, event_rx) = mpsc::channel(buffer_size.max(1));
        Self {
            event_tx,
            event_rx: tokio::sync::Mutex::new(Some(event_rx)),
        }
    }

    /// Waits for the next event.
    ///
    /// Returns `None` when the receiver has been taken by [`HumanLoopManager::serve`]
    /// or when the channel is closed. The receiver lock is held while waiting.
    pub async fn recv_event(&self) -> Option<HumanLoopEvent> {
        let mut guard = self.event_rx.lock().await;
        let receiver = guard.as_mut()?;
        receiver.recv().await
    }

    /// Returns an already queued event without waiting.
    ///
    /// Returns `None` when no event is queued, when the receiver has been taken by
    /// [`HumanLoopManager::serve`], or when the receiver is currently locked by another
    /// call (for example a pending [`HumanLoopManager::recv_event`]).
    pub fn try_recv_event(&self) -> Option<HumanLoopEvent> {
        // `try_lock` instead of `blocking_lock`: the latter panics when called from an
        // async execution context and would otherwise block a method that is meant to
        // be non-blocking.
        let mut guard = self.event_rx.try_lock().ok()?;
        let receiver = guard.as_mut()?;
        receiver.try_recv().ok()
    }

    /// Takes ownership of the receiver and dispatches every event to `handler` until the
    /// channel is closed.
    ///
    /// Returns immediately if the receiver has already been taken. After this call,
    /// `recv_event` and `try_recv_event` always return `None`.
    pub async fn serve(&self, handler: &dyn HumanLoopHandler) {
        // The guard is a temporary, so the lock is released before dispatching starts.
        let receiver = self.event_rx.lock().await.take();
        if let Some(receiver) = receiver {
            Self::serve_with_receiver(receiver, handler).await;
        }
    }

    /// Dispatches every event read from `receiver` to `handler` until the channel is closed.
    pub async fn serve_with_receiver(
        mut receiver: mpsc::Receiver<HumanLoopEvent>,
        handler: &dyn HumanLoopHandler,
    ) {
        while let Some(event) = receiver.recv().await {
            dispatch_event(event, handler).await;
        }
    }
}
impl Default for HumanLoopManager {
fn default() -> Self {
Self::new()
}
}
impl HumanLoopProvider for HumanLoopManager {
fn request(&self, req: HumanLoopRequest) -> BoxFuture<'_, Result<HumanLoopResponse>> {
Box::pin(async move {
match req.kind {
HumanLoopKind::Approval => {
let (tx, rx) = oneshot::channel();
let responder = ApprovalResponder::new(tx);
let risk_level = req.risk_level.unwrap_or(RiskLevel::Medium);
let event = HumanLoopEvent::ApprovalRequest {
tool_name: req.tool_name.clone().unwrap_or_default(),
args: req.args.clone().unwrap_or(Value::Null),
prompt: req.prompt.clone(),
risk_level,
responder,
};
self.event_tx
.send(event)
.await
.map_err(|_| ReactError::Other("HumanLoop channel closed".to_string()))?;
let decision = if let Some(timeout) = req.timeout {
match tokio::time::timeout(timeout, rx).await {
Ok(result) => result.map_err(|_| {
ReactError::Other("Approval responder dropped".to_string())
})?,
Err(_) => {
return Ok(HumanLoopResponse::Timeout);
}
}
} else {
rx.await.map_err(|_| {
ReactError::Other("Approval responder dropped".to_string())
})?
};
match decision {
ApprovalDecision::Approved => Ok(HumanLoopResponse::Approved),
ApprovalDecision::ApprovedWithScope { scope } => {
Ok(HumanLoopResponse::ApprovedWithScope { scope })
}
ApprovalDecision::Modified { args, scope } => {
Ok(HumanLoopResponse::ModifiedArgs { args, scope })
}
ApprovalDecision::Rejected { reason } => {
Ok(HumanLoopResponse::Rejected { reason })
}
ApprovalDecision::Deferred => Ok(HumanLoopResponse::Deferred),
}
}
HumanLoopKind::Input => {
let (tx, rx) = oneshot::channel();
let responder = InputResponder::new(tx);
let event = HumanLoopEvent::InputRequest {
prompt: req.prompt.clone(),
responder,
};
self.event_tx
.send(event)
.await
.map_err(|_| ReactError::Other("HumanLoop channel closed".to_string()))?;
let text = rx
.await
.map_err(|_| ReactError::Other("Input responder dropped".to_string()))?;
Ok(HumanLoopResponse::Text(text))
}
}
})
}
}
/// Callback interface invoked by [`dispatch_event`] for each received [`HumanLoopEvent`].
pub trait HumanLoopHandler: Send + Sync {
/// Produces the decision for an approval request on `tool_name` with `args` and `prompt`.
///
/// The event's `risk_level` is not forwarded by `dispatch_event`.
fn on_approval<'a>(
&'a self,
tool_name: &'a str,
args: &'a Value,
prompt: &'a str,
) -> BoxFuture<'a, ApprovalDecision>;
/// Produces the text answer for an input request showing `prompt`.
fn on_input<'a>(&'a self, prompt: &'a str) -> BoxFuture<'a, String>;
}
/// Routes one event to the matching `handler` callback and forwards the callback's result
/// through the event's responder.
///
/// The `risk_level` of an approval event is not passed to the handler.
pub async fn dispatch_event(event: HumanLoopEvent, handler: &dyn HumanLoopHandler) {
    match event {
        HumanLoopEvent::InputRequest { prompt, responder } => {
            let answer = handler.on_input(&prompt).await;
            responder.respond(answer);
        }
        HumanLoopEvent::ApprovalRequest {
            tool_name,
            args,
            prompt,
            responder,
            ..
        } => {
            let verdict = handler.on_approval(&tool_name, &args, &prompt).await;
            responder.respond(verdict);
        }
    }
}
/// Kind of interaction a [`HumanLoopRequest`] asks for.
#[derive(Debug, Clone, PartialEq)]
pub enum HumanLoopKind {
/// Ask for an approval decision.
Approval,
/// Ask for free-form text input.
Input,
}
/// Request handed to a [`HumanLoopProvider`].
#[derive(Debug, Clone)]
pub struct HumanLoopRequest {
/// Whether an approval or a text input is requested.
pub kind: HumanLoopKind,
/// Text shown to the human.
pub prompt: String,
/// Tool the approval is about; `None` for requests built with `input`.
pub tool_name: Option<String>,
/// Arguments of the tool call; `None` for requests built with `input`.
pub args: Option<Value>,
/// Risk level of the request, if one was given.
pub risk_level: Option<RiskLevel>,
/// Optional limit on how long to wait for an answer; how it is honored is up to the
/// `HumanLoopProvider` implementation.
pub timeout: Option<Duration>,
}
impl HumanLoopRequest {
    /// Builds an approval request for `tool_name` with no risk level and no timeout.
    pub fn approval(tool_name: impl Into<String>, args: Value) -> Self {
        let name: String = tool_name.into();
        let prompt = format!("工具 [{}] 需要人工审批", name);
        Self::approval_parts(name, args, prompt, None, None)
    }

    /// Builds an approval request whose prompt mentions `risk_level`; no timeout is set.
    pub fn approval_with_risk(
        tool_name: impl Into<String>,
        args: Value,
        risk_level: RiskLevel,
    ) -> Self {
        let name: String = tool_name.into();
        let prompt = format!("工具 [{}] 需要人工审批({}风险)", name, risk_level);
        Self::approval_parts(name, args, prompt, Some(risk_level), None)
    }

    /// Builds an approval request bounded by `timeout`; no risk level is set.
    pub fn approval_with_timeout(
        tool_name: impl Into<String>,
        args: Value,
        timeout: Duration,
    ) -> Self {
        let name: String = tool_name.into();
        let prompt = format!("工具 [{}] 需要人工审批", name);
        Self::approval_parts(name, args, prompt, None, Some(timeout))
    }

    /// Builds a text-input request showing `prompt`; all approval-related fields are `None`.
    pub fn input(prompt: impl Into<String>) -> Self {
        let prompt = prompt.into();
        Self {
            kind: HumanLoopKind::Input,
            prompt,
            tool_name: None,
            args: None,
            risk_level: None,
            timeout: None,
        }
    }

    /// Shared constructor behind the public `approval*` builders.
    fn approval_parts(
        tool_name: String,
        args: Value,
        prompt: String,
        risk_level: Option<RiskLevel>,
        timeout: Option<Duration>,
    ) -> Self {
        Self {
            kind: HumanLoopKind::Approval,
            prompt,
            tool_name: Some(tool_name),
            args: Some(args),
            risk_level,
            timeout,
        }
    }
}
/// Result of a [`HumanLoopRequest`] as returned by a [`HumanLoopProvider`].
#[derive(Debug, Clone)]
pub enum HumanLoopResponse {
/// The request was approved.
Approved,
/// Approved, with the accompanying [`ApprovalScope`].
ApprovedWithScope { scope: ApprovalScope },
/// Approved with replacement `args` and an accompanying scope.
ModifiedArgs { args: Value, scope: ApprovalScope },
/// Rejected, optionally with a reason.
Rejected { reason: Option<String> },
/// Text supplied in answer to an input request.
Text(String),
/// No answer arrived within the request's timeout.
Timeout,
/// The decision was deferred.
Deferred,
}
/// Source of human-in-the-loop answers: takes a request and asynchronously yields a response.
pub trait HumanLoopProvider: Send + Sync {
/// Submits `req` and resolves to the resulting response, or to an error.
fn request(&self, req: HumanLoopRequest) -> BoxFuture<'_, Result<HumanLoopResponse>>;
}
/// Returns the default provider: a shared [`ConsoleHumanLoopProvider`].
pub fn default_provider() -> Arc<dyn HumanLoopProvider> {
    Arc::new(ConsoleHumanLoopProvider) as Arc<dyn HumanLoopProvider>
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an approval responder together with the receiving end of its channel.
    fn approval_pair() -> (ApprovalResponder, oneshot::Receiver<ApprovalDecision>) {
        let (tx, rx) = oneshot::channel();
        (ApprovalResponder::new(tx), rx)
    }

    #[test]
    fn test_approval_decision_variants() {
        let approved = ApprovalDecision::Approved;
        let rejected = ApprovalDecision::Rejected {
            reason: Some("test".to_string()),
        };

        assert!(
            matches!(approved, ApprovalDecision::Approved),
            "Should be Approved"
        );
        let ApprovalDecision::Rejected { reason } = rejected else {
            panic!("Should be Rejected");
        };
        assert_eq!(reason, Some("test".to_string()));
    }

    #[test]
    fn test_approval_responder_respond() {
        let (responder, mut rx) = approval_pair();
        responder.respond(ApprovalDecision::Approved);
        assert_eq!(rx.try_recv().ok(), Some(ApprovalDecision::Approved));
    }

    #[test]
    fn test_approval_responder_approve() {
        let (responder, mut rx) = approval_pair();
        responder.approve();
        assert_eq!(rx.try_recv().ok(), Some(ApprovalDecision::Approved));
    }

    #[test]
    fn test_approval_responder_approve_with_scope() {
        let (responder, mut rx) = approval_pair();
        responder.approve_with_scope(ApprovalScope::Session);

        let Ok(ApprovalDecision::ApprovedWithScope { scope }) = rx.try_recv() else {
            panic!("Should be ApprovedWithScope");
        };
        assert_eq!(scope, ApprovalScope::Session);
    }

    #[test]
    fn test_approval_responder_reject() {
        let (responder, mut rx) = approval_pair();
        responder.reject(Some("test reason".to_string()));

        let Ok(ApprovalDecision::Rejected { reason }) = rx.try_recv() else {
            panic!("Should be Rejected");
        };
        assert_eq!(reason, Some("test reason".to_string()));
    }

    #[test]
    fn test_approval_responder_drop_without_response() {
        let (responder, mut rx) = approval_pair();
        // Dropping without answering must trigger the fallback rejection.
        drop(responder);

        let Ok(ApprovalDecision::Rejected { reason }) = rx.try_recv() else {
            panic!("Should be Rejected");
        };
        assert!(reason.is_some());
    }

    #[test]
    fn test_input_responder_respond() {
        let (tx, mut rx) = oneshot::channel();
        InputResponder::new(tx).respond("user input".to_string());
        assert_eq!(rx.try_recv().ok().as_deref(), Some("user input"));
    }

    #[test]
    fn test_human_loop_request_approval() {
        let args = serde_json::json!({"arg": "value"});
        let request = HumanLoopRequest::approval("test_tool", args);

        assert_eq!(request.kind, HumanLoopKind::Approval);
        assert_eq!(request.tool_name.as_deref(), Some("test_tool"));
        assert!(request.args.is_some());
    }

    #[test]
    fn test_human_loop_request_with_risk() {
        let args = serde_json::json!({"cmd": "rm -rf"});
        let request =
            HumanLoopRequest::approval_with_risk("dangerous_tool", args, RiskLevel::Critical);

        assert_eq!(request.kind, HumanLoopKind::Approval);
        assert_eq!(request.risk_level, Some(RiskLevel::Critical));
    }

    #[test]
    fn test_human_loop_request_input() {
        let request = HumanLoopRequest::input("Please enter your name");

        assert_eq!(request.kind, HumanLoopKind::Input);
        assert_eq!(request.prompt, "Please enter your name");
        assert!(request.tool_name.is_none());
        assert!(request.args.is_none());
    }

    #[test]
    fn test_human_loop_response_variants() {
        let rejected = HumanLoopResponse::Rejected {
            reason: Some("test".to_string()),
        };
        let text = HumanLoopResponse::Text("hello".to_string());

        assert!(matches!(
            HumanLoopResponse::Approved,
            HumanLoopResponse::Approved
        ));
        assert!(matches!(rejected, HumanLoopResponse::Rejected { .. }));
        assert!(matches!(text, HumanLoopResponse::Text(_)));
        assert!(matches!(
            HumanLoopResponse::Timeout,
            HumanLoopResponse::Timeout
        ));
        assert!(matches!(
            HumanLoopResponse::Deferred,
            HumanLoopResponse::Deferred
        ));
    }

    #[tokio::test]
    async fn test_human_loop_manager_new() {
        // Construction alone must not panic.
        drop(HumanLoopManager::new());
    }

    #[test]
    fn test_human_loop_kind_variants() {
        let approval = HumanLoopKind::Approval;
        let input = HumanLoopKind::Input;
        assert!(matches!(approval, HumanLoopKind::Approval));
        assert!(matches!(input, HumanLoopKind::Input));
    }
}