use crate::hooks::{HookEvent, HookEventType, HookExecutor, HookResult};
use a3s_ahp::protocol::{
ConfirmationDecision, ContextPerceptionDecision, IntentDetectionDecision, MemoryRecallDecision,
PlanningDecision, RateLimitDecision, ReasoningDecision,
};
use a3s_ahp::{
AhpClient, AhpEvent, Decision, EventType, HeartbeatEvent, IdleEvent, SessionStats, Transport,
};
use async_trait::async_trait;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tracing::{debug, warn};
#[derive(Clone)]
pub struct AhpHookExecutor {
client: Arc<AhpClient>,
agent_id: String,
depth: u32,
last_activity: Arc<AtomicU64>,
idle_threshold_ms: u64,
start_time: Instant,
total_events: Arc<AtomicU64>,
total_tokens: Arc<AtomicI32>,
error_count: Arc<AtomicU64>,
capabilities: HashMap<String, serde_json::Value>,
shutdown: Arc<AtomicBool>,
memory_summary: Arc<RwLock<Option<a3s_ahp::MemorySummary>>>,
current_task: Arc<RwLock<Option<String>>>,
recent_facts: Arc<RwLock<Vec<a3s_ahp::Fact>>>,
workspace: Arc<RwLock<Option<String>>>,
batch_buffer: Arc<RwLock<Vec<a3s_ahp::AhpEvent>>>,
batch_size: usize,
batch_timeout_ms: u64,
last_batch_flush: Arc<AtomicU64>,
batch_enabled: bool,
}
impl std::fmt::Debug for AhpHookExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AhpHookExecutor")
.field("agent_id", &self.agent_id)
.field("depth", &self.depth)
.field("idle_threshold_ms", &self.idle_threshold_ms)
.finish()
}
}
impl AhpHookExecutor {
pub async fn new(transport: Transport) -> Result<Self, a3s_ahp::AhpError> {
Self::new_with_config(transport, 10_000).await }
pub async fn new_with_config(
transport: Transport,
idle_threshold_ms: u64,
) -> Result<Self, a3s_ahp::AhpError> {
let client = AhpClient::new(transport).await?;
let capabilities = vec![
"pre_action".to_string(),
"post_action".to_string(),
"pre_prompt".to_string(),
"post_response".to_string(),
"session_start".to_string(),
"session_end".to_string(),
"error".to_string(),
"context_perception".to_string(),
"success".to_string(),
"memory_recall".to_string(),
"planning".to_string(),
"reasoning".to_string(),
"rate_limit".to_string(),
"confirmation".to_string(),
"idle".to_string(),
"heartbeat".to_string(),
"query".to_string(),
"batch".to_string(),
"skill_load".to_string(),
"skill_unload".to_string(),
];
client.handshake(capabilities.clone()).await?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(Self {
client: Arc::new(client),
agent_id: uuid::Uuid::new_v4().to_string(),
depth: 0,
last_activity: Arc::new(AtomicU64::new(now)),
idle_threshold_ms,
start_time: Instant::now(),
total_events: Arc::new(AtomicU64::new(0)),
total_tokens: Arc::new(AtomicI32::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
capabilities: HashMap::new(),
shutdown: Arc::new(AtomicBool::new(false)),
memory_summary: Arc::new(RwLock::new(None)),
current_task: Arc::new(RwLock::new(None)),
recent_facts: Arc::new(RwLock::new(Vec::new())),
workspace: Arc::new(RwLock::new(None)),
batch_buffer: Arc::new(RwLock::new(Vec::new())),
batch_size: 10,
batch_timeout_ms: 5000,
last_batch_flush: Arc::new(AtomicU64::new(now)),
batch_enabled: false,
})
}
pub fn new_for_testing(client: Arc<AhpClient>, idle_threshold_ms: u64) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
client,
agent_id: uuid::Uuid::new_v4().to_string(),
depth: 0,
last_activity: Arc::new(AtomicU64::new(now)),
idle_threshold_ms,
start_time: Instant::now(),
total_events: Arc::new(AtomicU64::new(0)),
total_tokens: Arc::new(AtomicI32::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
capabilities: HashMap::new(),
shutdown: Arc::new(AtomicBool::new(false)),
memory_summary: Arc::new(RwLock::new(None)),
current_task: Arc::new(RwLock::new(None)),
recent_facts: Arc::new(RwLock::new(Vec::new())),
workspace: Arc::new(RwLock::new(None)),
batch_buffer: Arc::new(RwLock::new(Vec::new())),
batch_size: 10,
batch_timeout_ms: 5000,
last_batch_flush: Arc::new(AtomicU64::new(now)),
batch_enabled: false,
}
}
pub async fn with_context(
transport: Transport,
agent_id: String,
depth: u32,
) -> Result<Self, a3s_ahp::AhpError> {
Self::with_context_and_config(transport, agent_id, depth, 10_000).await
}
pub async fn with_context_and_config(
transport: Transport,
agent_id: String,
depth: u32,
idle_threshold_ms: u64,
) -> Result<Self, a3s_ahp::AhpError> {
let client = AhpClient::new(transport).await?;
client
.handshake(vec!["pre_action".to_string(), "post_action".to_string()])
.await?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(Self {
client: Arc::new(client),
agent_id,
depth,
last_activity: Arc::new(AtomicU64::new(now)),
idle_threshold_ms,
start_time: Instant::now(),
total_events: Arc::new(AtomicU64::new(0)),
total_tokens: Arc::new(AtomicI32::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
capabilities: HashMap::new(),
shutdown: Arc::new(AtomicBool::new(false)),
memory_summary: Arc::new(RwLock::new(None)),
current_task: Arc::new(RwLock::new(None)),
recent_facts: Arc::new(RwLock::new(Vec::new())),
workspace: Arc::new(RwLock::new(None)),
batch_buffer: Arc::new(RwLock::new(Vec::new())),
batch_size: 10,
batch_timeout_ms: 5000,
last_batch_flush: Arc::new(AtomicU64::new(now)),
batch_enabled: false,
})
}
pub fn with_capabilities(
mut self,
capabilities: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
for (key, value) in capabilities {
self.capabilities.insert(key, value);
}
self
}
pub fn add_capability(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
self.capabilities.insert(key.into(), value);
self
}
pub fn record_error(&self) {
self.error_count.fetch_add(1, Ordering::Relaxed);
}
pub fn total_events_count(&self) -> u64 {
self.total_events.load(Ordering::Relaxed)
}
pub fn error_count_value(&self) -> u64 {
self.error_count.load(Ordering::Relaxed)
}
pub fn get_idle_duration_ms(&self) -> u64 {
let last = self.last_activity.load(Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
now.saturating_sub(last)
}
pub fn check_idle(&self) -> Option<IdleEvent> {
let elapsed = self.get_idle_duration_ms();
if elapsed >= self.idle_threshold_ms {
Some(IdleEvent {
idle_duration_ms: elapsed,
idle_reason: "no_activity".to_string(),
last_event_type: None,
suggested_action: Some("dream".to_string()),
})
} else {
None
}
}
pub fn set_memory_summary(self: Arc<Self>, summary: a3s_ahp::MemorySummary) {
let mut lock = self.memory_summary.write().unwrap();
*lock = Some(summary);
}
pub fn set_current_task(self: Arc<Self>, task: String) {
let mut lock = self.current_task.write().unwrap();
*lock = Some(task);
}
pub fn add_recent_fact(self: Arc<Self>, fact: a3s_ahp::Fact) {
let mut lock = self.recent_facts.write().unwrap();
lock.push(fact);
}
pub fn set_recent_facts(self: Arc<Self>, facts: Vec<a3s_ahp::Fact>) {
let mut lock = self.recent_facts.write().unwrap();
*lock = facts;
}
pub fn get_recent_facts(&self) -> Vec<a3s_ahp::Fact> {
self.recent_facts.read().unwrap().clone()
}
pub fn set_workspace(self: Arc<Self>, workspace: String) {
let mut lock = self.workspace.write().unwrap();
*lock = Some(workspace);
}
pub fn get_workspace(&self) -> Option<String> {
self.workspace.read().unwrap().clone()
}
pub async fn query(
&self,
query_type: impl Into<String>,
payload: serde_json::Value,
) -> Result<a3s_ahp::QueryResponse, a3s_ahp::AhpError> {
self.client.query(query_type, payload).await
}
pub async fn send_batch(
&self,
events: Vec<a3s_ahp::AhpEvent>,
) -> Result<a3s_ahp::BatchResponse, a3s_ahp::AhpError> {
self.client.send_batch(events).await
}
pub fn with_batch_config(mut self, batch_size: usize, batch_timeout_ms: u64) -> Self {
self.batch_size = batch_size;
self.batch_timeout_ms = batch_timeout_ms;
self.batch_enabled = true;
self
}
pub async fn add_to_batch(&self, event: a3s_ahp::AhpEvent) -> bool {
let should_flush = {
let mut buffer = self.batch_buffer.write().unwrap();
buffer.push(event);
buffer.len() >= self.batch_size
};
if should_flush {
self.flush_batch().await;
}
should_flush
}
pub async fn flush_batch(&self) {
let events = {
let mut buffer = self.batch_buffer.write().unwrap();
if buffer.is_empty() {
return;
}
std::mem::take(&mut *buffer)
};
if !events.is_empty() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.last_batch_flush.store(now, Ordering::Relaxed);
match self.client.send_batch(events).await {
Ok(_) => {
debug!("Batch sent successfully");
}
Err(e) => {
warn!("Batch send failed: {}", e);
}
}
}
}
pub async fn check_batch_timeout(&self) {
let elapsed = {
let last = self.last_batch_flush.load(Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
now.saturating_sub(last)
};
if elapsed >= self.batch_timeout_ms {
self.flush_batch().await;
}
}
pub fn execute_background(self: Arc<Self>) {
let shutdown = Arc::clone(&self.shutdown);
shutdown.store(false, Ordering::Relaxed);
let heartbeat_executor = Arc::clone(&self);
let heartbeat_shutdown = Arc::clone(&shutdown);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
tokio::select! {
_ = interval.tick() => {
if heartbeat_shutdown.load(Ordering::Relaxed) {
debug!("Heartbeat task shutting down");
break;
}
let event = AhpEvent {
event_type: EventType::Heartbeat,
session_id: heartbeat_executor.agent_id.clone(),
agent_id: heartbeat_executor.agent_id.clone(),
timestamp: Utc::now().to_rfc3339(),
depth: heartbeat_executor.depth,
payload: serde_json::to_value(HeartbeatEvent {
uptime_ms: heartbeat_executor.start_time.elapsed().as_millis() as u64,
total_events_processed: heartbeat_executor.total_events.load(Ordering::Relaxed),
current_state: "active".to_string(),
cpu_percent: None,
memory_bytes: None,
active_tools: None,
pending_actions: None,
queue_depth: None,
tokens_used: None,
}).unwrap_or_default(),
context: heartbeat_executor.build_context(),
metadata: None,
};
if let Err(e) = heartbeat_executor.client.send_event_full_value(&event).await {
warn!("Heartbeat failed: {}", e);
}
}
}
}
});
let idle_executor = Arc::clone(&self);
let idle_shutdown = Arc::clone(&shutdown);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
tokio::select! {
_ = interval.tick() => {
if idle_shutdown.load(Ordering::Relaxed) {
debug!("Idle detection task shutting down");
break;
}
if let Some(idle_event) = idle_executor.check_idle() {
debug!("Idle detected, sending IdleEvent");
let event = AhpEvent {
event_type: EventType::Idle,
session_id: idle_executor.agent_id.clone(),
agent_id: idle_executor.agent_id.clone(),
timestamp: Utc::now().to_rfc3339(),
depth: idle_executor.depth,
payload: serde_json::to_value(idle_event).unwrap_or_default(),
context: idle_executor.build_context(),
metadata: None,
};
match idle_executor.client.send_event_full_value(&event).await {
Ok(decision_payload) => {
debug!("Idle decision: {:?}", decision_payload);
if let Ok(idle_decision) = serde_json::from_value::<a3s_ahp::IdleDecision>(decision_payload.clone()) {
match idle_decision {
a3s_ahp::IdleDecision::Defer { .. } => {
}
_ => {
idle_executor.update_activity();
}
}
} else if let Ok(decision) = serde_json::from_value::<a3s_ahp::Decision>(decision_payload) {
match decision {
a3s_ahp::Decision::Defer { .. } => {
}
_ => {
idle_executor.update_activity();
}
}
}
}
Err(e) => {
warn!("Idle decision failed: {}", e);
}
}
}
}
}
}
});
}
pub fn stop_background(&self) {
self.shutdown.store(true, Ordering::Relaxed);
}
pub fn agent_id(&self) -> &str {
&self.agent_id
}
pub fn depth(&self) -> u32 {
self.depth
}
pub fn idle_threshold(&self) -> u64 {
self.idle_threshold_ms
}
fn update_activity(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
self.last_activity.store(now, Ordering::Relaxed);
}
fn record_event(&self) {
self.total_events.fetch_add(1, Ordering::Relaxed);
self.update_activity();
}
fn map_event(&self, event: &HookEvent) -> Option<AhpEvent> {
let (event_type, payload) = match event {
HookEvent::PreToolUse(e) => (
EventType::PreAction,
serde_json::json!({
"tool": e.tool,
"arguments": e.args,
"working_directory": e.working_directory,
"recent_tools": e.recent_tools,
}),
),
HookEvent::PostToolUse(e) => (
EventType::PostAction,
serde_json::json!({
"tool": e.tool,
"arguments": e.args,
"result": {
"success": e.result.success,
"output": e.result.output,
"exit_code": e.result.exit_code,
"duration_ms": e.result.duration_ms,
}
}),
),
HookEvent::PrePrompt(e) => (
EventType::PrePrompt,
serde_json::json!({
"prompt": e.prompt,
"system_prompt": e.system_prompt,
"message_count": e.message_count,
}),
),
HookEvent::GenerateStart(e) => (
EventType::PrePrompt,
serde_json::json!({
"prompt": e.prompt,
"session_id": e.session_id,
}),
),
HookEvent::PostResponse(e) => (
EventType::PostAction,
serde_json::json!({
"response_text": e.response_text,
"tool_calls_count": e.tool_calls_count,
"usage": e.usage,
"duration_ms": e.duration_ms,
}),
),
HookEvent::SessionStart(e) => (
EventType::SessionStart,
serde_json::json!({
"session_id": e.session_id,
"system_prompt": e.system_prompt,
"model_provider": e.model_provider,
"model_name": e.model_name,
}),
),
HookEvent::SessionEnd(e) => (
EventType::SessionEnd,
serde_json::json!({
"session_id": e.session_id,
"duration_ms": e.duration_ms,
}),
),
HookEvent::OnError(e) => (
EventType::Error,
serde_json::json!({
"error_type": format!("{:?}", e.error_type),
"error_message": e.error_message,
"context": e.context,
}),
),
HookEvent::PreContextPerception(e) => {
let workspace = self.workspace.read().unwrap().clone().unwrap_or_default();
(
EventType::ContextPerception,
serde_json::json!({
"intent": e.intent,
"target_type": e.target_type,
"target_name": e.target_name,
"domain": e.domain,
"query": e.query,
"working_directory": workspace,
"urgency": e.urgency,
}),
)
}
HookEvent::PostContextPerception(e) => (
EventType::ContextPerception,
serde_json::json!({
"intent": e.intent,
"target_type": e.target_type,
"success": e.success,
"facts_retrieved": e.facts_retrieved,
"files_retrieved": e.files_retrieved,
"error": e.error,
}),
),
HookEvent::OnSuccess(e) => (
EventType::Success,
serde_json::json!({
"action_type": e.action_type,
"action_summary": e.action_summary,
"duration_ms": e.duration_ms,
}),
),
HookEvent::PreMemoryRecall(e) => (
EventType::MemoryRecall,
serde_json::json!({
"query": e.query,
"memory_type": e.memory_type,
"max_results": e.max_results,
"working_directory": e.working_directory,
}),
),
HookEvent::PostMemoryRecall(e) => (
EventType::MemoryRecall,
serde_json::json!({
"query": e.query,
"memory_type": e.memory_type,
"facts_retrieved": e.facts_retrieved,
"success": e.success,
"error": e.error,
}),
),
HookEvent::PrePlanning(e) => (
EventType::Planning,
serde_json::json!({
"task_description": e.task_description,
"available_strategies": e.available_strategies,
"constraints": e.constraints,
}),
),
HookEvent::PostPlanning(e) => (
EventType::Planning,
serde_json::json!({
"task_description": e.task_description,
"strategy_used": e.strategy_used,
"subtasks": e.subtasks,
"success": e.success,
"error": e.error,
}),
),
HookEvent::PreReasoning(e) => (
EventType::Reasoning,
serde_json::json!({
"reasoning_type": format!("{:?}", e.reasoning_type),
"problem_statement": e.problem_statement,
"hints": e.hints,
}),
),
HookEvent::PostReasoning(e) => (
EventType::Reasoning,
serde_json::json!({
"reasoning_type": format!("{:?}", e.reasoning_type),
"conclusion": e.conclusion,
"steps_count": e.steps_count,
"success": e.success,
"error": e.error,
}),
),
HookEvent::OnRateLimit(e) => (
EventType::RateLimit,
serde_json::json!({
"limit_type": format!("{:?}", e.limit_type),
"retry_after_ms": e.retry_after_ms,
"current_usage": e.current_usage,
}),
),
HookEvent::OnConfirmation(e) => (
EventType::Confirmation,
serde_json::json!({
"confirmation_type": format!("{:?}", e.confirmation_type),
"message": e.message,
"options": e.options,
}),
),
HookEvent::IntentDetection(e) => (
EventType::IntentDetection,
serde_json::json!({
"prompt": e.prompt,
"workspace": e.workspace,
"language_hint": e.language_hint,
}),
),
HookEvent::GenerateEnd(e) => (
EventType::PostAction,
serde_json::json!({
"response_text": e.response_text,
"tool_calls": e.tool_calls,
"usage": e.usage,
"duration_ms": e.duration_ms,
}),
),
HookEvent::SkillLoad(_) | HookEvent::SkillUnload(_) => {
return None;
}
};
Some(AhpEvent {
event_type,
session_id: self.extract_session_id(event),
agent_id: self.agent_id.clone(),
timestamp: Utc::now().to_rfc3339(),
depth: self.depth,
payload,
context: self.build_context(),
metadata: None,
})
}
fn build_context(&self) -> Option<a3s_ahp::EventContext> {
if self.capabilities.is_empty() {
return None;
}
let session_stats = SessionStats {
total_actions: self.total_events.load(Ordering::Relaxed) as usize,
total_tokens: self.total_tokens.load(Ordering::Relaxed),
duration_ms: self.start_time.elapsed().as_millis() as u64,
error_count: self.error_count.load(Ordering::Relaxed) as usize,
};
let memory_summary = self.memory_summary.read().unwrap().clone();
let current_task = self.current_task.read().unwrap().clone();
let recent_facts = self.recent_facts.read().unwrap().clone();
Some(a3s_ahp::EventContext {
recent_facts: Some(recent_facts),
memory_summary,
session_stats: Some(session_stats),
current_task,
capabilities: Some(self.capabilities.clone()),
})
}
fn extract_session_id(&self, event: &HookEvent) -> String {
match event {
HookEvent::PreToolUse(e) => e.session_id.clone(),
HookEvent::PostToolUse(e) => e.session_id.clone(),
HookEvent::GenerateStart(e) => e.session_id.clone(),
HookEvent::SessionStart(e) => e.session_id.clone(),
HookEvent::SessionEnd(e) => e.session_id.clone(),
HookEvent::PrePrompt(e) => e.session_id.clone(),
HookEvent::PreContextPerception(e) => e.session_id.clone(),
HookEvent::PostContextPerception(e) => e.session_id.clone(),
HookEvent::OnSuccess(e) => e.session_id.clone(),
HookEvent::PreMemoryRecall(e) => e.session_id.clone(),
HookEvent::PostMemoryRecall(e) => e.session_id.clone(),
HookEvent::PrePlanning(e) => e.session_id.clone(),
HookEvent::PostPlanning(e) => e.session_id.clone(),
HookEvent::PreReasoning(e) => e.session_id.clone(),
HookEvent::PostReasoning(e) => e.session_id.clone(),
HookEvent::OnRateLimit(e) => e.session_id.clone(),
HookEvent::OnConfirmation(e) => e.session_id.clone(),
HookEvent::IntentDetection(e) => e.session_id.clone(),
HookEvent::SkillLoad(_) => String::new(),
HookEvent::SkillUnload(_) => String::new(),
HookEvent::GenerateEnd(_) | HookEvent::PostResponse(_) | HookEvent::OnError(_) => {
self.agent_id.clone()
}
}
}
fn map_decision(
&self,
event_type: EventType,
decision_payload: serde_json::Value,
) -> HookResult {
match event_type {
EventType::ContextPerception => self.map_context_perception_decision(&decision_payload),
EventType::MemoryRecall => self.map_memory_recall_decision(&decision_payload),
EventType::Planning => self.map_planning_decision(&decision_payload),
EventType::Reasoning => self.map_reasoning_decision(&decision_payload),
EventType::RateLimit => self.map_rate_limit_decision(&decision_payload),
EventType::Confirmation => self.map_confirmation_decision(&decision_payload),
EventType::IntentDetection => self.map_intent_detection_decision(&decision_payload),
_ => self.map_generic_decision(decision_payload),
}
}
fn map_generic_decision(&self, payload: serde_json::Value) -> HookResult {
match serde_json::from_value::<Decision>(payload) {
Ok(Decision::Allow {
modified_payload, ..
}) => {
if let Some(modified) = modified_payload {
HookResult::Continue(Some(modified))
} else {
HookResult::Continue(None)
}
}
Ok(Decision::Block { reason, .. }) => HookResult::Block(reason),
Ok(Decision::Defer {
retry_after_ms,
reason,
}) => {
if let Some(r) = reason {
debug!("AHP defer: {}", r);
}
HookResult::Retry(retry_after_ms)
}
Ok(Decision::Modify {
modified_payload, ..
}) => HookResult::Continue(Some(modified_payload)),
Ok(Decision::Escalate {
reason,
escalation_target,
}) => HookResult::Escalate {
reason,
target: escalation_target,
},
Err(_) => HookResult::Block("Invalid decision payload".into()),
}
}
fn map_context_perception_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<ContextPerceptionDecision>(payload.clone()) {
Ok(ContextPerceptionDecision::Allow {
injected_context, ..
}) => {
let value = serde_json::to_value(injected_context).ok();
HookResult::Continue(value)
}
Ok(ContextPerceptionDecision::Block { reason, .. }) => HookResult::Block(reason),
Ok(ContextPerceptionDecision::Refine {
refined_intent,
refined_target,
scope_hints,
}) => HookResult::Continue(Some(serde_json::json!({
"refined_intent": refined_intent,
"refined_target": refined_target,
"scope_hints": scope_hints
}))),
Err(_) => {
self.map_generic_decision(payload.clone())
}
}
}
fn map_memory_recall_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<MemoryRecallDecision>(payload.clone()) {
Ok(MemoryRecallDecision::Allow { injected_facts, .. }) => {
let value = serde_json::to_value(injected_facts).ok();
HookResult::Continue(value)
}
Ok(MemoryRecallDecision::Block { reason, .. }) => HookResult::Block(reason),
Err(_) => self.map_generic_decision(payload.clone()),
}
}
fn map_planning_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<PlanningDecision>(payload.clone()) {
Ok(PlanningDecision::Allow {
selected_strategy,
planning_template,
..
}) => HookResult::Continue(Some(serde_json::json!({
"selected_strategy": selected_strategy,
"planning_template": planning_template
}))),
Ok(PlanningDecision::Block { reason, .. }) => HookResult::Block(reason),
Ok(PlanningDecision::Modify {
modified_task,
hints,
}) => HookResult::Continue(Some(serde_json::json!({
"modified_task": modified_task,
"hints": hints
}))),
Err(_) => self.map_generic_decision(payload.clone()),
}
}
fn map_reasoning_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<ReasoningDecision>(payload.clone()) {
Ok(ReasoningDecision::Allow { hints, .. }) => {
let value = serde_json::to_value(hints).ok();
HookResult::Continue(value)
}
Ok(ReasoningDecision::Block { reason, .. }) => HookResult::Block(reason),
Err(_) => self.map_generic_decision(payload.clone()),
}
}
fn map_rate_limit_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<RateLimitDecision>(payload.clone()) {
Ok(RateLimitDecision::Retry { retry_after_ms, .. }) => {
HookResult::Retry(retry_after_ms)
}
Ok(RateLimitDecision::Queue) => HookResult::Skip,
Ok(RateLimitDecision::Skip { .. }) => HookResult::Skip,
Err(_) => self.map_generic_decision(payload.clone()),
}
}
fn map_confirmation_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<ConfirmationDecision>(payload.clone()) {
Ok(ConfirmationDecision::Escalate) => HookResult::Escalate {
reason: "Human confirmation required".into(),
target: None,
},
Ok(ConfirmationDecision::Approve) => HookResult::continue_(),
Ok(ConfirmationDecision::Reject { reason }) => HookResult::Block(reason),
Err(_) => self.map_generic_decision(payload.clone()),
}
}
fn map_intent_detection_decision(&self, payload: &serde_json::Value) -> HookResult {
match serde_json::from_value::<IntentDetectionDecision>(payload.clone()) {
Ok(IntentDetectionDecision::Allow {
detected_intent,
confidence,
target_hints,
}) => HookResult::Continue(Some(serde_json::json!({
"detected_intent": detected_intent,
"confidence": confidence,
"target_hints": target_hints
}))),
Ok(IntentDetectionDecision::Block { reason, .. }) => HookResult::Block(reason),
Err(_) => self.map_generic_decision(payload.clone()),
}
}
fn is_blocking_event(&self, event_type: HookEventType) -> bool {
matches!(
event_type,
HookEventType::PreToolUse
| HookEventType::PrePrompt
| HookEventType::GenerateStart
| HookEventType::PreContextPerception
| HookEventType::PreMemoryRecall
| HookEventType::PrePlanning
| HookEventType::PreReasoning
| HookEventType::OnConfirmation
| HookEventType::IntentDetection
)
}
}
#[async_trait]
impl HookExecutor for AhpHookExecutor {
async fn fire(&self, event: &HookEvent) -> HookResult {
self.record_event();
match event {
HookEvent::PostResponse(e) => {
self.total_tokens
.fetch_add(e.usage.total_tokens, Ordering::Relaxed);
}
HookEvent::GenerateEnd(e) => {
self.total_tokens
.fetch_add(e.usage.total_tokens, Ordering::Relaxed);
}
_ => {}
}
let ahp_event = match self.map_event(event) {
Some(e) => e,
None => {
debug!("Event {:?} not mapped to AHP, allowing", event.event_type());
return HookResult::Continue(None);
}
};
let is_blocking = self.is_blocking_event(event.event_type());
if is_blocking {
if self.batch_enabled {
self.flush_batch().await;
}
match self.client.send_event_full_value(&ahp_event).await {
Ok(decision_payload) => {
debug!(
"AHP decision for {:?}: {:?}",
ahp_event.event_type, decision_payload
);
self.map_decision(ahp_event.event_type, decision_payload)
}
Err(e) => {
warn!("AHP error: {}, allowing by default", e);
HookResult::Continue(None)
}
}
} else if self.batch_enabled {
self.add_to_batch(ahp_event).await;
HookResult::Continue(None)
} else {
let client = self.client.clone();
let event = ahp_event;
tokio::spawn(async move {
if let Err(e) = client.send_event_full_value(&event).await {
warn!("AHP fire-and-forget error: {}", e);
}
});
HookResult::Continue(None)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hooks::PreToolUseEvent;
struct NoopTransport;
#[async_trait]
impl a3s_ahp::transport::TransportLayer for NoopTransport {
async fn send_request(
&self,
request: a3s_ahp::AhpRequest,
) -> a3s_ahp::Result<a3s_ahp::AhpResponse> {
Ok(a3s_ahp::AhpResponse::success(
request.id,
serde_json::json!({}),
))
}
async fn send_notification(
&self,
_notification: a3s_ahp::AhpNotification,
) -> a3s_ahp::Result<()> {
Ok(())
}
async fn close(&self) -> a3s_ahp::Result<()> {
Ok(())
}
}
fn make_test_executor() -> AhpHookExecutor {
let client = Arc::new(AhpClient::new_for_testing(Arc::new(NoopTransport)));
AhpHookExecutor::new_for_testing(client, 10_000)
}
#[test]
fn test_map_pre_tool_use() {
let executor = make_test_executor();
let event = HookEvent::PreToolUse(PreToolUseEvent {
session_id: "session-123".to_string(),
tool: "Bash".to_string(),
args: serde_json::json!({"command": "ls"}),
working_directory: "/workspace".to_string(),
recent_tools: vec![],
});
let ahp_event = executor.map_event(&event).unwrap();
assert_eq!(ahp_event.event_type, EventType::PreAction);
assert_eq!(ahp_event.session_id, "session-123");
assert_eq!(ahp_event.depth, 0);
}
#[test]
fn test_map_decision_allow() {
let executor = make_test_executor();
let decision = Decision::Allow {
modified_payload: None,
metadata: None,
};
let result = executor.map_decision(
EventType::PreAction,
serde_json::to_value(decision).unwrap(),
);
assert!(matches!(result, HookResult::Continue(None)));
}
#[test]
fn test_map_decision_block() {
let executor = make_test_executor();
let decision = Decision::Block {
reason: "Dangerous command".to_string(),
metadata: None,
};
let result = executor.map_decision(
EventType::PreAction,
serde_json::to_value(decision).unwrap(),
);
assert!(matches!(result, HookResult::Block(_)));
}
#[test]
fn test_idle_detection_not_idle() {
let executor = make_test_executor();
let idle_event = executor.check_idle();
assert!(idle_event.is_none());
}
#[test]
fn test_idle_detection_after_threshold() {
let executor = make_test_executor();
let old_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
- 11_000;
executor.last_activity.store(old_time, Ordering::Relaxed);
let idle_event = executor.check_idle();
assert!(idle_event.is_some());
let idle = idle_event.unwrap();
assert!(idle.idle_duration_ms >= 10_000);
assert_eq!(idle.idle_reason, "no_activity");
assert_eq!(idle.suggested_action, Some("dream".to_string()));
}
#[test]
fn test_record_event_updates_activity() {
let executor = make_test_executor();
let old_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
- 1_000;
executor.last_activity.store(old_time, Ordering::Relaxed);
let before = executor.last_activity.load(Ordering::Relaxed);
executor.record_event();
let after = executor.last_activity.load(Ordering::Relaxed);
assert!(after > before);
assert!(executor.get_idle_duration_ms() < 1_000);
}
}