ares-server 0.7.5

A.R.E.S - Agentic Retrieval Enhanced Server: A production-grade agentic chatbot server with multi-provider LLM support, tool calling, RAG, and MCP integration
Documentation
use crate::types::{AppError, Result};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, Row};
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns the current wall-clock time as whole seconds relative to the Unix epoch.
///
/// This never panics. If the system clock is set before the epoch, the result is the
/// negated number of whole seconds by which the clock precedes it. Values that do not
/// fit in an `i64` saturate instead of wrapping.
fn now_ts() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        // `SystemTimeError::duration` reports how far the clock sits *before* the epoch.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map_or(i64::MIN, |secs| -secs),
    }
}

/// One row of the `agent_runs` table, as returned by `list_agent_runs`.
///
/// `insert_agent_run` additionally writes `model_name`, `provider_name` and
/// `is_streaming`; those columns are not represented on this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentRun {
    /// Row identifier; `insert_agent_run` generates it as a v4 UUID string.
    pub id: String,
    /// Identifier of the tenant the run belongs to.
    pub tenant_id: String,
    /// Name of the agent that was run.
    pub agent_name: String,
    /// User associated with the run, if any.
    pub user_id: Option<String>,
    /// Free-form status text. `get_agent_run_stats` counts the values
    /// `'completed'` and `'failed'`.
    pub status: String,
    /// Input token count recorded for the run.
    pub input_tokens: i64,
    /// Output token count recorded for the run.
    pub output_tokens: i64,
    /// Run duration; presumably milliseconds, per the name (TODO confirm with callers).
    pub duration_ms: i64,
    /// Error text recorded for the run, if any.
    pub error: Option<String>,
    /// Insertion time as Unix seconds (set from `now_ts()` by `insert_agent_run`).
    pub created_at: i64,
}

/// Aggregates over every `agent_runs` row of one tenant/agent pair, as computed by
/// `get_agent_run_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct AgentRunStats {
    /// Number of matching rows, regardless of status.
    pub total_runs: i64,
    /// Number of matching rows whose status is `'completed'`.
    pub success_count: i64,
    /// Number of matching rows whose status is `'failed'`.
    pub failed_count: i64,
    /// SQL `AVG(duration_ms)` cast to an integer; `0` when there are no rows.
    pub avg_duration_ms: i64,
    /// Sum of `input_tokens`; `0` when there are no rows.
    pub total_input_tokens: i64,
    /// Sum of `output_tokens`; `0` when there are no rows.
    pub total_output_tokens: i64,
}

/// Platform-wide counters, as computed by `get_platform_stats`.
///
/// "Today" means rows whose `created_at` is at or after the most recent multiple of
/// 86400 Unix seconds, i.e. the start of the current UTC day.
#[derive(Debug, Clone, Serialize)]
pub struct PlatformStats {
    /// Row count of the `tenants` table.
    pub total_tenants: i64,
    /// Row count of the `tenant_agents` table.
    pub total_agents: i64,
    /// Number of `agent_runs` rows created today.
    pub total_runs_today: i64,
    /// Sum of `input_tokens + output_tokens` over today's runs; `0` when there are none.
    pub total_tokens_today: i64,
    /// Number of `alerts` rows with `resolved = FALSE`.
    pub active_alerts: i64,
}

/// One agent registration joined with its tenant and run totals, as produced by
/// `list_all_agents`.
#[derive(Debug, Clone, Serialize)]
pub struct AllAgentsEntry {
    /// `tenant_agents.tenant_id`.
    pub tenant_id: String,
    /// `tenants.name` of the owning tenant.
    pub tenant_name: String,
    /// `tenant_agents.agent_name`.
    pub agent_name: String,
    /// `tenant_agents.display_name`.
    pub display_name: String,
    /// The `model` key of the agent's `config` JSON, or `"unknown"` when it is absent.
    pub model: String,
    /// `tenant_agents.enabled`.
    pub enabled: bool,
    /// Number of `agent_runs` rows for this tenant/agent pair; `0` when there are none.
    pub total_runs: i64,
    /// Largest `created_at` among this agent's runs; `None` when it has never run.
    pub last_run_at: Option<i64>,
}

