use std::time::Duration;
use futures::stream::StreamExt;
use pyo3::prelude::*;
use tokio::{sync::mpsc, time::timeout};
use super::{
AgentId, PyCommand,
handlers::{agent, async_ops, chat, query},
};
pub(super) const HANDLER_TIMEOUT: Duration = Duration::from_mins(1);
/// Returns the Python helper stored in `cache`, compiling it from `script` on first use.
///
/// When `cache` is already populated, a new reference to the cached object is returned and
/// `script` is not executed. Otherwise `script` is run with a fresh dict as its locals, the
/// object bound to `fn_name` is taken from that dict, stored in `cache`, and returned.
///
/// # Errors
///
/// Returns the stringified Python error if running `script` or reading `fn_name` from the
/// locals fails, or `"Failed to define {fn_name} helper"` if the script did not bind `fn_name`.
pub(crate) fn get_or_compile_py_helper(
    cache: &'static std::sync::OnceLock<PyObject>,
    script: &str,
    fn_name: &str,
) -> Result<PyObject, String> {
    // Fast path: the helper was compiled earlier, so only a reference bump is needed.
    if let Some(helper) = cache.get() {
        return Ok(Python::with_gil(|py| helper.clone_ref(py)));
    }

    Python::with_gil(|py| {
        let namespace = pyo3::types::PyDict::new_bound(py);
        if let Err(err) = py.run_bound(script, None, Some(&namespace)) {
            return Err(err.to_string());
        }

        let helper = match namespace.get_item(fn_name) {
            Ok(Some(found)) => found.to_object(py),
            Ok(None) => return Err(format!("Failed to define {fn_name} helper")),
            Err(err) => return Err(err.to_string()),
        };

        // Losing the race to populate the cache is harmless: the freshly compiled object is
        // still handed back to the caller, and the earlier cached value stays in place.
        if let Err(rejected) = cache.set(helper.clone_ref(py)) {
            tracing::debug!("Cache was already set: {:?}", rejected);
        }
        Ok(helper)
    })
}
// NOTE(review): presumably the name of a Python module holding bridge-wide globals; its use
// sites are not visible in this file — confirm there.
pub(crate) const AGY_BRIDGE_GLOBALS_MODULE: &str = "_agy_bridge_globals";

// Each `*_FN_NAME` / `*_FN` pair below couples the name of a Python helper function with a
// process-wide `OnceLock` slot for a `PyObject`.
// NOTE(review): presumably these are handed to `get_or_compile_py_helper` as `fn_name` and
// `cache` by the handler modules — confirm at the call sites.

/// Python-side name of the cancel helper.
pub(super) const CANCEL_FN_NAME: &str = "_cancel";
/// Cache slot for the cancel helper.
pub(super) static CANCEL_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the wait-for-idle helper.
pub(super) const WAIT_FOR_IDLE_FN_NAME: &str = "_wait_for_idle";
/// Cache slot for the wait-for-idle helper.
pub(super) static WAIT_FOR_IDLE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the send helper.
pub(super) const SEND_FN_NAME: &str = "_send";
/// Cache slot for the send helper.
pub(super) static SEND_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the signal-idle helper.
pub(super) const SIGNAL_IDLE_FN_NAME: &str = "_signal_idle";
/// Cache slot for the signal-idle helper.
pub(super) static SIGNAL_IDLE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the wait-for-wakeup helper.
pub(super) const WAIT_FOR_WAKEUP_FN_NAME: &str = "_wait_for_wakeup";
/// Cache slot for the wait-for-wakeup helper.
pub(super) static WAIT_FOR_WAKEUP_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the clear-history helper.
pub(super) const CLEAR_HISTORY_FN_NAME: &str = "_clear_history";
/// Cache slot for the clear-history helper.
pub(super) static CLEAR_HISTORY_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the delete helper.
pub(super) const DELETE_FN_NAME: &str = "_delete";
/// Cache slot for the delete helper.
pub(super) static DELETE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Python-side name of the disconnect helper.
pub(super) const DISCONNECT_FN_NAME: &str = "_disconnect";
/// Cache slot for the disconnect helper.
pub(super) static DISCONNECT_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();

