//! agy-bridge 0.1.0 — Rust bridge for the Google Antigravity SDK (Python) via PyO3.
//!
//! Async command loop and handlers.
use std::time::Duration;

use futures::stream::StreamExt;
use pyo3::prelude::*;
use tokio::{sync::mpsc, time::timeout};

use super::{
    AgentId, PyCommand,
    handlers::{agent, async_ops, chat, query},
};

/// Timeout applied to `handle_send`, `handle_signal_idle`, and `handle_wait_for_wakeup`.
///
/// The value is one minute. The three handlers are invoked from the command
/// dispatcher via `handlers::async_ops`; how they enforce this limit is
/// defined there.
pub(super) const HANDLER_TIMEOUT: Duration = Duration::from_mins(1);

/// Return the Python helper function `fn_name`, compiling `script` on first use.
///
/// When `cache` is already populated, a new reference to the cached object is
/// returned and `script` is not run. Otherwise `script` is executed with a
/// fresh locals dictionary, `fn_name` is looked up in that dictionary, and the
/// resulting object is stored in `cache` for subsequent calls.
///
/// # Errors
///
/// Returns the stringified Python exception if executing `script` or reading
/// the locals dictionary fails, and `"Failed to define {fn_name} helper"` if
/// the script ran but did not define `fn_name`.
pub(crate) fn get_or_compile_py_helper(
    cache: &'static std::sync::OnceLock<PyObject>,
    script: &str,
    fn_name: &str,
) -> Result<PyObject, String> {
    // Fast path: the helper was compiled by an earlier call.
    if let Some(helper) = cache.get() {
        return Python::with_gil(|py| Ok(helper.clone_ref(py)));
    }

    Python::with_gil(|py| {
        // Definitions made by the script land in this dictionary.
        let namespace = pyo3::types::PyDict::new_bound(py);
        if let Err(err) = py.run_bound(script, None, Some(&namespace)) {
            return Err(err.to_string());
        }

        let helper = match namespace.get_item(fn_name) {
            Ok(Some(found)) => found.to_object(py),
            Ok(None) => return Err(format!("Failed to define {fn_name} helper")),
            Err(err) => return Err(err.to_string()),
        };

        // If another thread filled the cache first, `set` hands our value
        // back; that is harmless, so it is only logged. This call still
        // returns the object it compiled itself.
        if let Err(rejected) = cache.set(helper.clone_ref(py)) {
            tracing::debug!("Cache was already set: {:?}", rejected);
        }
        Ok(helper)
    })
}

/// Python module name used for Rust ↔ Python global state.
pub(crate) const AGY_BRIDGE_GLOBALS_MODULE: &str = "_agy_bridge_globals";

// Each pair below couples a Python function name (`*_FN_NAME`) with a
// process-wide cache slot (`*_FN`). The slots have the
// `&'static OnceLock<PyObject>` shape accepted by `get_or_compile_py_helper`;
// presumably the handlers pass each pair to it together with the script that
// defines the function — confirm in the `handlers` module.

/// Name of the Python helper function for the cancel operation.
pub(super) const CANCEL_FN_NAME: &str = "_cancel";
/// Cache slot for the `_cancel` helper object.
pub(super) static CANCEL_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the wait-for-idle operation.
pub(super) const WAIT_FOR_IDLE_FN_NAME: &str = "_wait_for_idle";
/// Cache slot for the `_wait_for_idle` helper object.
pub(super) static WAIT_FOR_IDLE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the send operation.
pub(super) const SEND_FN_NAME: &str = "_send";
/// Cache slot for the `_send` helper object.
pub(super) static SEND_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the signal-idle operation.
pub(super) const SIGNAL_IDLE_FN_NAME: &str = "_signal_idle";
/// Cache slot for the `_signal_idle` helper object.
pub(super) static SIGNAL_IDLE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the wait-for-wakeup operation.
pub(super) const WAIT_FOR_WAKEUP_FN_NAME: &str = "_wait_for_wakeup";
/// Cache slot for the `_wait_for_wakeup` helper object.
pub(super) static WAIT_FOR_WAKEUP_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the clear-history operation.
pub(super) const CLEAR_HISTORY_FN_NAME: &str = "_clear_history";
/// Cache slot for the `_clear_history` helper object.
pub(super) static CLEAR_HISTORY_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the delete operation.
pub(super) const DELETE_FN_NAME: &str = "_delete";
/// Cache slot for the `_delete` helper object.
pub(super) static DELETE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
/// Name of the Python helper function for the disconnect operation.
pub(super) const DISCONNECT_FN_NAME: &str = "_disconnect";
/// Cache slot for the `_disconnect` helper object.
pub(super) static DISCONNECT_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();

