use argentor_agent::evaluator::ResponseEvaluator;
use argentor_agent::guardrails::{GuardrailEngine, RuleSeverity as GuardrailSeverity};
use argentor_agent::prompt_manager::PromptManager;
use argentor_agent::{AgentRunner, ModelConfig, StreamEvent};
use argentor_memory::conversation::{ConversationMemory, ConversationSummarizer};
use argentor_orchestrator::workflow::{
lead_qualification_workflow, support_ticket_workflow, WorkflowEngine,
};
use argentor_security::audit::AuditOutcome;
use argentor_security::tenant_limits::{TenantLimitManager, TenantPlan};
use argentor_security::{AuditLog, PermissionSet};
use argentor_session::Session;
use argentor_skills::SkillRegistry;
use axum::{
extract::{Json, Path, State},
http::{HeaderMap, StatusCode},
response::{
sse::{Event as SseEvent, Sse},
IntoResponse,
},
routing::{get, post},
Router,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, info};
use uuid::Uuid;
/// Configuration for one named agent role: which model to call and which
/// system prompt to use by default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XcapitAgentProfile {
/// Role identifier; the built-in profiles use the same value as their map key.
pub role: String,
/// Primary model configuration, including any fallback models.
pub model: ModelConfig,
/// Default system prompt; `run_task_handler` uses it when the request does
/// not supply its own.
pub system_prompt: String,
}
/// Builds the built-in agent profiles, keyed by role name.
///
/// Four roles are registered: `sales_qualifier`, `outreach_composer`,
/// `support_responder` and `ticket_router`. Each profile targets the same
/// Claude model and carries exactly one `gpt-4o-mini` fallback that mirrors the
/// primary's temperature, token budget and turn limit. API keys are left empty
/// here; `resolve_api_keys` fills them in from the environment at request time.
pub fn default_xcapit_profiles() -> HashMap<String, XcapitAgentProfile> {
    // One row per role: (role, temperature, max_tokens, max_turns, system prompt).
    let table = [
        (
            "sales_qualifier",
            0.3,
            2048,
            5,
            "Sos el agente de calificación de ventas de Xcapit. Evaluás leads usando ICP scoring (Region, C-Level, Afinidad, Score). Clasificás como hot (>=70), warm (>=45), cool (>=25), cold (<25). Para cada lead, respondé con: score, clasificación, acción recomendada, prioridad de outreach. Sé conciso y accionable.",
        ),
        (
            "outreach_composer",
            0.7,
            4096,
            5,
            "Sos el agente de outreach de Xcapit, empresa de tecnología financiera (inversión automatizada, gestión de activos digitales, DeFi). Componés mensajes personalizados según canal (email/linkedin/whatsapp), región (LATAM=español neutro, Iberia=español peninsular), seniority (C-Level=ROI estratégico, otros=operativo), y afinidad (HIGH=directo, MEDIUM=educativo, LOW=nurturing). Siempre incluí mensaje principal, variante A/B, y siguiente paso sugerido.",
        ),
        (
            "support_responder",
            0.4,
            4096,
            5,
            "Sos el agente de soporte al cliente de Xcapit (fintech, inversión automatizada, activos digitales). Resolvés tickets de forma empática, precisa y rápida. Si no tenés certeza, decilo. Si es tema de fondos/dinero, NUNCA dar instrucciones sin verificación. Si es bug, documentar pasos de reproducción. Siempre ofrecer siguiente paso claro. Español LATAM por defecto. Respondé con: respuesta al cliente, notas internas, y si hay que escalar.",
        ),
        (
            "ticket_router",
            0.2,
            1024,
            3,
            "Clasificás tickets de soporte por categoría (billing, technical, account, crypto, general) y prioridad (urgent, high, medium, low). Respondé SOLO con JSON: {\"category\": \"...\", \"priority\": \"...\", \"confidence\": 0.0-1.0, \"requires_human_review\": true/false, \"reasoning\": \"...\"}.",
        ),
    ];

    let mut profiles = HashMap::new();
    for (role, temperature, max_tokens, max_turns, system_prompt) in table {
        // NOTE(review): the fallback reuses the primary's 200_000-token context
        // budget — confirm that this is within the fallback model's real limit.
        let fallback = ModelConfig {
            provider: argentor_agent::config::LlmProvider::OpenAi,
            model_id: "gpt-4o-mini".to_string(),
            api_key: String::new(),
            api_base_url: None,
            temperature,
            max_tokens,
            max_turns,
            max_context_tokens: 200_000,
            fallback_models: vec![],
            retry_policy: None,
        };
        let model = ModelConfig {
            provider: argentor_agent::config::LlmProvider::Claude,
            model_id: "claude-sonnet-4-6-20250514".to_string(),
            api_key: String::new(),
            api_base_url: None,
            temperature,
            max_tokens,
            max_turns,
            max_context_tokens: 200_000,
            fallback_models: vec![fallback],
            retry_policy: None,
        };
        profiles.insert(
            role.to_string(),
            XcapitAgentProfile {
                role: role.to_string(),
                model,
                system_prompt: system_prompt.to_string(),
            },
        );
    }
    profiles
}
/// Fills in missing API keys on `config` and, recursively, on every entry of
/// `config.fallback_models`.
///
/// A key that is already non-empty is left untouched. Otherwise the key is read
/// from the environment variable that matches the provider
/// (`ANTHROPIC_API_KEY`, `OPENAI_API_KEY` or `GEMINI_API_KEY`; any other
/// provider uses `OPENAI_API_KEY`). When the variable is unset, empty or not
/// valid Unicode, a warning is logged and the key stays empty.
fn resolve_api_keys(config: &mut ModelConfig) {
    if config.api_key.is_empty() {
        let (env_var, provider_label) = match config.provider {
            argentor_agent::config::LlmProvider::Claude
            | argentor_agent::config::LlmProvider::ClaudeCode => {
                ("ANTHROPIC_API_KEY", "Anthropic/Claude")
            }
            argentor_agent::config::LlmProvider::OpenAi => ("OPENAI_API_KEY", "OpenAI"),
            argentor_agent::config::LlmProvider::Gemini => ("GEMINI_API_KEY", "Gemini"),
            _ => ("OPENAI_API_KEY", "OpenAI-compatible"),
        };

        // Treat "set but empty" exactly like "not set".
        let from_env = std::env::var(env_var).ok().filter(|key| !key.is_empty());
        if let Some(key) = from_env {
            config.api_key = key;
        } else {
            tracing::warn!(
                provider = provider_label,
                env_var = env_var,
                model = %config.model_id,
                "API key not found — {} provider will be unavailable. \
                 Set {} to enable it.",
                provider_label,
                env_var,
            );
        }
    }

    // Fallbacks are resolved even when the primary already had a key.
    config.fallback_models.iter_mut().for_each(resolve_api_keys);
}
/// Last observed health of the upstream XcapitSFF service, as maintained by
/// `XcapitState::start_health_loop`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XcapitSffHealth {
/// Base URL of the upstream service (copied from `XcapitConfig::url`).
pub url: String,
/// One of `"unknown"` (initial), `"ok"`, `"degraded"` or `"unreachable"`.
pub status: String,
/// Time of the most recent probe; `None` until the first probe completes.
pub last_check: Option<DateTime<Utc>>,
/// Error text of the most recent failed probe; cleared on success.
pub last_error: Option<String>,
}
/// Settings for the XcapitSFF integration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XcapitConfig {
/// Base URL of the upstream XcapitSFF service; used for `/health` probes and
/// webhook forwarding.
pub url: String,
/// Seconds between upstream health probes.
pub health_check_interval: u64,
/// Webhook `source` values that `webhook_proxy_handler` accepts.
pub allowed_webhook_sources: Vec<String>,
/// Shared secret for webhook signature validation; an empty string disables
/// the check.
pub webhook_hmac_secret: String,
/// CORS origins; populated by `Default` from `XCAPITSFF_CORS_ORIGINS`.
/// The consumer of this list is not part of this section of the file.
pub cors_origins: Vec<String>,
}
impl Default for XcapitConfig {
    /// Builds the configuration from environment variables.
    ///
    /// * `XCAPITSFF_URL` — upstream base URL, default `http://localhost:8000`.
    /// * `WEBHOOK_SECRET` — webhook signing secret; when unset or empty,
    ///   signature validation is disabled and a warning is logged.
    /// * `XCAPITSFF_CORS_ORIGINS` — comma-separated origins. Entries are
    ///   trimmed and blank entries are dropped; when nothing usable remains
    ///   (or the variable is unset) the upstream URL plus
    ///   `http://xcapitsff:8000` are used.
    fn default() -> Self {
        // Read once so `url` and the default CORS origin can never disagree.
        let url = std::env::var("XCAPITSFF_URL")
            .unwrap_or_else(|_| "http://localhost:8000".to_string());

        let webhook_hmac_secret = std::env::var("WEBHOOK_SECRET").unwrap_or_default();
        if webhook_hmac_secret.is_empty() {
            // An empty secret disables validation just like a missing one, so
            // warn in both cases.
            tracing::warn!(
                "WEBHOOK_SECRET not set — webhook HMAC validation disabled. \
                 Set WEBHOOK_SECRET for production use."
            );
        }

        // A trailing comma or an empty variable must not yield a "" origin.
        let configured_origins: Vec<String> = std::env::var("XCAPITSFF_CORS_ORIGINS")
            .map(|raw| {
                raw.split(',')
                    .map(str::trim)
                    .filter(|origin| !origin.is_empty())
                    .map(str::to_string)
                    .collect()
            })
            .unwrap_or_default();
        let cors_origins = if configured_origins.is_empty() {
            vec![url.clone(), "http://xcapitsff:8000".to_string()]
        } else {
            configured_origins
        };

        Self {
            url,
            health_check_interval: 30,
            allowed_webhook_sources: vec![
                "hubspot".to_string(),
                "salesforce".to_string(),
                "stripe".to_string(),
                "intercom".to_string(),
            ],
            webhook_hmac_secret,
            cors_origins,
        }
    }
}
/// Shared application state handed to every XcapitSFF route handler.
pub struct XcapitState {
/// Agent profiles keyed by role name.
pub profiles: HashMap<String, XcapitAgentProfile>,
/// Skill registry passed to each `AgentRunner`.
pub skills: Arc<SkillRegistry>,
/// Permission set passed to each `AgentRunner`.
pub permissions: PermissionSet,
/// Audit log that handlers write their actions to.
pub audit: Arc<AuditLog>,
/// Integration settings (upstream URL, webhook rules, ...).
pub config: XcapitConfig,
/// Latest upstream health snapshot, updated by `start_health_loop`.
pub xcapitsff_health: Arc<RwLock<XcapitSffHealth>>,
/// HTTP client used for health probes and webhook forwarding.
pub http_client: reqwest::Client,
/// Per-tenant usage records.
pub usage_tracker: TenantUsageTracker,
/// Persona overrides keyed by `(tenant_id, agent_role)`.
pub personas: Arc<RwLock<HashMap<(String, String), PersonaConfig>>>,
/// Input/output guardrail checks.
pub guardrails: GuardrailEngine,
/// Per-tenant request limits and usage accounting.
pub tenant_limits: TenantLimitManager,
/// Conversation history, keyed by customer id in `run_task_handler`.
pub conversation_memory: ConversationMemory,
/// Prompt template manager, pre-loaded with the Xcapit templates in `new`.
pub prompt_manager: PromptManager,
/// Interaction and quality analytics sink.
pub analytics: crate::analytics::AnalyticsEngine,
/// Workflow engine; workflows are registered by `init_workflows`.
pub workflow_engine: WorkflowEngine,
}
impl XcapitState {
    /// Creates the shared state with the built-in profiles and fresh, empty
    /// trackers, engines and caches.
    ///
    /// The upstream health snapshot starts as `"unknown"` until
    /// [`start_health_loop`](Self::start_health_loop) records a first probe.
    pub fn new(
        skills: Arc<SkillRegistry>,
        permissions: PermissionSet,
        audit: Arc<AuditLog>,
        config: XcapitConfig,
    ) -> Self {
        let health = XcapitSffHealth {
            url: config.url.clone(),
            status: "unknown".to_string(),
            last_check: None,
            last_error: None,
        };
        Self {
            profiles: default_xcapit_profiles(),
            skills,
            permissions,
            audit,
            config,
            xcapitsff_health: Arc::new(RwLock::new(health)),
            // If the builder fails, fall back to a default client (which has
            // no 10-second timeout) rather than failing construction.
            http_client: reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(10))
                .build()
                .unwrap_or_default(),
            usage_tracker: TenantUsageTracker::new(),
            personas: Arc::new(RwLock::new(HashMap::new())),
            guardrails: GuardrailEngine::new(),
            tenant_limits: TenantLimitManager::new(),
            conversation_memory: ConversationMemory::new(),
            prompt_manager: {
                let pm = PromptManager::new();
                argentor_agent::prompt_manager::register_xcapit_templates(&pm);
                pm
            },
            analytics: crate::analytics::AnalyticsEngine::new(),
            workflow_engine: WorkflowEngine::new(),
        }
    }

    /// Registers the lead-qualification and support-ticket workflows with the
    /// workflow engine.
    pub async fn init_workflows(&self) {
        self.workflow_engine
            .register_workflow(lead_qualification_workflow())
            .await;
        self.workflow_engine
            .register_workflow(support_ticket_workflow())
            .await;
    }

    /// Spawns a detached background task that probes `{config.url}/health`
    /// every `config.health_check_interval` seconds and stores the outcome in
    /// `xcapitsff_health`.
    ///
    /// Status values: `"ok"` (2xx), `"degraded"` (any other HTTP status) and
    /// `"unreachable"` (request error). An interval of `0` is treated as one
    /// second so the loop cannot spin.
    pub fn start_health_loop(self: &Arc<Self>) {
        let state = Arc::clone(self);
        let interval =
            std::time::Duration::from_secs(state.config.health_check_interval.max(1));
        tokio::spawn(async move {
            let url = format!("{}/health", state.config.url);
            loop {
                // Probe first, without holding the lock.
                let (status, last_error) = match state.http_client.get(&url).send().await {
                    Ok(resp) if resp.status().is_success() => ("ok", None),
                    Ok(resp) => ("degraded", Some(format!("HTTP {}", resp.status()))),
                    Err(e) => ("unreachable", Some(e.to_string())),
                };

                // Keep the write guard in its own scope: it must be released
                // before sleeping, otherwise every reader of the health
                // snapshot would block for the whole interval.
                {
                    let mut health = state.xcapitsff_health.write().await;
                    health.status = status.to_string();
                    health.last_check = Some(Utc::now());
                    health.last_error = last_error;
                }

                tokio::time::sleep(interval).await;
            }
        });
    }
}
/// Request body for the run-task endpoint.
#[derive(Debug, Deserialize)]
pub struct RunTaskRequest {
/// Name of the agent profile to run; must be a key of `XcapitState::profiles`.
pub agent_role: String,
/// Overrides the profile's system prompt when present.
pub system_prompt: Option<String>,
/// User input handed to the agent (after guardrail checks).
pub context: String,
/// Session id echoed back in the response; a new id is generated when absent.
pub session_id: Option<String>,
/// Overrides the profile's `max_tokens` when present.
pub max_tokens: Option<u32>,
/// Overrides the profile's `temperature` when present.
pub temperature: Option<f32>,
/// Tenant id; `run_task_handler` falls back to the `X-Tenant-ID` header.
pub tenant_id: Option<String>,
/// Model routing hint, interpreted by `resolve_routing_hint`.
pub routing_hint: Option<String>,
}
/// Successful response body of the run-task endpoint.
#[derive(Debug, Serialize)]
pub struct RunTaskResponse {
/// Agent output (after output guardrail sanitization, if any).
pub response: String,
/// Session id supplied by the caller or generated for this run.
pub session_id: String,
/// Model id the run was configured with.
pub model_used: String,
/// Estimated input tokens (`run_task_handler` uses input length / 4).
pub tokens_input: u64,
/// Estimated output tokens (`run_task_handler` uses output length / 4).
pub tokens_output: u64,
/// Tool calls made during the run; `run_task_handler` always sends an empty list.
pub tool_calls: Vec<String>,
/// Guardrail, quality and workflow markers collected during the run.
pub compliance_flags: Vec<String>,
/// Wall-clock duration of the run in milliseconds.
pub duration_ms: u64,
}
/// One task inside a batch request.
#[derive(Debug, Deserialize)]
pub struct BatchTaskItem {
/// Name of the agent profile to run.
pub agent_role: String,
/// User input handed to the agent.
pub context: String,
/// Overrides the profile's system prompt when present.
pub system_prompt: Option<String>,
/// Overrides the profile's `max_tokens` when present.
pub max_tokens: Option<u32>,
}
/// Request body for the batch endpoint.
#[derive(Debug, Deserialize)]
pub struct BatchRequest {
/// Tasks to run; results are reported by position in this list.
pub tasks: Vec<BatchTaskItem>,
/// Maximum number of tasks run at once; `batch_handler` defaults to 5.
pub max_concurrent: Option<usize>,
}
/// Outcome of a single batch task.
#[derive(Debug, Serialize)]
pub struct BatchResultItem {
/// Position of the task in the request's `tasks` list.
pub index: usize,
/// Whether the agent run succeeded.
pub success: bool,
/// Agent output; empty on failure.
pub response: String,
/// Failure description; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Estimated input tokens (`batch_handler` uses input length / 4); 0 on failure.
pub tokens_input: u64,
/// Estimated output tokens (`batch_handler` uses output length / 4); 0 on failure.
pub tokens_output: u64,
}
/// Response body of the batch endpoint.
#[derive(Debug, Serialize)]
pub struct BatchResponse {
/// Per-task results, sorted by `index`.
pub results: Vec<BatchResultItem>,
/// Number of tasks submitted.
pub total: usize,
/// Number of tasks whose `success` is true.
pub succeeded: usize,
/// `total - succeeded`.
pub failed: usize,
/// Sum of estimated input and output tokens over all results.
pub total_tokens: u64,
/// Wall-clock duration of the whole batch in milliseconds.
pub total_duration_ms: u64,
}
/// Webhook payload accepted by the proxy endpoint and forwarded upstream as-is.
#[derive(Debug, Deserialize, Serialize)]
pub struct WebhookProxyRequest {
/// Event name reported by the sender.
pub event: String,
/// Arbitrary event payload.
pub data: serde_json::Value,
/// Sender identifier; must be listed in `XcapitConfig::allowed_webhook_sources`.
pub source: String,
}
/// Response body of the webhook proxy endpoint.
#[derive(Debug, Serialize)]
pub struct WebhookProxyResponse {
/// True only when the upstream answered with a success status.
pub forwarded: bool,
/// HTTP status returned by the upstream, if it was reached.
pub upstream_status: Option<u16>,
/// Identifier generated for this proxy call.
pub audit_id: String,
/// Failure description; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Health report payload.
/// NOTE(review): presumably produced by `extended_health_handler`, whose body is
/// outside this section of the file — confirm the field semantics there.
#[derive(Debug, Serialize)]
pub struct ExtendedHealthResponse {
/// Overall status string.
pub status: String,
/// Version string.
pub version: String,
/// Per-subsystem check results.
pub checks: HealthChecks,
/// Latest upstream health snapshot.
pub xcapitsff: XcapitSffHealth,
}
/// Per-subsystem status strings included in `ExtendedHealthResponse`.
/// NOTE(review): the value vocabulary is set by code outside this section.
#[derive(Debug, Serialize)]
pub struct HealthChecks {
/// Status of the LLM backends.
pub llm_backends: String,
/// Status of the upstream XcapitSFF service.
pub xcapitsff: String,
/// Status of the compliance subsystem.
pub compliance: String,
}
/// One recorded agent invocation for a tenant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageRecord {
/// Agent role that handled the request.
pub agent_role: String,
/// Model id the request was configured with.
pub model: String,
/// Input token count as supplied by the caller of `TenantUsageTracker::record`.
pub tokens_in: u64,
/// Output token count as supplied by the caller of `TenantUsageTracker::record`.
pub tokens_out: u64,
/// Cost in USD as supplied by the caller of `TenantUsageTracker::record`.
pub cost_usd: f64,
/// Time at which the record was stored.
pub timestamp: DateTime<Utc>,
}
/// Time window for usage queries, used by `TenantUsageTracker::get_usage`.
#[derive(Debug, Clone, Deserialize)]
pub enum UsagePeriod {
/// Only records from the last N hours; deserialized under the name `hours`.
#[serde(rename = "hours")]
Hours(u64),
/// Only records from the last N days; deserialized under the name `days`.
#[serde(rename = "days")]
Days(u64),
/// No time cutoff; deserialized under the name `all`.
#[serde(rename = "all")]
All,
}
impl Default for UsagePeriod {
fn default() -> Self {
Self::All
}
}
/// Aggregated usage for one tenant, as returned by `TenantUsageTracker::get_usage`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageBreakdown {
/// Tenant the breakdown was computed for.
pub tenant_id: String,
/// Sum of `tokens_in` over the selected records.
pub total_tokens_in: u64,
/// Sum of `tokens_out` over the selected records.
pub total_tokens_out: u64,
/// Sum of `cost_usd` over the selected records.
pub total_cost_usd: f64,
/// Number of selected records.
pub request_count: usize,
/// Totals grouped by agent role.
pub by_agent: HashMap<String, AgentUsageSummary>,
/// Totals grouped by model id.
pub by_model: HashMap<String, ModelUsageSummary>,
}
/// Usage totals for a single agent role within a `UsageBreakdown`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AgentUsageSummary {
/// Summed input tokens.
pub tokens_in: u64,
/// Summed output tokens.
pub tokens_out: u64,
/// Summed cost in USD.
pub cost_usd: f64,
/// Number of records counted.
pub count: usize,
}
/// Usage totals for a single model id within a `UsageBreakdown`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ModelUsageSummary {
/// Summed input tokens.
pub tokens_in: u64,
/// Summed output tokens.
pub tokens_out: u64,
/// Summed cost in USD.
pub cost_usd: f64,
/// Number of records counted.
pub count: usize,
}
/// In-memory store of usage records, grouped by tenant id.
pub struct TenantUsageTracker {
// Tenant id -> records in insertion order, behind an async read/write lock.
records: Arc<RwLock<HashMap<String, Vec<UsageRecord>>>>,
}
impl TenantUsageTracker {
pub fn new() -> Self {
Self {
records: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn record(
&self,
tenant_id: &str,
agent_role: &str,
model: &str,
tokens_in: u64,
tokens_out: u64,
cost_usd: f64,
) {
let record = UsageRecord {
agent_role: agent_role.to_string(),
model: model.to_string(),
tokens_in,
tokens_out,
cost_usd,
timestamp: Utc::now(),
};
let mut records = self.records.write().await;
records
.entry(tenant_id.to_string())
.or_default()
.push(record);
}
pub async fn get_usage(&self, tenant_id: &str, period: &UsagePeriod) -> UsageBreakdown {
let records = self.records.read().await;
let empty = vec![];
let tenant_records = records.get(tenant_id).unwrap_or(&empty);
let cutoff = match period {
UsagePeriod::Hours(h) => Some(Utc::now() - chrono::Duration::hours(*h as i64)),
UsagePeriod::Days(d) => Some(Utc::now() - chrono::Duration::days(*d as i64)),
UsagePeriod::All => None,
};
let filtered: Vec<&UsageRecord> = tenant_records
.iter()
.filter(|r| match cutoff {
Some(ref c) => r.timestamp >= *c,
None => true,
})
.collect();
let mut by_agent: HashMap<String, AgentUsageSummary> = HashMap::new();
let mut by_model: HashMap<String, ModelUsageSummary> = HashMap::new();
let mut total_tokens_in: u64 = 0;
let mut total_tokens_out: u64 = 0;
let mut total_cost_usd: f64 = 0.0;
for r in &filtered {
total_tokens_in += r.tokens_in;
total_tokens_out += r.tokens_out;
total_cost_usd += r.cost_usd;
let agent_entry = by_agent.entry(r.agent_role.clone()).or_default();
agent_entry.tokens_in += r.tokens_in;
agent_entry.tokens_out += r.tokens_out;
agent_entry.cost_usd += r.cost_usd;
agent_entry.count += 1;
let model_entry = by_model.entry(r.model.clone()).or_default();
model_entry.tokens_in += r.tokens_in;
model_entry.tokens_out += r.tokens_out;
model_entry.cost_usd += r.cost_usd;
model_entry.count += 1;
}
UsageBreakdown {
tenant_id: tenant_id.to_string(),
total_tokens_in,
total_tokens_out,
total_cost_usd,
request_count: filtered.len(),
by_agent,
by_model,
}
}
}
impl Default for TenantUsageTracker {
fn default() -> Self {
Self::new()
}
}
/// Tenant-specific persona that `run_task_handler` prepends to the system prompt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonaConfig {
/// Persona name, rendered in the prompt prefix.
pub name: String,
/// Tone description, rendered in the prompt prefix.
pub tone: String,
/// Language style description, rendered in the prompt prefix.
pub language_style: String,
/// Signature text; defaults to empty. Not read by the handlers in this
/// section of the file.
#[serde(default)]
pub signature: String,
/// Extra instructions placed after the prefix header; defaults to empty.
#[serde(default)]
pub custom_instructions: String,
}
/// Request body for creating a persona.
#[derive(Debug, Deserialize)]
pub struct CreatePersonaRequest {
/// Tenant the persona belongs to.
pub tenant_id: String,
/// Agent role the persona applies to.
pub agent_role: String,
/// Persona definition.
pub persona: PersonaConfig,
}
/// Response body for persona creation.
#[derive(Debug, Serialize)]
pub struct CreatePersonaResponse {
/// Creation flag; its exact semantics are set by the handler, which is
/// outside this section of the file.
pub created: bool,
/// Tenant the persona belongs to.
pub tenant_id: String,
/// Agent role the persona applies to.
pub agent_role: String,
/// Name of the persona.
pub persona_name: String,
}
/// Response body for listing a tenant's personas.
#[derive(Debug, Serialize)]
pub struct ListPersonasResponse {
/// Tenant the listing is for.
pub tenant_id: String,
/// Personas for the tenant.
/// NOTE(review): presumably keyed by agent role — confirm in the list handler.
pub personas: HashMap<String, PersonaConfig>,
}
/// Request body for the evaluate endpoint.
#[derive(Debug, Deserialize)]
pub struct EvaluateRequest {
/// Agent response to evaluate.
pub response: String,
/// Context the response was produced for.
pub context: String,
/// Criteria names; defaults to an empty list when omitted.
#[serde(default)]
pub criteria: Vec<String>,
}
/// Response body of the evaluate endpoint.
#[derive(Debug, Serialize)]
pub struct EvaluateResponse {
/// Overall score.
pub overall_score: f32,
/// Score per criterion name.
pub by_criteria: HashMap<String, f32>,
/// Improvement suggestions.
pub suggestions: Vec<String>,
}
/// Query parameters for the tenant usage endpoint.
#[derive(Debug, Deserialize)]
pub struct UsageQueryParams {
/// Optional period selector as a raw string; parsing happens in the handler,
/// which is outside this section of the file.
#[serde(default)]
pub period: Option<String>,
}
/// Builds the axum router for all XcapitSFF endpoints under `/api/v1` and
/// attaches the shared `state` to it.
pub fn xcapitsff_router(state: Arc<XcapitState>) -> Router {
Router::new()
// Agent execution: single task, streaming task, batch, evaluation.
.route("/api/v1/agent/run-task", post(run_task_handler))
.route(
"/api/v1/agent/run-task-stream",
post(run_task_stream_handler),
)
.route("/api/v1/agent/batch", post(batch_handler))
.route("/api/v1/agent/evaluate", post(evaluate_handler))
// Personas and profiles.
.route("/api/v1/agent/personas", post(create_persona_handler))
.route(
"/api/v1/agent/personas/{tenant_id}",
get(list_personas_handler),
)
.route("/api/v1/agent/profiles", get(list_profiles_handler))
// Webhook proxy, usage reporting and health.
.route("/api/v1/proxy/webhook", post(webhook_proxy_handler))
.route(
"/api/v1/usage/tenant/{tenant_id}",
get(tenant_usage_handler),
)
.route("/api/v1/health", get(extended_health_handler))
// Tenant management and workflow runs.
.route(
"/api/v1/tenants/{tenant_id}/register",
post(register_tenant_handler),
)
.route(
"/api/v1/tenants/{tenant_id}/status",
get(tenant_status_handler),
)
.route("/api/v1/workflows/runs", get(list_workflow_runs_handler))
.with_state(state)
}
/// Maps a routing hint to a `(model_id, provider)` override.
///
/// * `"fast_cheap"` selects `gpt-4o-mini` on OpenAI.
/// * `"quality_max"` selects `claude-opus-4-6-20250514` on Claude.
/// * `"balanced"` and every unrecognized hint return `None`, which leaves the
///   profile's own model in place.
fn resolve_routing_hint(hint: &str) -> Option<(String, argentor_agent::config::LlmProvider)> {
    let (model_id, provider) = if hint == "fast_cheap" {
        ("gpt-4o-mini", argentor_agent::config::LlmProvider::OpenAi)
    } else if hint == "quality_max" {
        (
            "claude-opus-4-6-20250514",
            argentor_agent::config::LlmProvider::Claude,
        )
    } else {
        // Covers "balanced" as well as unknown hints.
        return None;
    };
    Some((model_id.to_string(), provider))
}
/// Estimates the USD cost of a call from its token counts.
///
/// Prices are expressed per one million tokens and selected by substring match
/// on `model_id`, checked in this order: `gpt-4o-mini`, `opus`, `sonnet`.
/// Unrecognized models use the same price as `sonnet`.
fn estimate_cost_usd(model_id: &str, tokens_in: u64, tokens_out: u64) -> f64 {
    let (in_price, out_price) = if model_id.contains("gpt-4o-mini") {
        (0.15, 0.60)
    } else if model_id.contains("opus") {
        (15.0, 75.0)
    } else {
        // "sonnet" and every unknown model share one price.
        (3.0, 15.0)
    };
    let input_cost = tokens_in as f64 * in_price;
    let output_cost = tokens_out as f64 * out_price;
    (input_cost + output_cost) / 1_000_000.0
}
async fn run_task_handler(
State(state): State<Arc<XcapitState>>,
headers: HeaderMap,
Json(req): Json<RunTaskRequest>,
) -> impl IntoResponse {
let start = Instant::now();
let mut compliance_flags: Vec<String> = Vec::new();
let tenant_id = req.tenant_id.clone().or_else(|| {
headers
.get("X-Tenant-ID")
.and_then(|v| v.to_str().ok())
.map(String::from)
});
if let Some(ref tid) = tenant_id {
let check = state.tenant_limits.check_request(tid);
if !check.allowed {
let reason = check.reason.unwrap_or_else(|| "rate_limited".to_string());
compliance_flags.push(format!("rate_limited:{reason}"));
return (
StatusCode::TOO_MANY_REQUESTS,
Json(serde_json::json!({
"error": format!("Rate limit exceeded: {reason}"),
"tenant_id": tid,
"retry_after_seconds": 60,
})),
)
.into_response();
}
}
let input_check = state.guardrails.check_input(&req.context);
if !input_check.passed {
let blocking: Vec<String> = input_check
.violations
.iter()
.filter(|v| v.severity == GuardrailSeverity::Block)
.map(|v| format!("{}: {}", v.rule_name, v.message))
.collect();
if !blocking.is_empty() {
compliance_flags.push("input_blocked".to_string());
state.audit.log_action(
Uuid::new_v4(),
"guardrail_blocked",
Some(req.agent_role.clone()),
serde_json::json!({"violations": blocking}),
AuditOutcome::Error,
);
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Input blocked by guardrails",
"violations": blocking,
})),
)
.into_response();
}
for v in &input_check.violations {
compliance_flags.push(format!("input_warn:{}", v.rule_name));
}
}
let safe_context = input_check
.sanitized_text
.unwrap_or_else(|| req.context.clone());
let profile = match state.profiles.get(&req.agent_role) {
Some(p) => p.clone(),
None => {
let available: Vec<&String> = state.profiles.keys().collect();
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": format!("Unknown agent_role: '{}'. Available: {:?}", req.agent_role, available)
})),
)
.into_response();
}
};
let mut model_config = profile.model.clone();
resolve_api_keys(&mut model_config);
if let Some(max_tokens) = req.max_tokens {
model_config.max_tokens = max_tokens;
}
if let Some(temp) = req.temperature {
model_config.temperature = temp;
}
if let Some(ref hint) = req.routing_hint {
if let Some((model_id, provider)) = resolve_routing_hint(hint) {
model_config.model_id = model_id;
model_config.provider = provider;
resolve_api_keys(&mut model_config);
}
}
let mut system_prompt = req
.system_prompt
.unwrap_or_else(|| profile.system_prompt.clone());
if let Some(ref tid) = tenant_id {
let personas = state.personas.read().await;
if let Some(persona) = personas.get(&(tid.clone(), req.agent_role.clone())) {
let persona_prefix = format!(
"[Persona: {} | Tono: {} | Estilo: {}]\n{}\n\n",
persona.name, persona.tone, persona.language_style, persona.custom_instructions
);
system_prompt = format!("{persona_prefix}{system_prompt}");
}
}
let customer_id = headers
.get("X-Customer-ID")
.and_then(|v| v.to_str().ok())
.map(String::from);
if let Some(ref cid) = customer_id {
let context_str = ConversationSummarizer::build_context(
&state.conversation_memory,
cid,
500, )
.await;
if !context_str.is_empty() {
system_prompt = format!("{system_prompt}\n\n{context_str}");
}
}
let model_id = model_config.model_id.clone();
let runner = AgentRunner::new(
model_config,
state.skills.clone(),
state.permissions.clone(),
state.audit.clone(),
)
.with_system_prompt(&system_prompt);
let mut session = Session::new();
let session_id = req.session_id.unwrap_or_else(|| session.id.to_string());
state.audit.log_action(
session.id,
"xcapitsff_run_task",
Some(req.agent_role.clone()),
serde_json::json!({
"role": req.agent_role,
"context_len": safe_context.len(),
"tenant_id": tenant_id,
"customer_id": customer_id,
"model": model_id,
}),
AuditOutcome::Success,
);
info!(role = %req.agent_role, session_id = %session_id, "XcapitSFF run-task started (full pipeline)");
let result = runner.run(&mut session, &safe_context).await;
let duration_ms = start.elapsed().as_millis() as u64;
match result {
Ok(mut response) => {
let output_check = state
.guardrails
.check_output(&response, Some(&safe_context));
if !output_check.passed {
for v in &output_check.violations {
compliance_flags.push(format!("output_{:?}:{}", v.severity, v.rule_name));
}
if let Some(sanitized) = output_check.sanitized_text {
response = sanitized;
}
}
let tokens_input = (safe_context.len() / 4) as u64;
let tokens_output = (response.len() / 4) as u64;
let evaluator = ResponseEvaluator::with_defaults();
let quality_score = evaluator.evaluate_heuristic(&safe_context, &response, &[]);
let quality = quality_score.overall;
if quality < 0.5 {
compliance_flags.push(format!("low_quality:{quality:.2}"));
}
if let Some(ref cid) = customer_id {
state
.conversation_memory
.record_turn(cid, &session_id, "user", &safe_context, HashMap::new())
.await;
let mut meta = HashMap::new();
meta.insert("agent_role".to_string(), req.agent_role.clone());
meta.insert("model".to_string(), model_id.clone());
meta.insert("quality".to_string(), format!("{quality:.2}"));
state
.conversation_memory
.record_turn(cid, &session_id, "assistant", &response, meta)
.await;
}
if let Some(ref tid) = tenant_id {
let cost = estimate_cost_usd(&model_id, tokens_input, tokens_output);
state
.usage_tracker
.record(
tid,
&req.agent_role,
&model_id,
tokens_input,
tokens_output,
cost,
)
.await;
state
.tenant_limits
.record_usage(tid, tokens_input, tokens_output, cost);
state
.analytics
.record_interaction(crate::analytics::InteractionEvent {
tenant_id: tid.clone(),
agent_role: req.agent_role.clone(),
channel: "api".to_string(),
customer_id: customer_id.clone(),
outcome: crate::analytics::InteractionOutcome::Resolved,
duration_ms,
tokens_used: tokens_input + tokens_output,
timestamp: Utc::now(),
})
.await;
state
.analytics
.record_quality_score(crate::analytics::QualityEvent {
tenant_id: tid.clone(),
agent_role: req.agent_role.clone(),
overall_score: quality,
criteria_scores: HashMap::new(),
timestamp: Utc::now(),
})
.await;
}
state.audit.log_action(
session.id,
"xcapitsff_run_task_complete",
Some(req.agent_role.clone()),
serde_json::json!({
"duration_ms": duration_ms,
"tokens_input": tokens_input,
"tokens_output": tokens_output,
"quality_score": quality,
"compliance_flags": compliance_flags,
}),
AuditOutcome::Success,
);
if req.agent_role == "sales_qualifier" {
let response_upper = response.to_uppercase();
if response_upper.contains("HOT") || response_upper.contains("\u{1f525}") {
let trigger_data = serde_json::json!({
"agent_role": "sales_qualifier",
"tenant_id": tenant_id,
"customer_id": customer_id,
"qualification_result": &response,
"session_id": &session_id,
});
if let Some(run_id) = state
.workflow_engine
.start("lead_qualification", trigger_data)
.await
{
info!(workflow = "lead_qualification", run_id = %run_id, "Auto-triggered lead qualification workflow");
compliance_flags
.push(format!("workflow_triggered:lead_qualification:{run_id}"));
}
}
} else if req.agent_role == "ticket_router" {
let response_lower = response.to_lowercase();
if response_lower.contains("\"priority\":\"urgent\"")
|| response_lower.contains("\"priority\": \"urgent\"")
{
let trigger_data = serde_json::json!({
"agent_role": "ticket_router",
"tenant_id": tenant_id,
"routing_result": &response,
"session_id": &session_id,
});
if let Some(run_id) = state
.workflow_engine
.start("support_ticket", trigger_data)
.await
{
info!(workflow = "support_ticket", run_id = %run_id, "Auto-triggered support ticket workflow");
compliance_flags
.push(format!("workflow_triggered:support_ticket:{run_id}"));
}
}
}
info!(role = %req.agent_role, duration_ms, quality, "XcapitSFF run-task completed (full pipeline)");
(
StatusCode::OK,
Json(
serde_json::to_value(RunTaskResponse {
response,
session_id,
model_used: model_id,
tokens_input,
tokens_output,
tool_calls: vec![],
compliance_flags,
duration_ms,
})
.unwrap_or_default(),
),
)
.into_response()
}
Err(e) => {
error!(role = %req.agent_role, error = %e, "XcapitSFF run-task failed");
if let Some(ref tid) = tenant_id {
state
.analytics
.record_interaction(crate::analytics::InteractionEvent {
tenant_id: tid.clone(),
agent_role: req.agent_role.clone(),
channel: "api".to_string(),
customer_id: customer_id.clone(),
outcome: crate::analytics::InteractionOutcome::Escalated,
duration_ms,
tokens_used: 0,
timestamp: Utc::now(),
})
.await;
}
state.audit.log_action(
session.id,
"xcapitsff_run_task_error",
Some(req.agent_role),
serde_json::json!({"error": e.to_string(), "duration_ms": duration_ms}),
AuditOutcome::Error,
);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": e.to_string(),
"duration_ms": duration_ms,
})),
)
.into_response()
}
}
}
/// Handles `POST /api/v1/agent/batch`: runs every task of the request as its
/// own spawned job and returns a [`BatchResponse`] with one result per task,
/// ordered by the task's position in the request.
///
/// At most `max_concurrent` tasks (default 5) run at the same time. The value
/// is clamped to `1..=number_of_tasks`: a value of `0` would create a
/// semaphore without permits and hang the request forever, and no more permits
/// than tasks are ever needed. Each task goes through input guardrails,
/// profile lookup, the agent run and output sanitization; a failure in one
/// task is reported in its result item and does not affect the others.
async fn batch_handler(
    State(state): State<Arc<XcapitState>>,
    Json(req): Json<BatchRequest>,
) -> impl IntoResponse {
    let start = Instant::now();
    let total = req.tasks.len();
    let max_concurrent = req.max_concurrent.unwrap_or(5).clamp(1, total.max(1));
    info!(total, max_concurrent, "XcapitSFF batch started");

    let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent));
    let mut handles = Vec::with_capacity(total);
    for (index, task) in req.tasks.into_iter().enumerate() {
        let state = state.clone();
        let sem = semaphore.clone();
        let handle = tokio::spawn(async move {
            // The permit is held until the task finishes. The semaphore is
            // never closed, so the `Result` can be ignored.
            let _permit = sem.acquire().await;

            let input_check = state.guardrails.check_input(&task.context);
            if !input_check.passed {
                let blocking: Vec<String> = input_check
                    .violations
                    .iter()
                    .filter(|v| v.severity == GuardrailSeverity::Block)
                    .map(|v| v.message.clone())
                    .collect();
                if !blocking.is_empty() {
                    return BatchResultItem {
                        index,
                        success: false,
                        response: String::new(),
                        error: Some(format!("Guardrail blocked: {}", blocking.join("; "))),
                        tokens_input: 0,
                        tokens_output: 0,
                    };
                }
            }
            let safe_context = input_check
                .sanitized_text
                .unwrap_or_else(|| task.context.clone());

            let profile = match state.profiles.get(&task.agent_role) {
                Some(p) => p.clone(),
                None => {
                    return BatchResultItem {
                        index,
                        success: false,
                        response: String::new(),
                        error: Some(format!("Unknown role: {}", task.agent_role)),
                        tokens_input: 0,
                        tokens_output: 0,
                    };
                }
            };

            let mut model_config = profile.model.clone();
            resolve_api_keys(&mut model_config);
            if let Some(mt) = task.max_tokens {
                model_config.max_tokens = mt;
            }
            let system_prompt = task
                .system_prompt
                .unwrap_or_else(|| profile.system_prompt.clone());

            let runner = AgentRunner::new(
                model_config,
                state.skills.clone(),
                state.permissions.clone(),
                state.audit.clone(),
            )
            .with_system_prompt(&system_prompt);
            let mut session = Session::new();

            match runner.run(&mut session, &safe_context).await {
                Ok(mut response) => {
                    let output_check = state
                        .guardrails
                        .check_output(&response, Some(&safe_context));
                    if let Some(sanitized) = output_check.sanitized_text {
                        response = sanitized;
                    }
                    // Rough token estimate: about four bytes per token.
                    let ti = (safe_context.len() / 4) as u64;
                    let to = (response.len() / 4) as u64;
                    BatchResultItem {
                        index,
                        success: true,
                        response,
                        error: None,
                        tokens_input: ti,
                        tokens_output: to,
                    }
                }
                Err(e) => BatchResultItem {
                    index,
                    success: false,
                    response: String::new(),
                    error: Some(e.to_string()),
                    tokens_input: 0,
                    tokens_output: 0,
                },
            }
        });
        handles.push(handle);
    }

    // Handles are awaited in spawn order, so the position of a handle is the
    // index of its task; this is used when a task fails to join.
    let mut results = Vec::with_capacity(total);
    for (index, handle) in handles.into_iter().enumerate() {
        match handle.await {
            Ok(item) => results.push(item),
            Err(e) => results.push(BatchResultItem {
                index,
                success: false,
                response: String::new(),
                error: Some(format!("Task panicked: {e}")),
                tokens_input: 0,
                tokens_output: 0,
            }),
        }
    }
    results.sort_by_key(|r| r.index);

    let succeeded = results.iter().filter(|r| r.success).count();
    let failed = total - succeeded;
    let total_tokens: u64 = results
        .iter()
        .map(|r| r.tokens_input + r.tokens_output)
        .sum();
    let total_duration_ms = start.elapsed().as_millis() as u64;
    info!(
        total,
        succeeded, failed, total_duration_ms, "XcapitSFF batch completed"
    );

    (
        StatusCode::OK,
        Json(
            serde_json::to_value(BatchResponse {
                results,
                total,
                succeeded,
                failed,
                total_tokens,
                total_duration_ms,
            })
            .unwrap_or_default(),
        ),
    )
        .into_response()
}
async fn webhook_proxy_handler(
State(state): State<Arc<XcapitState>>,
headers: HeaderMap,
Json(req): Json<WebhookProxyRequest>,
) -> impl IntoResponse {
let audit_id = Uuid::new_v4().to_string();
if !state.config.allowed_webhook_sources.contains(&req.source) {
state.audit.log_action(
Uuid::new_v4(),
"webhook_proxy_rejected",
Some(req.source.clone()),
serde_json::json!({"event": req.event, "reason": "source_not_allowed"}),
AuditOutcome::Error,
);
return (
StatusCode::FORBIDDEN,
Json(
serde_json::to_value(WebhookProxyResponse {
forwarded: false,
upstream_status: None,
audit_id,
error: Some(format!("Source '{}' not in allowed list", req.source)),
})
.unwrap_or_default(),
),
)
.into_response();
}
if !state.config.webhook_hmac_secret.is_empty() {
let provided_sig = headers
.get("x-webhook-secret")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if provided_sig.is_empty() {
state.audit.log_action(
Uuid::new_v4(),
"webhook_proxy_rejected",
Some(req.source.clone()),
serde_json::json!({"event": req.event, "reason": "missing_hmac"}),
AuditOutcome::Error,
);
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({
"forwarded": false,
"audit_id": audit_id,
"error": "Missing X-Webhook-Secret header",
})),
)
.into_response();
}
use sha2::Digest;
let body_bytes = serde_json::to_vec(&req).unwrap_or_default();
let mut mac = sha2::Sha256::new();
mac.update(state.config.webhook_hmac_secret.as_bytes());
mac.update(&body_bytes);
let expected = hex::encode(mac.finalize());
if provided_sig != expected {
state.audit.log_action(
Uuid::new_v4(),
"webhook_proxy_rejected",
Some(req.source.clone()),
serde_json::json!({"event": req.event, "reason": "invalid_hmac"}),
AuditOutcome::Error,
);
return (
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({
"forwarded": false,
"audit_id": audit_id,
"error": "Invalid HMAC signature",
})),
)
.into_response();
}
}
state.audit.log_action(
Uuid::new_v4(),
"webhook_proxy_received",
Some(req.source.clone()),
serde_json::json!({"event": &req.event, "source": &req.source}),
AuditOutcome::Success,
);
let forward_url = format!("{}/api/v1/webhooks/generic", state.config.url);
let result = state.http_client.post(&forward_url).json(&req).send().await;
match result {
Ok(resp) => {
let status = resp.status().as_u16();
let success = resp.status().is_success();
state.audit.log_action(
Uuid::new_v4(),
"webhook_proxy_forwarded",
Some(req.source.clone()),
serde_json::json!({
"event": req.event,
"upstream_status": status,
"success": success,
}),
if success {
AuditOutcome::Success
} else {
AuditOutcome::Error
},
);
info!(
source = %req.source,
event = %req.event,
upstream_status = status,
"Webhook proxied to XcapitSFF"
);
(
StatusCode::OK,
Json(
serde_json::to_value(WebhookProxyResponse {
forwarded: success,
upstream_status: Some(status),
audit_id,
error: if success {
None
} else {
Some(format!("Upstream returned HTTP {status}"))
},
})
.unwrap_or_default(),
),
)
.into_response()
}
Err(e) => {
error!(error = %e, "Failed to forward webhook to XcapitSFF");
state.audit.log_action(
Uuid::new_v4(),
"webhook_proxy_error",
Some(req.source),
serde_json::json!({"error": e.to_string()}),
AuditOutcome::Error,
);
(
StatusCode::BAD_GATEWAY,
Json(
serde_json::to_value(WebhookProxyResponse {
forwarded: false,
upstream_status: None,
audit_id,
error: Some(e.to_string()),
})
.unwrap_or_default(),
),
)
.into_response()
}
}
}
/// Reports gateway health together with the last recorded XcapitSFF probe.
///
/// The top-level `status` is `"ok"` only when the stored XcapitSFF status is
/// exactly `"ok"`; any other value is reported as `"degraded"`. The HTTP
/// status is always `200`.
async fn extended_health_handler(State(state): State<Arc<XcapitState>>) -> impl IntoResponse {
    // Snapshot the shared health record so the lock is released immediately.
    let sff = state.xcapitsff_health.read().await.clone();

    let overall = match sff.status.as_str() {
        "ok" => "ok",
        _ => "degraded",
    };

    let checks = HealthChecks {
        llm_backends: "ok".to_string(),
        xcapitsff: sff.status.clone(),
        compliance: "ok".to_string(),
    };

    let body = serde_json::to_value(ExtendedHealthResponse {
        status: overall.to_string(),
        version: "0.1.0".to_string(),
        checks,
        xcapitsff: sff,
    })
    .unwrap_or_default();

    (StatusCode::OK, Json(body)).into_response()
}
async fn run_task_stream_handler(
State(state): State<Arc<XcapitState>>,
headers: HeaderMap,
Json(req): Json<RunTaskRequest>,
) -> impl IntoResponse {
let tenant_id = req.tenant_id.clone().or_else(|| {
headers
.get("X-Tenant-ID")
.and_then(|v| v.to_str().ok())
.map(String::from)
});
let profile = match state.profiles.get(&req.agent_role) {
Some(p) => p.clone(),
None => {
let available: Vec<String> = state.profiles.keys().cloned().collect();
let role = req.agent_role.clone();
return Sse::new(futures_util::stream::once(async move {
Ok::<_, Infallible>(
SseEvent::default()
.data(serde_json::json!({
"type": "error",
"message": format!("Unknown agent_role: '{}'. Available: {:?}", role, available)
}).to_string()),
)
}))
.into_response();
}
};
let mut model_config = profile.model.clone();
resolve_api_keys(&mut model_config);
if let Some(max_tokens) = req.max_tokens {
model_config.max_tokens = max_tokens;
}
if let Some(temp) = req.temperature {
model_config.temperature = temp;
}
if let Some(ref hint) = req.routing_hint {
if let Some((model_id, provider)) = resolve_routing_hint(hint) {
model_config.model_id = model_id;
model_config.provider = provider;
resolve_api_keys(&mut model_config);
}
}
let mut system_prompt = req
.system_prompt
.unwrap_or_else(|| profile.system_prompt.clone());
if let Some(ref tid) = tenant_id {
let personas = state.personas.read().await;
if let Some(persona) = personas.get(&(tid.clone(), req.agent_role.clone())) {
let persona_prefix = format!(
"[Persona: {} | Tono: {} | Estilo: {}]\n{}\n\n",
persona.name, persona.tone, persona.language_style, persona.custom_instructions
);
system_prompt = format!("{persona_prefix}{system_prompt}");
}
}
let model_id = model_config.model_id.clone();
let agent_role = req.agent_role.clone();
let context = req.context.clone();
let runner = AgentRunner::new(
model_config,
state.skills.clone(),
state.permissions.clone(),
state.audit.clone(),
)
.with_system_prompt(&system_prompt);
let mut session = Session::new();
let session_id = req.session_id.unwrap_or_else(|| session.id.to_string());
info!(role = %agent_role, session_id = %session_id, "XcapitSFF run-task-stream started");
let (event_tx, event_rx) = mpsc::unbounded_channel::<StreamEvent>();
let state_clone = state.clone();
let model_id_clone = model_id.clone();
let agent_role_clone = agent_role.clone();
let tenant_id_clone = tenant_id.clone();
let context_len = context.len();
tokio::spawn(async move {
let result = runner
.run_streaming(&mut session, &context, event_tx.clone())
.await;
match result {
Ok(response) => {
let tokens_input = (context_len / 4) as u64;
let tokens_output = (response.len() / 4) as u64;
if let Some(ref tid) = tenant_id_clone {
let cost = estimate_cost_usd(&model_id_clone, tokens_input, tokens_output);
state_clone
.usage_tracker
.record(
tid,
&agent_role_clone,
&model_id_clone,
tokens_input,
tokens_output,
cost,
)
.await;
}
info!(role = %agent_role_clone, "XcapitSFF run-task-stream completed");
}
Err(e) => {
error!(role = %agent_role_clone, error = %e, "XcapitSFF run-task-stream failed");
let _ = event_tx.send(StreamEvent::Error {
message: e.to_string(),
});
}
}
});
let rx_stream = UnboundedReceiverStream::new(event_rx);
let session_id_for_stream = session_id.clone();
let context_len_for_stream = context_len;
let sse_stream = futures_util::stream::unfold(
(
rx_stream,
session_id_for_stream,
context_len_for_stream,
0usize,
),
|(mut rx, sid, ctx_len, mut output_chars)| async move {
use futures_util::StreamExt;
match rx.next().await {
Some(StreamEvent::TextDelta { text }) => {
output_chars += text.len();
let data = serde_json::json!({
"type": "token",
"content": text,
});
let event = Ok::<_, Infallible>(SseEvent::default().data(data.to_string()));
Some((event, (rx, sid, ctx_len, output_chars)))
}
Some(StreamEvent::Done) => {
let tokens_input = (ctx_len / 4) as u64;
let tokens_output = (output_chars / 4) as u64;
let data = serde_json::json!({
"type": "done",
"session_id": sid,
"tokens_input": tokens_input,
"tokens_output": tokens_output,
});
let event = Ok::<_, Infallible>(SseEvent::default().data(data.to_string()));
Some((event, (rx, sid, ctx_len, output_chars)))
}
Some(StreamEvent::Error { message }) => {
let data = serde_json::json!({
"type": "error",
"message": message,
});
let event = Ok::<_, Infallible>(SseEvent::default().data(data.to_string()));
Some((event, (rx, sid, ctx_len, output_chars)))
}
Some(StreamEvent::ToolCallStart { id, name }) => {
let data = serde_json::json!({
"type": "tool_call_start",
"id": id,
"name": name,
});
let event = Ok::<_, Infallible>(SseEvent::default().data(data.to_string()));
Some((event, (rx, sid, ctx_len, output_chars)))
}
Some(StreamEvent::ToolCallDelta {
id,
arguments_delta,
}) => {
let data = serde_json::json!({
"type": "tool_call_delta",
"id": id,
"arguments_delta": arguments_delta,
});
let event = Ok::<_, Infallible>(SseEvent::default().data(data.to_string()));
Some((event, (rx, sid, ctx_len, output_chars)))
}
Some(StreamEvent::ToolCallEnd { id }) => {
let data = serde_json::json!({
"type": "tool_call_end",
"id": id,
});
let event = Ok::<_, Infallible>(SseEvent::default().data(data.to_string()));
Some((event, (rx, sid, ctx_len, output_chars)))
}
None => None, }
},
);
Sse::new(sse_stream).into_response()
}
/// Parses a usage-period query value.
///
/// Recognised forms are `hours:<n>` and `days:<n>` where `<n>` parses as a
/// `u64`. Everything else — including `"all"`, the empty string, unknown
/// prefixes and non-numeric amounts — yields [`UsagePeriod::All`].
fn parse_usage_period(input: &str) -> UsagePeriod {
    // Without a ':' separator the input cannot name a unit, so it is "all".
    let (unit, amount) = match input.split_once(':') {
        Some(parts) => parts,
        None => return UsagePeriod::All,
    };

    match (unit, amount.parse::<u64>()) {
        ("hours", Ok(hours)) => UsagePeriod::Hours(hours),
        ("days", Ok(days)) => UsagePeriod::Days(days),
        _ => UsagePeriod::All,
    }
}
/// Returns the recorded usage for a tenant as JSON.
///
/// The optional `period` query parameter is interpreted by
/// `parse_usage_period`; when it is absent the empty string is passed, which
/// that function maps to the "all" period.
async fn tenant_usage_handler(
    State(state): State<Arc<XcapitState>>,
    Path(tenant_id): Path<String>,
    axum::extract::Query(params): axum::extract::Query<UsageQueryParams>,
) -> impl IntoResponse {
    // Borrow the query value instead of allocating a default `String`.
    let period = parse_usage_period(params.period.as_deref().unwrap_or(""));
    let usage = state.usage_tracker.get_usage(&tenant_id, &period).await;
    info!(tenant_id = %tenant_id, "Tenant usage queried");
    (
        StatusCode::OK,
        Json(serde_json::to_value(usage).unwrap_or_default()),
    )
        .into_response()
}
/// Scores a response against the requested criteria using heuristics.
///
/// When `req.criteria` is empty the default set (`relevance`, `helpfulness`,
/// `accuracy`, `tone`) is used. Each criterion is mapped onto a field of the
/// heuristic quality result, or onto `score_tone` for `tone`; unrecognised
/// criteria receive a neutral `0.5`. The overall score is the mean of the
/// distinct criterion scores.
///
/// Suggestions and the mean are computed in request order (first occurrence
/// of each criterion) rather than in `HashMap` iteration order, so the same
/// request always produces the same response.
async fn evaluate_handler(
    State(_state): State<Arc<XcapitState>>,
    Json(req): Json<EvaluateRequest>,
) -> impl IntoResponse {
    let evaluator = ResponseEvaluator::with_defaults();
    let quality = evaluator.evaluate_heuristic(&req.context, &req.response, &[]);

    let criteria = if req.criteria.is_empty() {
        vec![
            "relevance".to_string(),
            "helpfulness".to_string(),
            "accuracy".to_string(),
            "tone".to_string(),
        ]
    } else {
        req.criteria.clone()
    };

    let mut by_criteria: HashMap<String, f32> = HashMap::new();
    // Distinct criteria in the order they were first requested.
    let mut scored: Vec<(&str, f32)> = Vec::with_capacity(criteria.len());
    for criterion in &criteria {
        let score = match criterion.as_str() {
            "relevance" => quality.relevance,
            "accuracy" | "consistency" => quality.consistency,
            "completeness" => quality.completeness,
            "clarity" => quality.clarity,
            "helpfulness" => (quality.relevance * 0.5 + quality.completeness * 0.5).min(1.0),
            "tone" => score_tone(&req.response),
            // Unknown criteria are neither rewarded nor penalised.
            _ => 0.5,
        };
        // A repeated criterion scores the same, so only the first one counts.
        if by_criteria.insert(criterion.clone(), score).is_none() {
            scored.push((criterion.as_str(), score));
        }
    }

    let overall_score = if scored.is_empty() {
        quality.overall
    } else {
        scored.iter().map(|(_, score)| *score).sum::<f32>() / scored.len() as f32
    };

    let mut suggestions: Vec<String> = scored
        .iter()
        .filter(|(_, score)| *score < 0.5)
        .map(|(criterion, score)| {
            format!(
                "Improve {criterion}: current score is {score:.2}, consider enhancing this aspect."
            )
        })
        .collect();
    if suggestions.is_empty() && overall_score < 0.7 {
        suggestions.push(
            "Response quality is below threshold. Consider providing more detail and structure."
                .to_string(),
        );
    }

    let response = EvaluateResponse {
        overall_score,
        by_criteria,
        suggestions,
    };
    info!(overall_score, "Response evaluation completed");
    (
        StatusCode::OK,
        Json(serde_json::to_value(response).unwrap_or_default()),
    )
        .into_response()
}
/// Heuristic politeness score in `[0.0, 1.0]`.
///
/// Starts at `0.5`, adds `0.1` for every polite marker phrase found in the
/// lower-cased text and subtracts `0.1` for every negative marker, then clamps
/// the result. Matching is by substring, and each marker counts at most once.
fn score_tone(response: &str) -> f32 {
    const POLITE_MARKERS: [&str; 11] = [
        "por favor",
        "gracias",
        "please",
        "thank you",
        "happy to help",
        "con gusto",
        "espero que",
        "hope this helps",
        "let me know",
        "no dudes en",
        "feel free",
    ];
    const NEGATIVE_MARKERS: [&str; 6] = ["error", "wrong", "bad", "terrible", "stupid", "idiota"];

    let text = response.to_lowercase();

    // Apply the adjustments one step at a time so the floating-point result
    // is the same as repeated `+= 0.1` / `-= 0.1`.
    let after_polite = POLITE_MARKERS
        .iter()
        .filter(|phrase| text.contains(**phrase))
        .fold(0.5_f32, |acc, _| acc + 0.1);
    let after_negative = NEGATIVE_MARKERS
        .iter()
        .filter(|word| text.contains(**word))
        .fold(after_polite, |acc, _| acc - 0.1);

    after_negative.clamp(0.0, 1.0)
}
async fn create_persona_handler(
State(state): State<Arc<XcapitState>>,
Json(req): Json<CreatePersonaRequest>,
) -> impl IntoResponse {
let key = (req.tenant_id.clone(), req.agent_role.clone());
let persona_name = req.persona.name.clone();
let mut personas = state.personas.write().await;
personas.insert(key, req.persona);
info!(
tenant_id = %req.tenant_id,
agent_role = %req.agent_role,
persona_name = %persona_name,
"Persona created/updated"
);
let response = CreatePersonaResponse {
created: true,
tenant_id: req.tenant_id,
agent_role: req.agent_role,
persona_name,
};
(
StatusCode::CREATED,
Json(serde_json::to_value(response).unwrap_or_default()),
)
.into_response()
}
/// Lists every persona stored for the given tenant, keyed by agent role.
///
/// A tenant with no personas gets an empty map and still a `200` response.
async fn list_personas_handler(
    State(state): State<Arc<XcapitState>>,
    Path(tenant_id): Path<String>,
) -> impl IntoResponse {
    let mut tenant_personas: HashMap<String, PersonaConfig> = HashMap::new();
    {
        let all_personas = state.personas.read().await;
        for ((owner, role), persona) in all_personas.iter() {
            if *owner == tenant_id {
                tenant_personas.insert(role.clone(), persona.clone());
            }
        }
    }

    info!(tenant_id = %tenant_id, count = tenant_personas.len(), "Listed tenant personas");

    let body = serde_json::to_value(ListPersonasResponse {
        tenant_id,
        personas: tenant_personas,
    })
    .unwrap_or_default();

    (StatusCode::OK, Json(body)).into_response()
}
/// Lists the configured agent profiles with a short system-prompt preview.
///
/// The preview is at most 100 bytes of the system prompt. The cut is moved
/// back to the nearest UTF-8 character boundary so that prompts containing
/// multi-byte characters (accents, emoji, ...) cannot cause a slicing panic.
async fn list_profiles_handler(State(state): State<Arc<XcapitState>>) -> impl IntoResponse {
    /// Longest prefix of `text` that is no longer than `max_bytes` bytes and
    /// ends on a character boundary.
    fn prompt_preview(text: &str, max_bytes: usize) -> &str {
        let mut end = text.len().min(max_bytes);
        // Index 0 is always a boundary, so this loop terminates.
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }

    let profiles: Vec<serde_json::Value> = state
        .profiles
        .iter()
        .map(|(role, profile)| {
            serde_json::json!({
                "role": role,
                "model": profile.model.model_id,
                "temperature": profile.model.temperature,
                "max_tokens": profile.model.max_tokens,
                "system_prompt_preview": prompt_preview(&profile.system_prompt, 100),
                "has_fallback": !profile.model.fallback_models.is_empty(),
            })
        })
        .collect();
    (
        StatusCode::OK,
        Json(serde_json::json!({
            "profiles": profiles,
            "total": profiles.len(),
        })),
    )
        .into_response()
}
/// Registers a tenant under a usage plan.
///
/// The plan is read from the `plan` string field of the JSON body. A missing
/// field or an unrecognised value falls back to the free plan. The audit
/// entry, log line and response all report the plan that was actually
/// applied, so a caller that sent an unknown plan name can see it was not
/// honoured.
async fn register_tenant_handler(
    State(state): State<Arc<XcapitState>>,
    Path(tenant_id): Path<String>,
    Json(body): Json<serde_json::Value>,
) -> impl IntoResponse {
    let requested_plan = body["plan"].as_str().unwrap_or("free");
    // Resolve the effective plan and its canonical name together so the two
    // can never disagree.
    let (plan_name, plan) = match requested_plan {
        "pro" => ("pro", TenantPlan::Pro),
        "enterprise" => ("enterprise", TenantPlan::Enterprise),
        _ => ("free", TenantPlan::Free),
    };
    if plan_name != requested_plan {
        info!(
            tenant_id = %tenant_id,
            requested_plan = %requested_plan,
            "Unrecognized plan requested; falling back to free"
        );
    }

    state.tenant_limits.register_tenant(&tenant_id, plan);
    state.audit.log_action(
        Uuid::new_v4(),
        "tenant_registered",
        None,
        serde_json::json!({"tenant_id": &tenant_id, "plan": plan_name}),
        AuditOutcome::Success,
    );
    info!(tenant_id = %tenant_id, plan = %plan_name, "Tenant registered");
    (
        StatusCode::CREATED,
        Json(serde_json::json!({
            "tenant_id": tenant_id,
            "plan": plan_name,
            "status": "active",
        })),
    )
        .into_response()
}
/// Reports a tenant's plan limits alongside its all-time recorded usage.
///
/// Answers `404` with a registration hint when the tenant has no limit
/// status, otherwise `200` with `limits` and `usage` sections.
async fn tenant_status_handler(
    State(state): State<Arc<XcapitState>>,
    Path(tenant_id): Path<String>,
) -> impl IntoResponse {
    let limit_status = state.tenant_limits.get_status(&tenant_id);
    let usage = state
        .usage_tracker
        .get_usage(&tenant_id, &UsagePeriod::All)
        .await;

    // Guard clause: unregistered tenants get a hint instead of a report.
    let status = match limit_status {
        Some(found) => found,
        None => {
            let not_found = serde_json::json!({
                "error": format!("Tenant '{}' not registered", tenant_id),
                "hint": "POST /api/v1/tenants/{tenant_id}/register to register",
            });
            return (StatusCode::NOT_FOUND, Json(not_found)).into_response();
        }
    };

    let limits = serde_json::json!({
        "daily_requests": format!("{}/{}", status.daily_requests, status.daily_limit),
        "monthly_tokens": format!("{}/{}", status.monthly_tokens, status.monthly_limit),
        "monthly_cost": format!("${:.4}/${:.2}", status.monthly_cost_usd, status.monthly_budget_usd),
        "utilization_percent": status.utilization_percent,
        "is_throttled": status.is_throttled,
    });
    let usage_summary = serde_json::json!({
        "total_requests": usage.request_count,
        "total_tokens": usage.total_tokens_in + usage.total_tokens_out,
        "total_cost_usd": usage.total_cost_usd,
        "by_agent": usage.by_agent,
        "by_model": usage.by_model,
    });
    let report = serde_json::json!({
        "tenant_id": tenant_id,
        "plan": status.plan,
        "limits": limits,
        "usage": usage_summary,
    });

    (StatusCode::OK, Json(report)).into_response()
}
/// Lists run summaries for the `lead_qualification` and `support_ticket`
/// workflows.
///
/// Each summary carries the run id, the debug-formatted status, the creation
/// time in RFC 3339 form, the current step index and the number of step
/// results recorded so far.
async fn list_workflow_runs_handler(State(state): State<Arc<XcapitState>>) -> impl IntoResponse {
    let lead_runs = state.workflow_engine.list_runs("lead_qualification").await;
    let support_runs = state.workflow_engine.list_runs("support_ticket").await;

    let mut body = serde_json::Map::new();
    for (workflow, runs) in [
        ("lead_qualification", &lead_runs),
        ("support_ticket", &support_runs),
    ] {
        let summaries: Vec<serde_json::Value> = runs
            .iter()
            .map(|r| {
                serde_json::json!({
                    "run_id": r.run_id,
                    "status": format!("{:?}", r.status),
                    "created_at": r.created_at.to_rfc3339(),
                    "current_step": r.current_step_index,
                    "total_steps": r.step_results.len(),
                })
            })
            .collect();
        body.insert(workflow.to_string(), serde_json::Value::Array(summaries));
    }

    (StatusCode::OK, Json(serde_json::Value::Object(body))).into_response()
}
// Unit tests for the XcapitSFF integration: default profiles and config,
// response serialization, routing hints, cost estimation, usage tracking,
// personas, tone scoring and usage-period parsing.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
// The default profile set contains exactly the four expected roles.
#[test]
fn test_default_profiles_has_four() {
let profiles = default_xcapit_profiles();
assert_eq!(profiles.len(), 4);
assert!(profiles.contains_key("sales_qualifier"));
assert!(profiles.contains_key("outreach_composer"));
assert!(profiles.contains_key("support_responder"));
assert!(profiles.contains_key("ticket_router"));
}
// Each default profile carries its expected sampling temperature
// (compared with a tolerance because the values are floats).
#[test]
fn test_profile_temperatures() {
let profiles = default_xcapit_profiles();
assert!((profiles["sales_qualifier"].model.temperature - 0.3).abs() < 0.01);
assert!((profiles["outreach_composer"].model.temperature - 0.7).abs() < 0.01);
assert!((profiles["support_responder"].model.temperature - 0.4).abs() < 0.01);
assert!((profiles["ticket_router"].model.temperature - 0.2).abs() < 0.01);
}
// Each default profile carries its expected max_tokens budget.
#[test]
fn test_profile_max_tokens() {
let profiles = default_xcapit_profiles();
assert_eq!(profiles["sales_qualifier"].model.max_tokens, 2048);
assert_eq!(profiles["outreach_composer"].model.max_tokens, 4096);
assert_eq!(profiles["support_responder"].model.max_tokens, 4096);
assert_eq!(profiles["ticket_router"].model.max_tokens, 1024);
}
// Every default profile has exactly one fallback model, and it is gpt-4o-mini.
#[test]
fn test_profile_has_fallback() {
let profiles = default_xcapit_profiles();
for profile in profiles.values() {
assert_eq!(
profile.model.fallback_models.len(),
1,
"Profile {} should have 1 fallback",
profile.role
);
assert_eq!(profile.model.fallback_models[0].model_id, "gpt-4o-mini");
}
}
// The default config points at a plausible XcapitSFF URL, uses a health-check
// interval of 30 and allows four webhook sources.
#[test]
fn test_default_config() {
let config = XcapitConfig::default();
assert!(config.url.contains("localhost:8000") || config.url.contains("xcapitsff"));
assert_eq!(config.health_check_interval, 30);
assert_eq!(config.allowed_webhook_sources.len(), 4);
}
// The default webhook allow-list contains the four expected source names.
#[test]
fn test_config_allowed_sources() {
let config = XcapitConfig::default();
assert!(config
.allowed_webhook_sources
.contains(&"hubspot".to_string()));
assert!(config
.allowed_webhook_sources
.contains(&"salesforce".to_string()));
assert!(config
.allowed_webhook_sources
.contains(&"stripe".to_string()));
assert!(config
.allowed_webhook_sources
.contains(&"intercom".to_string()));
}
// RunTaskResponse serializes its fields under the expected JSON keys.
#[test]
fn test_run_task_response_serialization() {
let resp = RunTaskResponse {
response: "Score: 82".to_string(),
session_id: "ses_abc".to_string(),
model_used: "claude-sonnet-4-6".to_string(),
tokens_input: 100,
tokens_output: 50,
tool_calls: vec![],
compliance_flags: vec![],
duration_ms: 1200,
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"model_used\":\"claude-sonnet-4-6\""));
assert!(json.contains("\"duration_ms\":1200"));
}
// BatchResponse serializes its aggregate counters.
#[test]
fn test_batch_response_serialization() {
let resp = BatchResponse {
results: vec![BatchResultItem {
index: 0,
success: true,
response: "ok".to_string(),
error: None,
tokens_input: 100,
tokens_output: 50,
}],
total: 1,
succeeded: 1,
failed: 0,
total_tokens: 150,
total_duration_ms: 500,
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"succeeded\":1"));
}
// WebhookProxyResponse omits the "error" key entirely when error is None.
#[test]
fn test_webhook_proxy_response_serialization() {
let resp = WebhookProxyResponse {
forwarded: true,
upstream_status: Some(200),
audit_id: "audit-123".to_string(),
error: None,
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"forwarded\":true"));
assert!(!json.contains("\"error\"")); }
// ExtendedHealthResponse serializes the version string.
#[test]
fn test_health_response_serialization() {
let resp = ExtendedHealthResponse {
status: "ok".to_string(),
version: "0.1.0".to_string(),
checks: HealthChecks {
llm_backends: "ok".to_string(),
xcapitsff: "ok".to_string(),
compliance: "ok".to_string(),
},
xcapitsff: XcapitSffHealth {
url: "http://localhost:8000".to_string(),
status: "ok".to_string(),
last_check: Some(Utc::now()),
last_error: None,
},
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"version\":\"0.1.0\""));
}
// resolve_api_keys fills api_key from ANTHROPIC_API_KEY for the
// sales_qualifier profile's model.
// NOTE(review): this mutates the process-wide environment, so it can
// interfere with other tests that read ANTHROPIC_API_KEY when tests run in
// parallel — confirm whether that matters for this suite.
#[test]
fn test_resolve_api_keys_from_env() {
std::env::set_var("ANTHROPIC_API_KEY", "test-key-123");
let mut config = default_xcapit_profiles()["sales_qualifier"].model.clone();
resolve_api_keys(&mut config);
assert_eq!(config.api_key, "test-key-123");
std::env::remove_var("ANTHROPIC_API_KEY");
}
// Every default profile ships a system prompt longer than 50 bytes.
#[test]
fn test_system_prompts_non_empty() {
let profiles = default_xcapit_profiles();
for (name, profile) in &profiles {
assert!(
!profile.system_prompt.is_empty(),
"Profile {name} has empty system prompt"
);
assert!(
profile.system_prompt.len() > 50,
"Profile {name} prompt is too short"
);
}
}
// The "fast_cheap" hint selects gpt-4o-mini on the OpenAi provider.
#[test]
fn test_resolve_routing_hint_fast_cheap() {
let result = resolve_routing_hint("fast_cheap");
assert!(result.is_some());
let (model, provider) = result.unwrap();
assert_eq!(model, "gpt-4o-mini");
assert!(matches!(
provider,
argentor_agent::config::LlmProvider::OpenAi
));
}
// The "quality_max" hint selects the Claude opus model.
#[test]
fn test_resolve_routing_hint_quality_max() {
let result = resolve_routing_hint("quality_max");
assert!(result.is_some());
let (model, provider) = result.unwrap();
assert_eq!(model, "claude-opus-4-6-20250514");
assert!(matches!(
provider,
argentor_agent::config::LlmProvider::Claude
));
}
// The "balanced" hint yields None, i.e. the profile's own model is kept.
#[test]
fn test_resolve_routing_hint_balanced_returns_none() {
let result = resolve_routing_hint("balanced");
assert!(
result.is_none(),
"balanced should return None (keep current model)"
);
}
// Unknown and empty hints yield None.
#[test]
fn test_resolve_routing_hint_unknown_returns_none() {
assert!(resolve_routing_hint("unknown_hint").is_none());
assert!(resolve_routing_hint("").is_none());
}
// Cost estimate for the sonnet model with 1000 input / 500 output tokens.
#[test]
fn test_estimate_cost_usd_sonnet() {
let cost = estimate_cost_usd("claude-sonnet-4-6-20250514", 1000, 500);
assert!(
(cost - 0.0105).abs() < 0.0001,
"Expected ~0.0105, got {cost}"
);
}
// Cost estimate for gpt-4o-mini with 10000 input / 5000 output tokens.
#[test]
fn test_estimate_cost_usd_gpt4o_mini() {
let cost = estimate_cost_usd("gpt-4o-mini", 10000, 5000);
assert!(
(cost - 0.0045).abs() < 0.0001,
"Expected ~0.0045, got {cost}"
);
}
// Cost estimate for the opus model with 1000 input / 500 output tokens.
#[test]
fn test_estimate_cost_usd_opus() {
let cost = estimate_cost_usd("claude-opus-4-6-20250514", 1000, 500);
assert!(
(cost - 0.0525).abs() < 0.0001,
"Expected ~0.0525, got {cost}"
);
}
// Recording usage for two tenants and querying one returns only that
// tenant's totals, broken down per agent.
#[tokio::test]
async fn test_tenant_usage_tracker_record_and_query() {
let tracker = TenantUsageTracker::new();
tracker
.record(
"t_001",
"sales_qualifier",
"claude-sonnet-4-6",
100,
50,
0.01,
)
.await;
tracker
.record("t_001", "outreach_composer", "gpt-4o-mini", 200, 100, 0.005)
.await;
tracker
.record("t_002", "ticket_router", "claude-sonnet-4-6", 50, 25, 0.005)
.await;
let usage = tracker.get_usage("t_001", &UsagePeriod::All).await;
assert_eq!(usage.tenant_id, "t_001");
assert_eq!(usage.request_count, 2);
assert_eq!(usage.total_tokens_in, 300);
assert_eq!(usage.total_tokens_out, 150);
assert!((usage.total_cost_usd - 0.015).abs() < 0.0001);
assert_eq!(usage.by_agent.len(), 2);
assert!(usage.by_agent.contains_key("sales_qualifier"));
assert!(usage.by_agent.contains_key("outreach_composer"));
}
// Querying a tenant with no recorded usage returns zeroed totals.
#[tokio::test]
async fn test_tenant_usage_tracker_empty_tenant() {
let tracker = TenantUsageTracker::new();
let usage = tracker.get_usage("nonexistent", &UsagePeriod::All).await;
assert_eq!(usage.request_count, 0);
assert_eq!(usage.total_tokens_in, 0);
assert_eq!(usage.total_cost_usd, 0.0);
}
// Usage recorded against two models is reported as two by_model entries,
// each with a count of one.
#[tokio::test]
async fn test_tenant_usage_tracker_by_model_breakdown() {
let tracker = TenantUsageTracker::new();
tracker
.record(
"t_100",
"sales_qualifier",
"claude-sonnet-4-6",
100,
50,
0.01,
)
.await;
tracker
.record("t_100", "sales_qualifier", "gpt-4o-mini", 100, 50, 0.002)
.await;
let usage = tracker.get_usage("t_100", &UsagePeriod::All).await;
assert_eq!(usage.by_model.len(), 2);
assert!(usage.by_model.contains_key("claude-sonnet-4-6"));
assert!(usage.by_model.contains_key("gpt-4o-mini"));
assert_eq!(usage.by_model["claude-sonnet-4-6"].count, 1);
assert_eq!(usage.by_model["gpt-4o-mini"].count, 1);
}
// PersonaConfig round-trips through JSON, including non-ASCII text.
#[test]
fn test_persona_config_serialization() {
let persona = PersonaConfig {
name: "Sofía".to_string(),
tone: "friendly".to_string(),
language_style: "es_latam".to_string(),
signature: "— Sofía, equipo Xcapit".to_string(),
custom_instructions: "Siempre saludar con nombre del cliente".to_string(),
};
let json = serde_json::to_string(&persona).unwrap();
assert!(json.contains("\"name\":\"Sofía\""));
assert!(json.contains("\"tone\":\"friendly\""));
assert!(json.contains("\"language_style\":\"es_latam\""));
let deserialized: PersonaConfig = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.name, "Sofía");
assert_eq!(deserialized.signature, "— Sofía, equipo Xcapit");
}
// signature and custom_instructions default to empty strings when the JSON
// omits them.
#[test]
fn test_persona_config_default_fields() {
let json = r#"{"name": "Carlos", "tone": "professional", "language_style": "es_iberia"}"#;
let persona: PersonaConfig = serde_json::from_str(json).unwrap();
assert_eq!(persona.name, "Carlos");
assert!(persona.signature.is_empty());
assert!(persona.custom_instructions.is_empty());
}
// EvaluateResponse serializes its overall score.
#[test]
fn test_evaluate_response_serialization() {
let mut by_criteria = HashMap::new();
by_criteria.insert("relevance".to_string(), 0.8_f32);
by_criteria.insert("tone".to_string(), 0.7_f32);
let resp = EvaluateResponse {
overall_score: 0.75,
by_criteria,
suggestions: vec!["Consider adding more detail.".to_string()],
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"overall_score\":0.75"));
}
// Text containing polite markers scores above the 0.5 baseline.
#[test]
fn test_score_tone_polite() {
let score = score_tone(
"Gracias por tu consulta. Por favor, no dudes en preguntar si necesitás más ayuda.",
);
assert!(score > 0.5, "Polite text should score > 0.5, got {score}");
}
// Text without any marker scores exactly the 0.5 baseline.
#[test]
fn test_score_tone_neutral() {
let score = score_tone("The result is 42.");
assert!(
(score - 0.5).abs() < f32::EPSILON,
"Neutral text should score 0.5, got {score}"
);
}
// Text containing negative markers scores below the 0.5 baseline.
#[test]
fn test_score_tone_negative() {
let score = score_tone("This is a terrible and stupid response with bad results.");
assert!(score < 0.5, "Negative text should score < 0.5, got {score}");
}
// "hours:<n>" parses to UsagePeriod::Hours(n).
#[test]
fn test_parse_usage_period_hours() {
let period = parse_usage_period("hours:24");
assert!(matches!(period, UsagePeriod::Hours(24)));
}
// "days:<n>" parses to UsagePeriod::Days(n).
#[test]
fn test_parse_usage_period_days() {
let period = parse_usage_period("days:7");
assert!(matches!(period, UsagePeriod::Days(7)));
}
// The literal "all" parses to UsagePeriod::All.
#[test]
fn test_parse_usage_period_all() {
let period = parse_usage_period("all");
assert!(matches!(period, UsagePeriod::All));
}
// The empty string is treated as UsagePeriod::All.
#[test]
fn test_parse_usage_period_empty_defaults_to_all() {
let period = parse_usage_period("");
assert!(matches!(period, UsagePeriod::All));
}
// Unrecognised input falls back to UsagePeriod::All instead of failing.
#[test]
fn test_parse_usage_period_invalid_defaults_to_all() {
let period = parse_usage_period("invalid:foo");
assert!(matches!(period, UsagePeriod::All));
}
// CreatePersonaResponse serializes the created flag and tenant id.
#[test]
fn test_create_persona_response_serialization() {
let resp = CreatePersonaResponse {
created: true,
tenant_id: "t_123".to_string(),
agent_role: "support_responder".to_string(),
persona_name: "Sofía".to_string(),
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"created\":true"));
assert!(json.contains("\"tenant_id\":\"t_123\""));
}
// ListPersonasResponse serializes the tenant id and the role-keyed map.
#[test]
fn test_list_personas_response_serialization() {
let mut personas = HashMap::new();
personas.insert(
"support_responder".to_string(),
PersonaConfig {
name: "Sofía".to_string(),
tone: "friendly".to_string(),
language_style: "es_latam".to_string(),
signature: String::new(),
custom_instructions: String::new(),
},
);
let resp = ListPersonasResponse {
tenant_id: "t_456".to_string(),
personas,
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"tenant_id\":\"t_456\""));
assert!(json.contains("\"support_responder\""));
}
// UsageBreakdown serializes its token and request counters.
#[test]
fn test_usage_breakdown_serialization() {
let breakdown = UsageBreakdown {
tenant_id: "t_999".to_string(),
total_tokens_in: 5000,
total_tokens_out: 2500,
total_cost_usd: 0.05,
request_count: 10,
by_agent: HashMap::new(),
by_model: HashMap::new(),
};
let json = serde_json::to_string(&breakdown).unwrap();
assert!(json.contains("\"total_tokens_in\":5000"));
assert!(json.contains("\"request_count\":10"));
}
}