velocia 0.3.5

velocia – production-ready AI agent framework using ADK-Rust, A2A protocol, and AWS DynamoDB
//! ADK-backed agent strategy.
//!
//! Wraps `adk-rust` to provide agent creation and execution.
//! Mirrors Python's `ADKAgentStrategy`.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;
use futures::StreamExt;
use tokio::sync::Mutex;
use tracing::info;
use uuid::Uuid;

use crate::a2a::client::connector::RemoteAgentConnection;
use crate::a2a::types::{AgentCard, Artifact, Message, Part, Task, TaskState};
use crate::agents::strategy::{AgentExecutor, AnyAgent};
use crate::config::model::ModelConfig;
use crate::error::{AgentKitError, Result};
use crate::tools::loader::DynTool;

// ── Concrete ADK agent handle ─────────────────────────────────────────────────

/// Holds the LLM model and configuration for the ADK agent.
pub struct AdkAgentHandle {
    pub name: String,
    pub model_id: String,
    pub instruction: String,
    /// Shared LLM model (Arc'd to allow cheap cloning across turns).
    model: Arc<dyn adk_rust::Llm>,
    /// Remote A2A agents exposed as sub-agents to the LLM.
    remote_agents: Vec<Arc<RemoteAgentConnection>>,
    /// Shared session store — persists conversation history across turns.
    session_service: Arc<dyn adk_rust::session::SessionService>,
}

impl AnyAgent for AdkAgentHandle {}

// ── Executor ──────────────────────────────────────────────────────────────────

pub struct AdkAgentExecutor {
    agent: Arc<AdkAgentHandle>,
    #[allow(dead_code)]
    card: AgentCard,
    active_sessions: Mutex<HashSet<String>>,
    /// Tracks the last context_id used so that clients that omit contextId
    /// (e.g. plain HTTP callers or the A2A inspector on follow-up messages)
    /// continue in the same conversation instead of starting fresh.
    last_context_id: Mutex<Option<String>>,
}

impl AdkAgentExecutor {
    pub fn new(agent: Arc<AdkAgentHandle>, card: AgentCard) -> Self {
        Self {
            agent,
            card,
            active_sessions: Mutex::new(HashSet::new()),
            last_context_id: Mutex::new(None),
        }
    }
}