/// Type alias for the agent registry mapping IDs to their Python context
/// manager and live agent instance objects.
///
/// The tuple order is `(context manager, agent instance)`: cleanup calls
/// `__aexit__` on the first element.
pub(crate) type RegistryInner = std::collections::HashMap<AgentId, (PyObject, PyObject)>;
/// Shared, mutex-guarded handle to a [`RegistryInner`] map.
///
/// Cloning the handle clones the `Arc`, so every clone refers to the same
/// underlying registry.
pub(crate) type AgentRegistry = std::sync::Arc<std::sync::Mutex<RegistryInner>>;

/// Fetch the Python objects registered for `agent_id`.
///
/// Returns new references to the agent's `(context manager, instance)` pair,
/// or `None` when no agent with that ID is registered. A poisoned registry
/// mutex does **not** produce `None`: the lock is recovered (with a warning)
/// and the lookup proceeds normally.
///
/// # Poisoned mutex recovery
///
/// Recovering instead of panicking is deliberate:
/// - Entries are fully constructed before insertion, so a panic elsewhere
///   cannot leave a half-written entry behind.
/// - The worst outcome is a stale entry for an agent that failed
///   mid-operation; it is removed later by `AgentHandle::drop` or the final
///   `cleanup_remaining_agents` sweep.
/// - A panic here would take down the whole command loop and every agent
///   with it, which is disproportionate when a single agent misbehaved.
pub(super) fn lookup_agent_instance(
    registry: &AgentRegistry,
    agent_id: AgentId,
) -> Option<(PyObject, PyObject)> {
    let guard = match registry.lock() {
        Ok(guard) => guard,
        Err(e) => {
            tracing::warn!(
                "Agent registry mutex poisoned — recovering (data is safe because entries \
                 are always fully formed before insertion): {e}"
            );
            e.into_inner()
        }
    };

    let (ctx, instance) = guard.get(&agent_id)?;
    // Taking a new reference to a Python object needs the GIL. Note that the
    // GIL is acquired while the registry guard is still held.
    Some(Python::with_gil(|py| {
        (ctx.clone_ref(py), instance.clone_ref(py))
    }))
}

/// Asynchronous command dispatch loop — live SDK mode.
///
/// Receives [`PyCommand`] messages and delegates each to a focused handler
/// function. The registry of live agents is threaded through the handlers.
pub(crate) async fn run_async_command_loop(
    mut cmd_rx: mpsc::Receiver<PyCommand>,
    chat_timeout: Duration,
    inter_agent_delay: Duration,
) -> PyResult<()> {
    tracing::info!(
        timeout_secs = chat_timeout.as_secs(),
        "Chat round-trip timeout configured"
    );
    let registry: AgentRegistry = std::sync::Arc::new(std::sync::Mutex::new(RegistryInner::new()));
    let mut active_tasks =
        futures::stream::FuturesUnordered::<futures::future::BoxFuture<'static, ()>>::new();

    loop {
        tokio::select! {
            cmd_opt = cmd_rx.recv() => {
                let Some(cmd) = cmd_opt else {
                    break;
                };
                tracing::debug!("Live-SDK command loop: received command");
                if let DispatchResult::Shutdown = dispatch_async_command(
                    cmd,
                    &registry,
                    chat_timeout,
                    inter_agent_delay,
                    &mut active_tasks,
                ).await {
                    break;
                }
            }
            _ = active_tasks.next(), if !active_tasks.is_empty() => {
                // A background task (chat, send, etc.) completed.
            }
        }
    }

    cleanup_remaining_agents(&registry).await;

    Ok(())
}

