use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::{Arc, Condvar, LazyLock, Mutex};
use std::time::Duration;
use crate::agent_events::{self, AgentEvent, AgentEventSink};
use crate::mcp::VmMcpClientHandle;
use crate::value::VmValue;
/// Callback invoked when an agent session ends; receives the session id.
pub type SessionEndHook = Arc<dyn Fn(&str) + Send + Sync>;
thread_local! {
// Host bridge installed for the interpreter currently running on this thread.
static CURRENT_HOST_BRIDGE: RefCell<Option<Rc<crate::bridge::HostBridge>>> =
const { RefCell::new(None) };
// Stack of per-loop event sinks; the innermost (last-pushed) one receives events.
static CURRENT_LOOP_SINKS: RefCell<Vec<Arc<dyn AgentEventSink>>> =
const { RefCell::new(Vec::new()) };
}
// Cross-thread queue of (session_id, kind, content) feedback items.
static GLOBAL_PENDING_FEEDBACK: LazyLock<Mutex<Vec<(String, String, String)>>> =
LazyLock::new(|| Mutex::new(Vec::new()));
// Signalled whenever a feedback item is pushed; always paired with the queue above.
static GLOBAL_PENDING_FEEDBACK_CV: LazyLock<Condvar> = LazyLock::new(Condvar::new);
// Hooks to run when a session terminates (see `fire_session_end_hooks`).
static SESSION_END_HOOKS: LazyLock<Mutex<Vec<SessionEndHook>>> =
LazyLock::new(|| Mutex::new(Vec::new()));
// session_id -> (server name -> MCP client handle), installed per session.
static SESSION_MCP_CLIENTS: LazyLock<Mutex<BTreeMap<String, BTreeMap<String, VmMcpClientHandle>>>> =
LazyLock::new(|| Mutex::new(BTreeMap::new()));
/// RAII guard that optionally pushes an event sink onto this thread's
/// loop-sink stack and pops it again when dropped.
pub(crate) struct LoopSinkGuard {
    // True when `install` pushed a sink that `Drop` must pop.
    pushed: bool,
}

impl LoopSinkGuard {
    /// Pushes `sink` (when present) onto the thread-local sink stack and
    /// returns a guard that removes it on drop; `None` yields a no-op guard.
    pub(crate) fn install(sink: Option<Arc<dyn AgentEventSink>>) -> Self {
        let pushed = match sink {
            Some(sink) => {
                CURRENT_LOOP_SINKS.with(|stack| stack.borrow_mut().push(sink));
                true
            }
            None => false,
        };
        Self { pushed }
    }
}

impl Drop for LoopSinkGuard {
    fn drop(&mut self) {
        if !self.pushed {
            return;
        }
        // Pop exactly the entry we pushed in `install`.
        CURRENT_LOOP_SINKS.with(|stack| drop(stack.borrow_mut().pop()));
    }
}
/// Emits `event` to the global event registry and, when one is installed on
/// this thread, to the innermost per-loop sink.
pub(crate) fn emit_agent_event_sync(event: &AgentEvent) {
    agent_events::emit_event(event);
    let loop_sink = CURRENT_LOOP_SINKS.with(|stack| stack.borrow().last().cloned());
    if let Some(sink) = loop_sink {
        sink.handle_event(event);
    }
}