/// Records one agent execution in `agent_runs` and returns the id of the new row.
///
/// The id is a freshly generated v4 UUID rendered as a string, and `created_at` is the
/// current time from `now_ts()`. Every other column is taken verbatim from the
/// corresponding argument; `None` for `user_id` or `error` is bound as SQL `NULL`.
///
/// # Errors
///
/// Returns `AppError::Database` carrying the driver's message if the insert fails.
pub async fn insert_agent_run(
    pool: &PgPool,
    tenant_id: &str,
    agent_name: &str,
    user_id: Option<&str>,
    status: &str,
    input_tokens: i64,
    output_tokens: i64,
    duration_ms: i64,
    error: Option<&str>,
    model_name: &str,
    provider_name: &str,
    is_streaming: bool,
) -> Result<String> {
    let sql = "INSERT INTO agent_runs (id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at, model_name, provider_name, is_streaming)
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)";

    let run_id = uuid::Uuid::new_v4().to_string();
    let created_at = now_ts();

    // Bind order must follow the $1..$13 placeholders above.
    let outcome = sqlx::query(sql)
        .bind(&run_id)
        .bind(tenant_id)
        .bind(agent_name)
        .bind(user_id)
        .bind(status)
        .bind(input_tokens)
        .bind(output_tokens)
        .bind(duration_ms)
        .bind(error)
        .bind(created_at)
        .bind(model_name)
        .bind(provider_name)
        .bind(is_streaming)
        .execute(pool)
        .await;

    match outcome {
        Ok(_) => Ok(run_id),
        Err(db_err) => Err(AppError::Database(db_err.to_string())),
    }
}