/// Clean up any agents still in the registry after the command loop exits.
///
/// Empties the registry and then calls `cleanup_single_agent` for every entry
/// that was left, one after another, so each context manager's `__aexit__`
/// gets a chance to release Python-side resources. Only the context manager
/// is used; the agent instance object is simply dropped.
///
/// The registry lock is released before the first `.await`. A poisoned mutex
/// is recovered rather than propagated, for the same reasons as in
/// [`lookup_agent_instance`].
async fn cleanup_remaining_agents(registry: &AgentRegistry) {
    // Scope the guard so it is gone before any awaiting happens below.
    let leftovers: Vec<_> = {
        let mut guard = match registry.lock() {
            Ok(guard) => guard,
            Err(e) => {
                tracing::warn!("Agent registry mutex poisoned during cleanup — recovering: {e}");
                e.into_inner()
            }
        };
        guard.drain().collect()
    };

    if !leftovers.is_empty() {
        tracing::info!(
            count = leftovers.len(),
            "Cleaning up agents remaining in registry after command loop exit"
        );
    }

    for (agent_id, (ctx_py, _instance)) in leftovers {
        tracing::debug!(agent_id = ?agent_id, "Calling __aexit__ on leftover agent");
        cleanup_single_agent(agent_id, ctx_py).await;
    }
}