/// Async variant: performs the same fan-out as [`emit_agent_event_sync`],
/// then additionally delivers the event to any VM closures subscribed to
/// the event's session.
pub(crate) async fn emit_agent_event(event: &AgentEvent) {
    // Shared synchronous fan-out (global registry + innermost loop sink).
    emit_agent_event_sync(event);
    let subscribers = crate::agent_sessions::subscribers_for(event.session_id());
    if subscribers.is_empty() {
        return;
    }
    // Serialize once; every subscriber receives the same JSON payload.
    let payload = serde_json::to_value(event).unwrap_or(serde_json::Value::Null);
    for closure in subscribers {
        let VmValue::Closure(closure) = closure else {
            continue;
        };
        // Each subscriber runs in its own child VM; skip if none is available.
        let Some(mut vm) = crate::vm::clone_async_builtin_child_vm() else {
            continue;
        };
        let arg = crate::stdlib::json_to_vm_value(&payload);
        if let Err(err) = vm.call_closure_pub(&closure, &[arg]).await {
            // Subscriber failures are logged, never propagated: one bad
            // subscriber must not break delivery for the session.
            crate::events::log_warn(
                "agent.subscriber",
                &format!(
                    "session={} event={:?} subscriber error: {}",
                    event.session_id(),
                    std::mem::discriminant(event),
                    err
                ),
            );
        }
    }
}
/// Queues a `(session_id, kind, content)` feedback item on the global queue
/// and wakes every thread blocked in `wait_for_global_pending_feedback`.
pub fn push_pending_feedback_global(session_id: &str, kind: &str, content: &str) {
    // Recover from a poisoned mutex instead of silently dropping the item
    // (and then notifying waiters about nothing); this matches the poison
    // recovery already used in `wait_for_global_pending_feedback`.
    let mut q = GLOBAL_PENDING_FEEDBACK
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    q.push((
        session_id.to_string(),
        kind.to_string(),
        content.to_string(),
    ));
    // Release the lock before notifying so woken waiters can re-acquire it.
    drop(q);
    GLOBAL_PENDING_FEEDBACK_CV.notify_all();
}
/// Blocks the calling thread until a pending-feedback item for `session_id`
/// is queued or `timeout` elapses. Returns `true` iff such an item is
/// present when the function returns. Spurious condvar wakeups are handled
/// by re-checking the queue inside the loop.
pub fn wait_for_global_pending_feedback(session_id: &str, timeout: Duration) -> bool {
    // Recover from a poisoned mutex rather than reporting "no feedback":
    // the body of the loop already recovers from poison, and returning
    // `false` here would silently hide queued items.
    let mut guard = GLOBAL_PENDING_FEEDBACK
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if guard.iter().any(|(sid, _, _)| sid == session_id) {
        return true;
    }
    let start = std::time::Instant::now();
    loop {
        // Remaining time budget; once exhausted, report whatever is queued now.
        let remaining = match timeout.checked_sub(start.elapsed()) {
            Some(remaining) if !remaining.is_zero() => remaining,
            _ => return guard.iter().any(|(sid, _, _)| sid == session_id),
        };
        let (next_guard, wait_result) =
            match GLOBAL_PENDING_FEEDBACK_CV.wait_timeout(guard, remaining) {
                Ok(pair) => pair,
                // Same poison-recovery policy as the initial lock above.
                Err(poison) => poison.into_inner(),
            };
        guard = next_guard;
        if guard.iter().any(|(sid, _, _)| sid == session_id) {
            return true;
        }
        if wait_result.timed_out() {
            return false;
        }
    }
}
/// Removes and returns all `(kind, content)` feedback items queued for
/// `session_id`, leaving items for other sessions in place.
pub fn drain_global_pending_feedback(session_id: &str) -> Vec<(String, String)> {
    // Recover from poison instead of silently returning an empty result;
    // matches the recovery policy in `wait_for_global_pending_feedback`.
    let mut q = GLOBAL_PENDING_FEEDBACK
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let mut drained = Vec::new();
    let mut kept = Vec::new();
    for (sid, kind, content) in q.drain(..) {
        if sid == session_id {
            drained.push((kind, content));
        } else {
            kept.push((sid, kind, content));
        }
    }
    *q = kept;
    drained
}
/// Registers `hook` to be invoked (with the session id) whenever a session
/// ends; see `fire_session_end_hooks`.
pub fn register_session_end_hook(hook: SessionEndHook) {
    // Recover from poison instead of silently dropping the hook — a lost
    // end-hook would mean missing cleanup for every future session.
    SESSION_END_HOOKS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .push(hook);
}
pub(crate) fn fire_session_end_hooks(session_id: &str) {
if let Ok(hooks) = SESSION_END_HOOKS.lock() {
for hook in hooks.iter() {
hook(session_id);
}
}
}
/// Binds `bridge` as the host bridge for the current thread, replacing any
/// previously installed one.
pub(crate) fn install_current_host_bridge(bridge: Rc<crate::bridge::HostBridge>) {
    CURRENT_HOST_BRIDGE.with(|slot| {
        slot.borrow_mut().replace(bridge);
    });
}
/// Removes the host bridge bound to the current thread, if any.
pub(crate) fn clear_current_host_bridge() {
    CURRENT_HOST_BRIDGE.with(|slot| {
        slot.borrow_mut().take();
    });
}
/// Returns a clone of the host bridge bound to the current thread, if one
/// has been installed.
pub(crate) fn current_host_bridge() -> Option<Rc<crate::bridge::HostBridge>> {
    CURRENT_HOST_BRIDGE.with(|slot| slot.borrow().as_ref().map(Rc::clone))
}
/// Returns the id of the agent session associated with the current
/// execution context, if any; thin delegation to `crate::agent_sessions`.
pub fn current_agent_session_id() -> Option<String> {
crate::agent_sessions::current_session_id()
}
/// Registers the MCP client handles for `session_id`, keyed by server name.
/// Replaces any previously installed set for that session.
pub(crate) fn install_session_mcp_clients(
    session_id: &str,
    clients: BTreeMap<String, VmMcpClientHandle>,
) {
    // Recover from poison instead of silently dropping the clients, which
    // would leave them unreachable for later teardown.
    SESSION_MCP_CLIENTS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .insert(session_id.to_string(), clients);
}
/// Removes and returns the MCP client handles registered for `session_id`.
pub(crate) fn take_session_mcp_clients(
    session_id: &str,
) -> Option<BTreeMap<String, VmMcpClientHandle>> {
    // Recover from poison: returning `None` here would leave the session's
    // clients stranded in the map with no way to tear them down.
    SESSION_MCP_CLIENTS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .remove(session_id)
}
/// Looks up the MCP client handle for `server_name` within `session_id`,
/// returning a clone if both exist.
pub(crate) fn session_mcp_client(session_id: &str, server_name: &str) -> Option<VmMcpClientHandle> {
    // Recover from poison for consistency with the other registry accessors;
    // a read-only lookup is always safe on the recovered data.
    SESSION_MCP_CLIENTS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .get(session_id)
        .and_then(|clients| clients.get(server_name))
        .cloned()
}