pub async fn list_agent_runs(
    pool: &PgPool,
    tenant_id: &str,
    agent_name: Option<&str>,
    limit: i64,
    offset: i64,
) -> Result<Vec<AgentRun>> {
    let rows = if let Some(name) = agent_name {
        sqlx::query(
            "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
             FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2
             ORDER BY created_at DESC LIMIT $3 OFFSET $4"
        )
        .bind(tenant_id)
        .bind(name)
        .bind(limit)
        .bind(offset)
        .fetch_all(pool)
        .await
    } else {
        sqlx::query(
            "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
             FROM agent_runs WHERE tenant_id = $1
             ORDER BY created_at DESC LIMIT $2 OFFSET $3"
        )
        .bind(tenant_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(pool)
        .await
    }
    .map_err(|e| AppError::Database(e.to_string()))?;

    rows.iter()
        .map(|row| {
            Ok(AgentRun {
                id: row.get("id"),
                tenant_id: row.get("tenant_id"),
                agent_name: row.get("agent_name"),
                user_id: row.get("user_id"),
                status: row.get("status"),
                input_tokens: row.get("input_tokens"),
                output_tokens: row.get("output_tokens"),
                duration_ms: row.get("duration_ms"),
                error: row.get("error"),
                created_at: row.get("created_at"),
            })
        })
        .collect()
}

/// Computes aggregate run statistics for one agent of one tenant.
///
/// A single query over `agent_runs` yields the total row count, the counts of rows with
/// status `'completed'` and `'failed'`, the average duration, and the input/output
/// token sums. Average and sums fall back to `0` when no rows match.
///
/// # Errors
///
/// Returns `AppError::Database` carrying the driver's message if the query fails.
pub async fn get_agent_run_stats(
    pool: &PgPool,
    tenant_id: &str,
    agent_name: &str,
) -> Result<AgentRunStats> {
    let sql = "SELECT
            COUNT(*) as total_runs,
            COUNT(*) FILTER (WHERE status = 'completed') as success_count,
            COUNT(*) FILTER (WHERE status = 'failed') as failed_count,
            COALESCE(AVG(duration_ms), 0)::BIGINT as avg_duration_ms,
            COALESCE(SUM(input_tokens), 0)::BIGINT as total_input_tokens,
            COALESCE(SUM(output_tokens), 0)::BIGINT as total_output_tokens
         FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2";

    let fetched = sqlx::query(sql)
        .bind(tenant_id)
        .bind(agent_name)
        .fetch_one(pool)
        .await;
    let stats_row = match fetched {
        Ok(found) => found,
        Err(db_err) => return Err(AppError::Database(db_err.to_string())),
    };

    // Read the columns in the order the struct declares them.
    let total_runs: i64 = stats_row.get("total_runs");
    let success_count: i64 = stats_row.get("success_count");
    let failed_count: i64 = stats_row.get("failed_count");
    let avg_duration_ms: i64 = stats_row.get("avg_duration_ms");
    let total_input_tokens: i64 = stats_row.get("total_input_tokens");
    let total_output_tokens: i64 = stats_row.get("total_output_tokens");

    Ok(AgentRunStats {
        total_runs,
        success_count,
        failed_count,
        avg_duration_ms,
        total_input_tokens,
        total_output_tokens,
    })
}

/// Gathers platform-wide counters in a single round trip.
///
/// Counts all rows of `tenants` and `tenant_agents`, the unresolved `alerts`, and the
/// runs and total tokens recorded since the start of the current day. The day boundary
/// is the current Unix time rounded down to a multiple of 86400 seconds, i.e. UTC
/// midnight.
///
/// # Errors
///
/// Returns `AppError::Database` carrying the driver's message if the query fails.
pub async fn get_platform_stats(pool: &PgPool) -> Result<PlatformStats> {
    let sql = "SELECT
            (SELECT COUNT(*) FROM tenants) as total_tenants,
            (SELECT COUNT(*) FROM tenant_agents) as total_agents,
            (SELECT COUNT(*) FROM agent_runs WHERE created_at >= $1) as total_runs_today,
            (SELECT COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT FROM agent_runs WHERE created_at >= $1) as total_tokens_today,
            (SELECT COUNT(*) FROM alerts WHERE resolved = FALSE) as active_alerts";

    // Strip the seconds elapsed since the last day boundary.
    let current = now_ts();
    let seconds_into_day = current % 86400;
    let today_start = current - seconds_into_day;

    let fetched = sqlx::query(sql).bind(today_start).fetch_one(pool).await;
    let counters = match fetched {
        Ok(found) => found,
        Err(db_err) => return Err(AppError::Database(db_err.to_string())),
    };

    let total_tenants: i64 = counters.get("total_tenants");
    let total_agents: i64 = counters.get("total_agents");
    let total_runs_today: i64 = counters.get("total_runs_today");
    let total_tokens_today: i64 = counters.get("total_tokens_today");
    let active_alerts: i64 = counters.get("active_alerts");

    Ok(PlatformStats {
        total_tenants,
        total_agents,
        total_runs_today,
        total_tokens_today,
        active_alerts,
    })
}

pub async fn list_all_agents(pool: &PgPool) -> Result<Vec<AllAgentsEntry>> {
    let rows = sqlx::query(
        "SELECT
            ta.tenant_id,
            t.name as tenant_name,
            ta.agent_name,
            ta.display_name,
            COALESCE(ta.config->>'model', 'unknown') as model,
            ta.enabled,
            COALESCE(ar.total_runs, 0) as total_runs,
            ar.last_run_at
         FROM tenant_agents ta
         JOIN tenants t ON t.id = ta.tenant_id
         LEFT JOIN (
            SELECT tenant_id, agent_name, COUNT(*) as total_runs, MAX(created_at) as last_run_at
            FROM agent_runs GROUP BY tenant_id, agent_name
         ) ar ON ar.tenant_id = ta.tenant_id AND ar.agent_name = ta.agent_name
         ORDER BY t.name, ta.agent_name",
    )
    .fetch_all(pool)
    .await
    .map_err(|e| AppError::Database(e.to_string()))?;

    rows.iter()
        .map(|row| {
            Ok(AllAgentsEntry {
                tenant_id: row.get("tenant_id"),
                tenant_name: row.get("tenant_name"),
                agent_name: row.get("agent_name"),
                display_name: row.get("display_name"),
                model: row.get("model"),
                enabled: row.get("enabled"),
                total_runs: row.get("total_runs"),
                last_run_at: row.get("last_run_at"),
            })
        })
        .collect()
}