/// Map from agent id to a pair of Python objects. The first element is the object whose
/// `__aexit__` is called during cleanup; the second is bound as the agent instance there.
pub(crate) type RegistryInner = std::collections::HashMap<AgentId, (PyObject, PyObject)>;
/// Shared, mutex-guarded handle to the agent registry.
pub(crate) type AgentRegistry = std::sync::Arc<std::sync::Mutex<RegistryInner>>;
/// Looks up `agent_id` in the registry and returns new references to its two Python objects.
///
/// Returns `None` when the agent is not registered. A poisoned registry mutex is logged and
/// recovered from rather than treated as an error.
// NOTE(review): the registry guard is still held while the GIL is acquired below; confirm that
// no other code path locks the registry while already holding the GIL.
pub(super) fn lookup_agent_instance(
    registry: &AgentRegistry,
    agent_id: AgentId,
) -> Option<(PyObject, PyObject)> {
    let guard = match registry.lock() {
        Ok(guard) => guard,
        Err(e) => {
            tracing::warn!(
                "Agent registry mutex poisoned — recovering (data is safe because entries \
                 are always fully formed before insertion): {e}"
            );
            e.into_inner()
        }
    };

    let (ctx, instance) = guard.get(&agent_id)?;
    Some(Python::with_gil(|py| {
        (ctx.clone_ref(py), instance.clone_ref(py))
    }))
}
/// Runs the bridge command loop until the command channel closes or a shutdown is requested.
///
/// A fresh agent registry is created for the lifetime of the loop. Each iteration either
/// receives the next [`PyCommand`] from `cmd_rx` and dispatches it, or drives one of the
/// futures queued in the local task set forward to completion. Once the loop exits, every
/// agent still present in the registry is cleaned up before the function returns.
///
/// Tasks still queued when the loop exits are not polled any further.
///
/// # Parameters
///
/// * `cmd_rx` - receiving end of the command channel.
/// * `chat_timeout` - timeout forwarded to the dispatch functions; logged once at start-up.
/// * `inter_agent_delay` - delay forwarded to the chat dispatch path.
///
/// # Errors
///
/// This function currently always returns `Ok(())`.
pub(crate) async fn run_async_command_loop(
    mut cmd_rx: mpsc::Receiver<PyCommand>,
    chat_timeout: Duration,
    inter_agent_delay: Duration,
) -> PyResult<()> {
    tracing::info!(
        timeout_secs = chat_timeout.as_secs(),
        "Chat round-trip timeout configured"
    );
    let registry: AgentRegistry = std::sync::Arc::new(std::sync::Mutex::new(RegistryInner::new()));
    let mut active_tasks =
        futures::stream::FuturesUnordered::<futures::future::BoxFuture<'static, ()>>::new();
    loop {
        tokio::select! {
            cmd_opt = cmd_rx.recv() => {
                // `None` means every sender has been dropped: nothing more can arrive.
                let Some(cmd) = cmd_opt else {
                    break;
                };
                tracing::debug!("Live-SDK command loop: received command");
                if let DispatchResult::Shutdown = dispatch_async_command(
                    cmd,
                    &registry,
                    chat_timeout,
                    inter_agent_delay,
                    &mut active_tasks,
                ).await {
                    break;
                }
            }
            // Guarded so an empty task set (whose `next()` resolves immediately to `None`)
            // does not turn this loop into a busy spin.
            _ = active_tasks.next(), if !active_tasks.is_empty() => {
            }
        }
    }
    cleanup_remaining_agents(&registry).await;
    Ok(())
}
/// Empties the registry and runs the per-agent cleanup for every entry that was still in it.
///
/// A poisoned registry mutex is logged and recovered from. Agents are cleaned up one after
/// another, in whatever order the map yields them.
async fn cleanup_remaining_agents(registry: &AgentRegistry) {
    // Scope the guard so the std mutex is released before the first `.await` below.
    let leftovers: Vec<_> = {
        let mut guard = match registry.lock() {
            Ok(guard) => guard,
            Err(e) => {
                tracing::warn!("Agent registry mutex poisoned during cleanup — recovering: {e}");
                e.into_inner()
            }
        };
        let drained: Vec<_> = guard.drain().collect();
        drained
    };

    if leftovers.is_empty() {
        return;
    }

    tracing::info!(
        count = leftovers.len(),
        "Cleaning up agents remaining in registry after command loop exit"
    );
    for (agent_id, (ctx_py, _instance)) in leftovers {
        tracing::debug!(agent_id = ?agent_id, "Calling __aexit__ on leftover agent");
        cleanup_single_agent(agent_id, ctx_py).await;
    }
}
async fn cleanup_single_agent(agent_id: AgentId, ctx_py: PyObject) {
let aexit_result = Python::with_gil(|py| {
let ctx_bound = ctx_py.bind(py);
let none = py.None();
let coro = ctx_bound.call_method1("__aexit__", (&none, &none, &none))?;
Ok::<_, PyErr>(coro.to_object(py))
});
match aexit_result {
Ok(aexit_coro_py) => {
let aexit_fut = Python::with_gil(|py| {
let coro = aexit_coro_py.into_bound(py);
pyo3_async_runtimes::tokio::into_future(coro)
});
match aexit_fut {
Ok(fut) => {
match timeout(Duration::from_secs(10), fut).await {
Ok(Ok(_)) => {
tracing::debug!(agent_id = ?agent_id, "Agent __aexit__ completed");
}
Ok(Err(e)) => {
tracing::warn!(
agent_id = ?agent_id,
error = %e,
"Agent __aexit__ returned error during cleanup"
);
}
Err(_) => {
tracing::warn!(
agent_id = ?agent_id,
"Agent __aexit__ timed out during cleanup"
);
}
}
}
Err(e) => {
tracing::warn!(
agent_id = ?agent_id,
error = %e,
"Failed to convert __aexit__ coro to future"
);
}
}
}
Err(e) => {
tracing::warn!(
agent_id = ?agent_id,
error = %e,
"Failed to call __aexit__ during cleanup"
);
}
}
if let Ok(mut map) = super::bridge_state().write() {
map.remove(&agent_id.0);
} else {
tracing::warn!(
agent_id = agent_id.0,
"BRIDGE_STATE RwLock poisoned during cleanup"
);
}
}
/// Outcome of dispatching one command, telling the command loop whether to keep running.
enum DispatchResult {
/// The command was handled or queued; the loop should keep processing commands.
Continue,
/// A `PyCommand::Shutdown` was received; the loop should exit.
Shutdown,
}
/// Handles the query-style commands, each of which is served by a direct call into `query`.
///
/// Returns `Ok(())` when `cmd` was one of the query variants and its handler has been called.
/// Any other command is handed back unchanged as `Err(cmd)` so a later dispatch phase can
/// take it.
fn dispatch_query_command(cmd: PyCommand, registry: &AgentRegistry) -> Result<(), PyCommand> {
    match cmd {
        PyCommand::GetHistory { agent_id, reply } => {
            query::handle_get_history(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetTurnCount { agent_id, reply } => {
            query::handle_get_turn_count(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetTotalUsage { agent_id, reply } => {
            query::handle_get_total_usage(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetLastTurnUsage { agent_id, reply } => {
            query::handle_get_last_turn_usage(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetCompactionIndices { agent_id, reply } => {
            query::handle_get_compaction_indices(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetLastResponse { agent_id, reply } => {
            query::handle_get_last_response(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::IsIdle { agent_id, reply } => {
            query::handle_is_idle(registry, agent_id, reply);
            Ok(())
        }
        // Not a query: give the command back to the caller untouched.
        unhandled => Err(unhandled),
    }
}
/// Routes one command through the dispatch phases in a fixed order.
///
/// The phases are tried in sequence: queries, lifecycle commands, chat, then the remaining
/// agent operations. Each phase either consumes the command or hands it back for the next
/// one. Only `PyCommand::Shutdown` survives every phase, and it yields
/// [`DispatchResult::Shutdown`]; everything else yields [`DispatchResult::Continue`].
async fn dispatch_async_command(
    cmd: PyCommand,
    registry: &AgentRegistry,
    chat_timeout: Duration,
    inter_agent_delay: Duration,
    active_tasks: &mut futures::stream::FuturesUnordered<futures::future::BoxFuture<'static, ()>>,
) -> DispatchResult {
    // Phase 1: query commands.
    let Err(cmd) = dispatch_query_command(cmd, registry) else {
        return DispatchResult::Continue;
    };

    // Phase 2: agent creation and shutdown.
    let Err(cmd) = dispatch_lifecycle_command(cmd, registry, chat_timeout, active_tasks) else {
        return DispatchResult::Continue;
    };

    // Phase 3: chat, the only phase awaited directly from here.
    let cmd = match cmd {
        PyCommand::Chat {
            agent_id,
            prompt,
            reply,
        } => {
            chat::dispatch_chat_command(
                registry,
                agent_id,
                prompt,
                reply,
                chat_timeout,
                active_tasks,
                inter_agent_delay,
            )
            .await;
            return DispatchResult::Continue;
        }
        not_chat => not_chat,
    };

    // Phase 4: the remaining per-agent operations.
    let Err(cmd) = dispatch_agent_operation(cmd, registry, active_tasks) else {
        return DispatchResult::Continue;
    };

    // Every variant is listed explicitly (no wildcard) so that adding a new `PyCommand`
    // variant fails to compile here until it is routed to a phase.
    match cmd {
        PyCommand::Shutdown => {
            tracing::info!("Shutdown command received, exiting async command loop");
            DispatchResult::Shutdown
        }
        PyCommand::GetHistory { .. }
        | PyCommand::GetTurnCount { .. }
        | PyCommand::GetTotalUsage { .. }
        | PyCommand::GetLastTurnUsage { .. }
        | PyCommand::GetCompactionIndices { .. }
        | PyCommand::GetLastResponse { .. }
        | PyCommand::IsIdle { .. }
        | PyCommand::CreateAgent { .. }
        | PyCommand::ShutdownAgent { .. }
        | PyCommand::Chat { .. }
        | PyCommand::Cancel { .. }
        | PyCommand::WaitForIdle { .. }
        | PyCommand::ClearHistory { .. }
        | PyCommand::Send { .. }
        | PyCommand::SignalIdle { .. }
        | PyCommand::WaitForWakeup { .. }
        | PyCommand::Delete { .. }
        | PyCommand::Disconnect { .. } => {
            unreachable!("all variants handled by earlier dispatch phases")
        }
    }
}
/// Boxes `fut` and adds it to the set of in-flight tasks.
///
/// The future is only queued here; it makes progress when `active_tasks` itself is polled.
fn spawn_agent_task(
    active_tasks: &mut futures::stream::FuturesUnordered<futures::future::BoxFuture<'static, ()>>,
    fut: impl std::future::Future<Output = ()> + Send + 'static,
) {
    let boxed: futures::future::BoxFuture<'static, ()> = Box::pin(fut);
    active_tasks.push(boxed);
}
/// Queues the agent lifecycle commands (`CreateAgent`, `ShutdownAgent`) as background tasks.
///
/// Returns `Ok(())` once the matching handler future has been queued on `active_tasks`.
/// Any other command is handed back unchanged as `Err(cmd)`.
fn dispatch_lifecycle_command(
    cmd: PyCommand,
    registry: &AgentRegistry,
    chat_timeout: Duration,
    active_tasks: &mut futures::stream::FuturesUnordered<futures::future::BoxFuture<'static, ()>>,
) -> Result<(), PyCommand> {
    match cmd {
        PyCommand::CreateAgent { config_json, reply } => {
            // The task must be `'static`, so it owns its own handle to the registry.
            let shared = registry.clone();
            spawn_agent_task(active_tasks, async move {
                agent::handle_create_agent(shared, chat_timeout, config_json, reply).await;
            });
            Ok(())
        }
        PyCommand::ShutdownAgent { agent_id, reply } => {
            let shared = registry.clone();
            spawn_agent_task(active_tasks, async move {
                agent::handle_shutdown_agent(shared, chat_timeout, agent_id, reply).await;
            });
            Ok(())
        }
        // Not a lifecycle command: give it back to the caller untouched.
        unhandled => Err(unhandled),
    }
}
/// Queues the per-agent asynchronous operations as background tasks.
///
/// Covers `Cancel`, `WaitForIdle`, `ClearHistory`, `Send`, `SignalIdle`, `WaitForWakeup`,
/// `Delete` and `Disconnect`. Returns `Ok(())` once the matching handler future has been
/// queued on `active_tasks`. Any other command is handed back unchanged as `Err(cmd)`.
fn dispatch_agent_operation(
    cmd: PyCommand,
    registry: &AgentRegistry,
    active_tasks: &mut futures::stream::FuturesUnordered<futures::future::BoxFuture<'static, ()>>,
) -> Result<(), PyCommand> {
    // Each arm builds the boxed handler future; it is queued once, after the match. Every
    // task owns its own handle to the registry because it must be `'static`.
    let task: futures::future::BoxFuture<'static, ()> = match cmd {
        PyCommand::Cancel { agent_id, reply } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_cancel(shared, agent_id, reply).await;
            })
        }
        PyCommand::WaitForIdle { agent_id, reply } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_wait_for_idle(shared, agent_id, reply).await;
            })
        }
        PyCommand::ClearHistory { agent_id, reply } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_clear_history(shared, agent_id, reply).await;
            })
        }
        PyCommand::Send {
            agent_id,
            prompt,
            reply,
        } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_send(shared, agent_id, prompt, reply).await;
            })
        }
        PyCommand::SignalIdle { agent_id, reply } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_signal_idle(shared, agent_id, reply).await;
            })
        }
        PyCommand::WaitForWakeup {
            agent_id,
            timeout_secs,
            reply,
        } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_wait_for_wakeup(shared, agent_id, timeout_secs, reply).await;
            })
        }
        PyCommand::Delete { agent_id, reply } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_delete(shared, agent_id, reply).await;
            })
        }
        PyCommand::Disconnect { agent_id, reply } => {
            let shared = registry.clone();
            Box::pin(async move {
                async_ops::handle_disconnect(shared, agent_id, reply).await;
            })
        }
        // Not an agent operation: give it back to the caller untouched.
        unhandled => return Err(unhandled),
    };

    active_tasks.push(task);
    Ok(())
}