#[async_trait]
impl AgentExecutor for AdkAgentExecutor {
    async fn execute(&self, request: Message) -> Result<Task> {
        // Resolve context_id: use what the client sends; if absent, reuse the
        // last known context so the conversation continues rather than restarting.
        let context_id = match request.context_id.clone() {
            Some(id) => {
                *self.last_context_id.lock().await = Some(id.clone());
                id
            }
            None => {
                let last = self.last_context_id.lock().await.clone();
                match last {
                    Some(id) => {
                        info!("[{}] contextId ausente — reutilizando sesión previa: {}", self.agent.name, &id[..8.min(id.len())]);
                        id
                    }
                    None => {
                        let new_id = Uuid::new_v4().to_string();
                        *self.last_context_id.lock().await = Some(new_id.clone());
                        info!("[{}] Primera sesión: {}", self.agent.name, &new_id[..8.min(new_id.len())]);
                        new_id
                    }
                }
            }
        };

        self.active_sessions.lock().await.insert(context_id.clone());

        let input_text: String = request
            .parts
            .iter()
            .filter_map(|p| if let Part::Text { text, .. } = p { Some(text.as_str()) } else { None })
            .collect::<Vec<_>>()
            .join("\n");

        let ctx_short = &context_id[..8.min(context_id.len())];
        info!("┌── [{}] MENSAJE RECIBIDO (ctx: {ctx_short}…) ─────────────────────", self.agent.name);
        for line in input_text.lines() {
            info!("│  {line}");
        }
        info!("└──────────────────────────────────────────────────────────────────");

        // Build a fresh LlmAgent per turn, adding remote agents as sub-agents.
        let mut llm_builder = adk_rust::agent::LlmAgentBuilder::new(&self.agent.name)
            .description(&self.agent.name)
            .instruction(&self.agent.instruction)
            .model(Arc::clone(&self.agent.model));

        for conn in &self.agent.remote_agents {
            let tool = RemoteAgentTool::new(Arc::clone(conn));
            llm_builder = llm_builder.tool(Arc::new(tool));
        }

        let llm_agent = llm_builder
            .build()
            .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;

        // Reuse the shared session service — keeps conversation history across turns.
        let session_service = Arc::clone(&self.agent.session_service);

        // Derive session_id from context_id so every turn of the same
        // conversation hits the same session (get-or-create).
        let session_id = adk_rust::SessionId::try_from(context_id.as_str())
            .unwrap_or_else(|_| adk_rust::SessionId::generate());

        let session_exists = session_service
            .get(adk_rust::session::GetRequest {
                app_name: self.agent.name.clone(),
                user_id: "user".to_string(),
                session_id: session_id.to_string(),
                num_recent_events: None,
                after: None,
            })
            .await
            .is_ok();

        if !session_exists {
            info!("[{}] Nueva sesión: {}", self.agent.name, session_id);
            session_service
                .create(adk_rust::session::CreateRequest {
                    app_name: self.agent.name.clone(),
                    user_id: "user".to_string(),
                    session_id: Some(session_id.to_string()),
                    state: HashMap::new(),
                })
                .await
                .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
        } else {
            info!("[{}] Sesión reutilizada: {}", self.agent.name, session_id);
        }

        let runner = adk_rust::runner::Runner::builder()
            .app_name(&self.agent.name)
            .agent(Arc::new(llm_agent))
            .session_service(Arc::clone(&session_service))
            .build()
            .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;

        let content = adk_rust::Content::new("user").with_text(&input_text);
        let user_id = adk_rust::UserId::new("user")
            .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;

        let mut stream = runner
            .run(user_id, session_id, content)
            .await
            .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;

        let mut response_text = String::new();
        while let Some(event) = stream.next().await {
            let event = event.map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;
            if let Some(content) = &event.llm_response.content {
                for part in &content.parts {
                    if let Some(text) = part.text() {
                        response_text.push_str(text);
                    }
                }
            }
        }

        info!("┌── [{}] RESPUESTA ENVIADA (ctx: {ctx_short}…) ──────────────────────", self.agent.name);
        for line in response_text.lines() {
            info!("│  {line}");
        }
        info!("└──────────────────────────────────────────────────────────────────");

        self.active_sessions.lock().await.remove(&context_id);

        Ok(Task {
            id: Uuid::new_v4().to_string(),
            context_id,
            state: TaskState::Completed,
            artifacts: vec![Artifact {
                artifact_id: Uuid::new_v4().to_string(),
                name: None,
                description: None,
                parts: vec![Part::Text { text: response_text, metadata: None }],
                metadata: None,
            }],
            history: vec![request],
            metadata: None,
        })
    }

    async fn cancel(&self, task_id: &str) -> Result<()> {
        self.active_sessions.lock().await.remove(task_id);
        Ok(())
    }
}

// ── Remote agent tool ─────────────────────────────────────────────────────────

/// Wraps a remote A2A agent as an ADK `Tool` so the coordinator LLM can call
/// multiple remote agents in a single turn (unlike sub-agents, which end the
/// LLM run after the first transfer).
struct RemoteAgentTool {
    conn: Arc<RemoteAgentConnection>,
}

impl RemoteAgentTool {
    fn new(conn: Arc<RemoteAgentConnection>) -> Self {
        Self { conn }
    }
}

#[async_trait::async_trait]
impl adk_rust::tool::Tool for RemoteAgentTool {
    fn name(&self) -> &str {
        &self.conn.card.name
    }

    fn description(&self) -> &str {
        &self.conn.card.description
    }

    fn parameters_schema(&self) -> Option<serde_json::Value> {
        Some(serde_json::json!({
            "type": "object",
            "properties": {
                "task": {
                    "type": "string",
                    "description": "The task or question to send to this agent."
                }
            },
            "required": ["task"]
        }))
    }

