use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt;
use tokio::sync::Mutex;
use tracing::info;
use uuid::Uuid;
use crate::a2a::client::connector::RemoteAgentConnection;
use crate::a2a::types::{AgentCard, Artifact, Message, Part, Task, TaskState};
use crate::agents::strategy::{AgentExecutor, AnyAgent};
use crate::config::model::ModelConfig;
use crate::error::{AgentKitError, Result};
use crate::tools::loader::DynTool;
pub struct AdkAgentHandle {
pub name: String,
pub model_id: String,
pub instruction: String,
model: Arc<dyn adk_rust::Llm>,
remote_agents: Vec<Arc<RemoteAgentConnection>>,
session_service: Arc<dyn adk_rust::session::SessionService>,
}
impl AnyAgent for AdkAgentHandle {}
pub struct AdkAgentExecutor {
agent: Arc<AdkAgentHandle>,
#[allow(dead_code)]
card: AgentCard,
active_sessions: Mutex<HashSet<String>>,
last_context_id: Mutex<Option<String>>,
}
impl AdkAgentExecutor {
pub fn new(agent: Arc<AdkAgentHandle>, card: AgentCard) -> Self {
Self {
agent,
card,
active_sessions: Mutex::new(HashSet::new()),
last_context_id: Mutex::new(None),
}
}
}
#[async_trait]
impl AgentExecutor for AdkAgentExecutor {
async fn execute(&self, request: Message) -> Result<Task> {
let context_id = match request.context_id.clone() {
Some(id) => {
*self.last_context_id.lock().await = Some(id.clone());
id
}
None => {
let last = self.last_context_id.lock().await.clone();
match last {
Some(id) => {
info!("[{}] contextId ausente — reutilizando sesión previa: {}", self.agent.name, &id[..8.min(id.len())]);
id
}
None => {
let new_id = Uuid::new_v4().to_string();
*self.last_context_id.lock().await = Some(new_id.clone());
info!("[{}] Primera sesión: {}", self.agent.name, &new_id[..8.min(new_id.len())]);
new_id
}
}
}
};
self.active_sessions.lock().await.insert(context_id.clone());
let input_text: String = request
.parts
.iter()
.filter_map(|p| if let Part::Text { text, .. } = p { Some(text.as_str()) } else { None })
.collect::<Vec<_>>()
.join("\n");
let ctx_short = &context_id[..8.min(context_id.len())];
info!("┌── [{}] MENSAJE RECIBIDO (ctx: {ctx_short}…) ─────────────────────", self.agent.name);
for line in input_text.lines() {
info!("│ {line}");
}
info!("└──────────────────────────────────────────────────────────────────");
let mut llm_builder = adk_rust::agent::LlmAgentBuilder::new(&self.agent.name)
.description(&self.agent.name)
.instruction(&self.agent.instruction)
.model(Arc::clone(&self.agent.model));
for conn in &self.agent.remote_agents {
let tool = RemoteAgentTool::new(Arc::clone(conn));
llm_builder = llm_builder.tool(Arc::new(tool));
}
let llm_agent = llm_builder
.build()
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
let session_service = Arc::clone(&self.agent.session_service);
let session_id = adk_rust::SessionId::try_from(context_id.as_str())
.unwrap_or_else(|_| adk_rust::SessionId::generate());
let session_exists = session_service
.get(adk_rust::session::GetRequest {
app_name: self.agent.name.clone(),
user_id: "user".to_string(),
session_id: session_id.to_string(),
num_recent_events: None,
after: None,
})
.await
.is_ok();
if !session_exists {
info!("[{}] Nueva sesión: {}", self.agent.name, session_id);
session_service
.create(adk_rust::session::CreateRequest {
app_name: self.agent.name.clone(),
user_id: "user".to_string(),
session_id: Some(session_id.to_string()),
state: HashMap::new(),
})
.await
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
} else {
info!("[{}] Sesión reutilizada: {}", self.agent.name, session_id);
}
let runner = adk_rust::runner::Runner::builder()
.app_name(&self.agent.name)
.agent(Arc::new(llm_agent))
.session_service(Arc::clone(&session_service))
.build()
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
let content = adk_rust::Content::new("user").with_text(&input_text);
let user_id = adk_rust::UserId::new("user")
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
let mut stream = runner
.run(user_id, session_id, content)
.await
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
let mut response_text = String::new();
while let Some(event) = stream.next().await {
let event = event.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
if let Some(content) = &event.llm_response.content {
for part in &content.parts {
if let Some(text) = part.text() {
response_text.push_str(text);
}
}
}
}
info!("┌── [{}] RESPUESTA ENVIADA (ctx: {ctx_short}…) ──────────────────────", self.agent.name);
for line in response_text.lines() {
info!("│ {line}");
}
info!("└──────────────────────────────────────────────────────────────────");
self.active_sessions.lock().await.remove(&context_id);
Ok(Task {
id: Uuid::new_v4().to_string(),
context_id,
state: TaskState::Completed,
artifacts: vec![Artifact {
artifact_id: Uuid::new_v4().to_string(),
name: None,
description: None,
parts: vec![Part::Text { text: response_text, metadata: None }],
metadata: None,
}],
history: vec![request],
metadata: None,
})
}
async fn cancel(&self, task_id: &str) -> Result<()> {
self.active_sessions.lock().await.remove(task_id);
Ok(())
}
}
struct RemoteAgentTool {
conn: Arc<RemoteAgentConnection>,
}
impl RemoteAgentTool {
fn new(conn: Arc<RemoteAgentConnection>) -> Self {
Self { conn }
}
}
#[async_trait::async_trait]
impl adk_rust::tool::Tool for RemoteAgentTool {
fn name(&self) -> &str {
&self.conn.card.name
}
fn description(&self) -> &str {
&self.conn.card.description
}
fn parameters_schema(&self) -> Option<serde_json::Value> {
Some(serde_json::json!({
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "The task or question to send to this agent."
}
},
"required": ["task"]
}))
}
async fn execute(
&self,
_ctx: std::sync::Arc<dyn adk_rust::tool::ToolContext>,
args: serde_json::Value,
) -> adk_rust::Result<serde_json::Value> {
let task_text = args
.get("task")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
info!("→ Delegando a '{}': {:.80}", self.conn.card.name, task_text);
let message = Message {
kind: "message".to_string(),
role: crate::a2a::types::Role::User,
parts: vec![Part::Text { text: task_text, metadata: None }],
message_id: Uuid::new_v4().to_string(),
context_id: None,
task_id: None,
metadata: None,
};
let task = self
.conn
.send_message(message, HashMap::new())
.await
.map_err(|e| adk_rust::AdkError::agent(e.to_string()))?;
let response_text: String = task
.artifacts
.iter()
.flat_map(|a| a.parts.iter())
.filter_map(|p| {
if let Part::Text { text, .. } = p {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
info!("← Respuesta de '{}': {:.80}", self.conn.card.name, response_text);
Ok(serde_json::json!({ "response": response_text }))
}
}
pub struct AdkAgentBuilder {
model_config: ModelConfig,
}
impl AdkAgentBuilder {
pub fn new(model_config: ModelConfig) -> Self {
Self { model_config }
}
fn resolve_model(&self) -> Result<(String, Arc<dyn adk_rust::Llm>)> {
if let Ok(proxy_key) = std::env::var("LITELLM_PROXY_API_KEY") {
if let Some(ref provider) = self.model_config.provider {
if let Some(ref endpoint) = provider.endpoint {
let model_id = format!("{}/{}", provider.name, self.model_config.name);
info!("LiteLLM proxy → {endpoint} | model: {model_id}");
let config =
adk_rust::model::openai_compatible::OpenAICompatibleConfig::new(
proxy_key,
&model_id,
)
.with_provider_name("litellm-proxy")
.with_base_url(endpoint.clone());
let model = adk_rust::model::openai_compatible::OpenAICompatible::new(config)
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
return Ok((model_id, Arc::new(model)));
}
}
}
let model_id = match self.model_config.name.split_once(':') {
Some((_prefix, model)) => model.to_string(),
None => self.model_config.name.clone(),
};
let api_key = std::env::var("GOOGLE_API_KEY").map_err(|_| {
AgentKitError::AgentBuild("GOOGLE_API_KEY environment variable not set".to_string())
})?;
info!("Gemini model: {model_id}");
let model = adk_rust::model::GeminiModel::new(api_key, &model_id)
.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
Ok((model_id, Arc::new(model)))
}
pub fn build_agent(
&self,
name: &str,
instruction: &str,
_tools: Vec<DynTool>,
remote_agents: Vec<Arc<RemoteAgentConnection>>,
) -> Result<Arc<AdkAgentHandle>> {
let (model_id, model) = self.resolve_model()?;
info!("Creating ADK agent '{name}' with model '{model_id}' ({} remote sub-agent(s))", remote_agents.len());
Ok(Arc::new(AdkAgentHandle {
name: name.to_string(),
model_id,
instruction: instruction.to_string(),
model,
remote_agents,
session_service: Arc::new(adk_rust::session::InMemorySessionService::new()),
}))
}
pub fn build_executor(
&self,
agent: Arc<AdkAgentHandle>,
card: AgentCard,
) -> Arc<dyn AgentExecutor> {
Arc::new(AdkAgentExecutor::new(agent, card))
}
}