/// Call `__aexit__` on a single agent's context manager and clean up its
/// global registry entries.
async fn cleanup_single_agent(agent_id: AgentId, ctx_py: PyObject) {
    let aexit_result = Python::with_gil(|py| {
        let ctx_bound = ctx_py.bind(py);
        let none = py.None();
        let coro = ctx_bound.call_method1("__aexit__", (&none, &none, &none))?;
        Ok::<_, PyErr>(coro.to_object(py))
    });

    match aexit_result {
        Ok(aexit_coro_py) => {
            let aexit_fut = Python::with_gil(|py| {
                let coro = aexit_coro_py.into_bound(py);
                pyo3_async_runtimes::tokio::into_future(coro)
            });
            match aexit_fut {
                Ok(fut) => {
                    // Use a short timeout — we're shutting down.
                    match timeout(Duration::from_secs(10), fut).await {
                        Ok(Ok(_)) => {
                            tracing::debug!(agent_id = ?agent_id, "Agent __aexit__ completed");
                        }
                        Ok(Err(e)) => {
                            tracing::warn!(
                                agent_id = ?agent_id,
                                error = %e,
                                "Agent __aexit__ returned error during cleanup"
                            );
                        }
                        Err(_) => {
                            tracing::warn!(
                                agent_id = ?agent_id,
                                "Agent __aexit__ timed out during cleanup"
                            );
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        agent_id = ?agent_id,
                        error = %e,
                        "Failed to convert __aexit__ coro to future"
                    );
                }
            }
        }
        Err(e) => {
            tracing::warn!(
                agent_id = ?agent_id,
                error = %e,
                "Failed to call __aexit__ during cleanup"
            );
        }
    }
}

/// Outcome of dispatching a single command.
///
/// Returned by `dispatch_async_command` so the command loop knows whether to
/// keep receiving.
enum DispatchResult {
    /// The command was handled (or queued); keep processing commands.
    Continue,
    /// A `PyCommand::Shutdown` was received; the command loop should exit.
    Shutdown,
}

/// Handle the query-style commands inline, without spawning background tasks.
///
/// Each query variant is forwarded to its handler in `query` together with
/// the registry, the agent ID, and the reply channel carried by the command.
///
/// Returns `Ok(())` when `cmd` was one of the query variants and has been
/// handled. Any other variant is handed back untouched as `Err(cmd)` so the
/// caller keeps ownership and can dispatch it elsewhere.
fn dispatch_query_command(cmd: PyCommand, registry: &AgentRegistry) -> Result<(), PyCommand> {
    match cmd {
        PyCommand::GetHistory { agent_id, reply } => {
            query::handle_get_history(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetTurnCount { agent_id, reply } => {
            query::handle_get_turn_count(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetTotalUsage { agent_id, reply } => {
            query::handle_get_total_usage(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetLastTurnUsage { agent_id, reply } => {
            query::handle_get_last_turn_usage(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetCompactionIndices { agent_id, reply } => {
            query::handle_get_compaction_indices(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::GetLastResponse { agent_id, reply } => {
            query::handle_get_last_response(registry, agent_id, reply);
            Ok(())
        }
        PyCommand::IsIdle { agent_id, reply } => {
            query::handle_is_idle(registry, agent_id, reply);
            Ok(())
        }
        // Not a query: return it to the caller unchanged.
        non_query => Err(non_query),
    }
}

/// Dispatch a single [`PyCommand`] to the appropriate handler, spawning
/// async work into `active_tasks` where needed.
///
/// Query commands are tried first via `dispatch_query_command` and handled
/// inline. Every other agent command except `Chat` is wrapped in a boxed
/// future holding its own clone of the registry handle and pushed onto
/// `active_tasks`; nothing in that future runs until the caller polls the
/// set. `Chat` is delegated to `chat::dispatch_chat_command`, which is
/// awaited here and receives `active_tasks` itself.
///
/// Returns [`DispatchResult::Shutdown`] only for `PyCommand::Shutdown`;
/// every other command yields [`DispatchResult::Continue`].
#[expect(
    clippy::too_many_lines,
    reason = "flat match dispatcher — each arm is a trivial task spawn"
)]
async fn dispatch_async_command(
    cmd: PyCommand,
    registry: &AgentRegistry,
    chat_timeout: Duration,
    inter_agent_delay: Duration,
    active_tasks: &mut futures::stream::FuturesUnordered<futures::future::BoxFuture<'static, ()>>,
) -> DispatchResult {
    // Synchronous query commands — handled without spawning tasks.
    // On `Err` the command is handed back so it can be matched below.
    let cmd = match dispatch_query_command(cmd, registry) {
        Ok(()) => return DispatchResult::Continue,
        Err(cmd) => cmd,
    };

    match cmd {
        // Agent lifecycle: creation is queued as a background task.
        PyCommand::CreateAgent {
            agent_id,
            config_json,
            bridge_ctx,
            reply,
        } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                agent::handle_create_agent(
                    registry,
                    chat_timeout,
                    agent_id,
                    config_json,
                    bridge_ctx,
                    reply,
                )
                .await;
            }));
        }
        // Chat is the one arm awaited inline; the chat dispatcher is given
        // `active_tasks` and the inter-agent delay directly.
        PyCommand::Chat {
            agent_id,
            prompt,
            reply,
        } => {
            chat::dispatch_chat_command(
                registry,
                agent_id,
                prompt,
                reply,
                chat_timeout,
                active_tasks,
                inter_agent_delay,
            )
            .await;
        }
        PyCommand::ShutdownAgent { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                agent::handle_shutdown_agent(registry, chat_timeout, agent_id, reply).await;
            }));
        }
        // The remaining agent operations all follow the same shape: clone the
        // registry handle, then queue the `async_ops` handler.
        PyCommand::Cancel { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_cancel(registry, agent_id, reply).await;
            }));
        }
        PyCommand::WaitForIdle { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_wait_for_idle(registry, agent_id, reply).await;
            }));
        }
        PyCommand::ClearHistory { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_clear_history(registry, agent_id, reply).await;
            }));
        }
        PyCommand::Send {
            agent_id,
            prompt,
            reply,
        } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_send(registry, agent_id, prompt, reply).await;
            }));
        }
        PyCommand::SignalIdle { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_signal_idle(registry, agent_id, reply).await;
            }));
        }
        PyCommand::WaitForWakeup {
            agent_id,
            timeout_secs,
            reply,
        } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_wait_for_wakeup(registry, agent_id, timeout_secs, reply).await;
            }));
        }
        PyCommand::Delete { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_delete(registry, agent_id, reply).await;
            }));
        }
        PyCommand::Disconnect { agent_id, reply } => {
            let registry = registry.clone();
            active_tasks.push(Box::pin(async move {
                async_ops::handle_disconnect(registry, agent_id, reply).await;
            }));
        }
        // Global shutdown: tell the caller to leave the command loop.
        PyCommand::Shutdown => {
            tracing::info!("Shutdown command received, exiting async command loop");
            return DispatchResult::Shutdown;
        }
        // Query commands already handled by dispatch_query_command above.
        // Listed explicitly (rather than `_`) so the match stays exhaustive
        // and a newly added variant fails to compile until it is handled.
        PyCommand::GetHistory { .. }
        | PyCommand::GetTurnCount { .. }
        | PyCommand::GetTotalUsage { .. }
        | PyCommand::GetLastTurnUsage { .. }
        | PyCommand::GetCompactionIndices { .. }
        | PyCommand::GetLastResponse { .. }
        | PyCommand::IsIdle { .. } => {
            unreachable!("handled by dispatch_query_command")
        }
    }
    DispatchResult::Continue
}