    async fn execute(
        &self,
        _ctx: std::sync::Arc<dyn adk_rust::tool::ToolContext>,
        args: serde_json::Value,
    ) -> adk_rust::Result<serde_json::Value> {
        let task_text = args
            .get("task")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string();

        info!("→ Delegando a '{}': {:.80}", self.conn.card.name, task_text);

        let message = Message {
            kind: "message".to_string(),
            role: crate::a2a::types::Role::User,
            parts: vec![Part::Text { text: task_text, metadata: None }],
            message_id: Uuid::new_v4().to_string(),
            context_id: None,
            task_id: None,
            metadata: None,
        };

        let task = self
            .conn
            .send_message(message, HashMap::new())
            .await
            .map_err(|e| adk_rust::AdkError::agent(e.to_string()))?;

        let response_text: String = task
            .artifacts
            .iter()
            .flat_map(|a| a.parts.iter())
            .filter_map(|p| {
                if let Part::Text { text, .. } = p {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
            .join("\n");

        info!("← Respuesta de '{}': {:.80}", self.conn.card.name, response_text);

        Ok(serde_json::json!({ "response": response_text }))
    }
}

// ── Strategy builder helper ───────────────────────────────────────────────────

/// Constructs an ADK agent + executor from configuration.
pub struct AdkAgentBuilder {
    model_config: ModelConfig,
}

impl AdkAgentBuilder {
    pub fn new(model_config: ModelConfig) -> Self {
        Self { model_config }
    }

    /// Resolve the correct LLM backend:
    /// - `LITELLM_PROXY_API_KEY` + `provider.name` + `provider.endpoint` → OpenAI-compatible
    ///   client pointed at the LiteLLM proxy (mirrors Python's `LiteLlm` path)
    /// - Otherwise → native Gemini model, stripping the `google_genai:` prefix if present
    fn resolve_model(&self) -> Result<(String, Arc<dyn adk_rust::Llm>)> {
        if let Ok(proxy_key) = std::env::var("LITELLM_PROXY_API_KEY") {
            if let Some(ref provider) = self.model_config.provider {
                if let Some(ref endpoint) = provider.endpoint {
                    let model_id = format!("{}/{}", provider.name, self.model_config.name);
                    info!("LiteLLM proxy → {endpoint} | model: {model_id}");

                    let config =
                        adk_rust::model::openai_compatible::OpenAICompatibleConfig::new(
                            proxy_key,
                            &model_id,
                        )
                        .with_provider_name("litellm-proxy")
                        .with_base_url(endpoint.clone());

                    let model = adk_rust::model::openai_compatible::OpenAICompatible::new(config)
                        .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;

                    return Ok((model_id, Arc::new(model)));
                }
            }
        }

        // Gemini: strip optional "google_genai:" prefix from model name
        let model_id = match self.model_config.name.split_once(':') {
            Some((_prefix, model)) => model.to_string(),
            None => self.model_config.name.clone(),
        };

        let api_key = std::env::var("GOOGLE_API_KEY").map_err(|_| {
            AgentKitError::AgentBuild("GOOGLE_API_KEY environment variable not set".to_string())
        })?;

        info!("Gemini model: {model_id}");
        let model = adk_rust::model::GeminiModel::new(api_key, &model_id)
            .map_err(|e| AgentKitError::AgentBuild(e.to_string()))?;

        Ok((model_id, Arc::new(model)))
    }

    pub fn build_agent(
        &self,
        name: &str,
        instruction: &str,
        _tools: Vec<DynTool>,
        remote_agents: Vec<Arc<RemoteAgentConnection>>,
    ) -> Result<Arc<AdkAgentHandle>> {
        let (model_id, model) = self.resolve_model()?;
        info!("Creating ADK agent '{name}' with model '{model_id}' ({} remote sub-agent(s))", remote_agents.len());

        Ok(Arc::new(AdkAgentHandle {
            name: name.to_string(),
            model_id,
            instruction: instruction.to_string(),
            model,
            remote_agents,
            session_service: Arc::new(adk_rust::session::InMemorySessionService::new()),
        }))
    }

    pub fn build_executor(
        &self,
        agent: Arc<AdkAgentHandle>,
        card: AgentCard,
    ) -> Arc<dyn AgentExecutor> {
        Arc::new(AdkAgentExecutor::new(agent, card))
    }
}