use pyo3::prelude::*;
use tokio::sync::oneshot;
use super::super::{
AgentId,
command_loop::{AgentRegistry, get_or_compile_py_helper},
py_scripts::{
PYTHON_GET_HISTORY_SCRIPT, PYTHON_GET_LAST_TURN_USAGE_SCRIPT,
PYTHON_GET_TOTAL_USAGE_SCRIPT, PYTHON_GET_TURN_COUNT_SCRIPT, PYTHON_IS_IDLE_SCRIPT,
},
};
use crate::error::Error;
const GET_HISTORY_FN_NAME: &str = "_get_history";
static GET_HISTORY_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
const GET_TURN_COUNT_FN_NAME: &str = "_get_turn_count";
static GET_TURN_COUNT_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
const GET_TOTAL_USAGE_FN_NAME: &str = "_get_total_usage";
static GET_TOTAL_USAGE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
const GET_LAST_TURN_USAGE_FN_NAME: &str = "_get_last_turn_usage";
static GET_LAST_TURN_USAGE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
const IS_IDLE_FN_NAME: &str = "_is_idle";
static IS_IDLE_FN: std::sync::OnceLock<PyObject> = std::sync::OnceLock::new();
type PyAgentRef = (Py<PyAny>, Py<PyAny>);
fn lock_agent_instance(
registry: &AgentRegistry,
agent_id: AgentId,
) -> Result<Option<PyAgentRef>, Error> {
let guard = registry.lock().map_err(|e| {
tracing::error!(agent_id = ?agent_id, error = %e, "Agent registry mutex poisoned");
Error::BackendError {
message: "Agent registry mutex poisoned".to_owned(),
}
})?;
Ok(guard
.get(&agent_id)
.map(|(c, a)| Python::with_gil(|py| (c.clone_ref(py), a.clone_ref(py)))))
}
pub(in crate::runtime) fn handle_get_history(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<Vec<crate::types::ConversationMessage>, Error>>,
) {
let instance_opt = match lock_agent_instance(registry, agent_id) {
Ok(opt) => opt,
Err(e) => {
if let Err(send_err) = reply.send(Err(e)) {
tracing::warn!(agent_id = ?agent_id, error = ?send_err, "GetHistory reply receiver dropped (lock error)");
}
return;
}
};
let Some((_ctx, agent_instance)) = instance_opt else {
if let Err(e) = reply.send(Err(Error::BackendError {
message: format!("Agent ID {agent_id} not found in registry"),
})) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetHistory reply receiver dropped (not found)");
}
return;
};
let result = Python::with_gil(
|py| -> Result<Vec<crate::types::ConversationMessage>, Error> {
let helper_fn = get_or_compile_py_helper(
&GET_HISTORY_FN,
PYTHON_GET_HISTORY_SCRIPT,
GET_HISTORY_FN_NAME,
)
.map_err(|e| Error::BackendError { message: e })?;
let helper_bound = helper_fn.bind(py);
let agent_bound = agent_instance.bind(py);
let result = helper_bound.call1((agent_bound,))?;
let json_str: String = result.extract()?;
serde_json::from_str(&json_str).map_err(|e| Error::BackendError {
message: format!("Failed to parse history JSON: {e}"),
})
},
);
if let Err(e) = reply.send(result) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetHistory reply receiver dropped");
}
}
pub(in crate::runtime) fn handle_get_turn_count(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<u32, Error>>,
) {
let instance_opt = match lock_agent_instance(registry, agent_id) {
Ok(opt) => opt,
Err(e) => {
if let Err(send_err) = reply.send(Err(e)) {
tracing::warn!(agent_id = ?agent_id, error = ?send_err, "GetTurnCount reply receiver dropped (lock error)");
}
return;
}
};
let Some((_ctx, agent_instance)) = instance_opt else {
if let Err(e) = reply.send(Err(Error::BackendError {
message: format!("Agent ID {agent_id} not found in registry"),
})) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetTurnCount reply receiver dropped (not found)");
}
return;
};
let result = Python::with_gil(|py| -> Result<u32, Error> {
let helper_fn = get_or_compile_py_helper(
&GET_TURN_COUNT_FN,
PYTHON_GET_TURN_COUNT_SCRIPT,
GET_TURN_COUNT_FN_NAME,
)
.map_err(|e| Error::BackendError { message: e })?;
let helper_bound = helper_fn.bind(py);
let agent_bound = agent_instance.bind(py);
let result = helper_bound.call1((agent_bound,))?;
Ok(result.extract::<u32>()?)
});
if let Err(e) = reply.send(result) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetTurnCount reply receiver dropped");
}
}
pub(in crate::runtime) fn handle_get_total_usage(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<crate::types::UsageMetadata, Error>>,
) {
handle_get_usage_impl(
registry,
agent_id,
reply,
&GET_TOTAL_USAGE_FN,
PYTHON_GET_TOTAL_USAGE_SCRIPT,
GET_TOTAL_USAGE_FN_NAME,
"GetTotalUsage",
);
}
pub(in crate::runtime) fn handle_get_last_turn_usage(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<crate::types::UsageMetadata, Error>>,
) {
handle_get_usage_impl(
registry,
agent_id,
reply,
&GET_LAST_TURN_USAGE_FN,
PYTHON_GET_LAST_TURN_USAGE_SCRIPT,
GET_LAST_TURN_USAGE_FN_NAME,
"GetLastTurnUsage",
);
}
fn handle_get_usage_impl(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<crate::types::UsageMetadata, Error>>,
cache: &'static std::sync::OnceLock<PyObject>,
script: &str,
fn_name: &str,
label: &str,
) {
let instance_opt = match lock_agent_instance(registry, agent_id) {
Ok(opt) => opt,
Err(e) => {
if let Err(send_err) = reply.send(Err(e)) {
tracing::warn!(agent_id = ?agent_id, error = ?send_err, label, "reply receiver dropped (lock error)");
}
return;
}
};
let Some((_ctx, agent_instance)) = instance_opt else {
if let Err(e) = reply.send(Err(Error::BackendError {
message: format!("Agent ID {agent_id} not found in registry"),
})) {
tracing::warn!(agent_id = ?agent_id, label, error = ?e, "reply receiver dropped (not found)");
}
return;
};
let result = Python::with_gil(|py| -> Result<crate::types::UsageMetadata, Error> {
let helper_fn = get_or_compile_py_helper(cache, script, fn_name)
.map_err(|e| Error::BackendError { message: e })?;
let helper_bound = helper_fn.bind(py);
let agent_bound = agent_instance.bind(py);
let result = helper_bound.call1((agent_bound,))?;
let json_str: String = result.extract()?;
serde_json::from_str(&json_str).map_err(|e| Error::BackendError {
message: format!("Failed to parse usage JSON: {e}"),
})
});
if let Err(e) = reply.send(result) {
tracing::warn!(agent_id = ?agent_id, label, error = ?e, "reply receiver dropped");
}
}
pub(in crate::runtime) fn handle_get_compaction_indices(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<Vec<u32>, Error>>,
) {
let instance_opt = match lock_agent_instance(registry, agent_id) {
Ok(opt) => opt,
Err(e) => {
if let Err(send_err) = reply.send(Err(e)) {
tracing::warn!(agent_id = ?agent_id, error = ?send_err, "GetCompactionIndices reply receiver dropped (lock error)");
}
return;
}
};
let Some((_ctx, agent_instance)) = instance_opt else {
if let Err(e) = reply.send(Err(Error::BackendError {
message: format!("Agent ID {agent_id} not found in registry"),
})) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetCompactionIndices reply receiver dropped (not found)");
}
return;
};
let result = Python::with_gil(|py| -> Result<Vec<u32>, Error> {
let agent_bound = agent_instance.bind(py);
if !agent_bound.hasattr("conversation")? {
return Ok(Vec::new());
}
let conv = agent_bound.getattr("conversation")?;
if !conv.hasattr("compaction_indices")? {
return Ok(Vec::new());
}
let indices = conv.getattr("compaction_indices")?;
Ok(indices.extract::<Vec<u32>>()?)
});
if let Err(e) = reply.send(result) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetCompactionIndices reply receiver dropped");
}
}
pub(in crate::runtime) fn handle_get_last_response(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<Option<String>, Error>>,
) {
let instance_opt = match lock_agent_instance(registry, agent_id) {
Ok(opt) => opt,
Err(e) => {
if let Err(send_err) = reply.send(Err(e)) {
tracing::warn!(agent_id = ?agent_id, error = ?send_err, "GetLastResponse reply receiver dropped (lock error)");
}
return;
}
};
let Some((_ctx, agent_instance)) = instance_opt else {
if let Err(e) = reply.send(Err(Error::BackendError {
message: format!("Agent ID {agent_id} not found in registry"),
})) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetLastResponse reply receiver dropped (not found)");
}
return;
};
let result = Python::with_gil(|py| -> Result<Option<String>, Error> {
let agent_bound = agent_instance.bind(py);
if !agent_bound.hasattr("conversation")? {
return Ok(None);
}
let conv = agent_bound.getattr("conversation")?;
if !conv.hasattr("last_response")? {
return Ok(None);
}
let response_str: String = conv.getattr("last_response")?.extract()?;
if response_str.is_empty() {
return Ok(None);
}
Ok(Some(response_str))
});
if let Err(e) = reply.send(result) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "GetLastResponse reply receiver dropped");
}
}
pub(in crate::runtime) fn handle_is_idle(
registry: &AgentRegistry,
agent_id: AgentId,
reply: oneshot::Sender<Result<bool, Error>>,
) {
let instance_opt = match lock_agent_instance(registry, agent_id) {
Ok(opt) => opt,
Err(e) => {
if let Err(send_err) = reply.send(Err(e)) {
tracing::warn!(agent_id = ?agent_id, error = ?send_err, "IsIdle reply receiver dropped (lock error)");
}
return;
}
};
let Some((_ctx, agent_instance)) = instance_opt else {
if let Err(e) = reply.send(Err(Error::BackendError {
message: format!("Agent ID {agent_id} not found in registry"),
})) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "IsIdle reply receiver dropped (not found)");
}
return;
};
let result = Python::with_gil(|py| -> Result<bool, Error> {
let helper_fn =
get_or_compile_py_helper(&IS_IDLE_FN, PYTHON_IS_IDLE_SCRIPT, IS_IDLE_FN_NAME)
.map_err(|e| Error::BackendError { message: e })?;
let helper_bound = helper_fn.bind(py);
let agent_bound = agent_instance.bind(py);
let result = helper_bound.call1((agent_bound,))?;
Ok(result.extract::<bool>()?)
});
if let Err(e) = reply.send(result) {
tracing::warn!(agent_id = ?agent_id, error = ?e, "IsIdle reply receiver dropped");
}
}