Skip to main content

construct/gateway/
api_agents.rs

1//! REST API handlers for agent management (`/api/agents`).
2//!
3//! Proxies to Kumiho FastAPI for persistent agent storage.  Each agent is a
4//! Kumiho item of kind `"agent"` in the `Construct/AgentPool` space.  Agent
5//! metadata (identity, soul, expertise, etc.) is stored as revision metadata.
6
7use super::AppState;
8use super::api::require_auth;
9use super::kumiho_client::{ItemResponse, KumihoClient, KumihoError, RevisionResponse, slugify};
10
/// Return `raw` as a fully-qualified kref that starts with exactly the
/// `kref://` scheme it already had, or with one prepended if it had none.
///
/// Only a single leading `kref://` is considered; the remainder of the string
/// is passed through untouched.
fn normalize_kref(raw: &str) -> String {
    const SCHEME: &str = "kref://";
    if raw.starts_with(SCHEME) {
        // Already qualified — stripping and re-adding the scheme is a no-op.
        raw.to_owned()
    } else {
        format!("{SCHEME}{raw}")
    }
}
16use axum::{
17    extract::{Path, Query, State},
18    http::{HeaderMap, StatusCode},
19    response::{IntoResponse, Json},
20};
21use parking_lot::Mutex;
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::sync::OnceLock;
25use std::time::Instant;
26
27// ── Response cache (avoids N+1 Kumiho calls on rapid dashboard polls) ───
28
29struct AgentCache {
30    agents: Vec<AgentResponse>,
31    include_deprecated: bool,
32    fetched_at: Instant,
33}
34
35static AGENT_CACHE: OnceLock<Mutex<Option<AgentCache>>> = OnceLock::new();
36const CACHE_TTL_SECS: u64 = 3;
37
38fn get_cached_agents(include_deprecated: bool) -> Option<Vec<AgentResponse>> {
39    let lock = AGENT_CACHE.get_or_init(|| Mutex::new(None));
40    let cache = lock.lock();
41    if let Some(ref c) = *cache {
42        if c.include_deprecated == include_deprecated
43            && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
44        {
45            return Some(c.agents.clone());
46        }
47    }
48    None
49}
50
51fn set_cached_agents(agents: &[AgentResponse], include_deprecated: bool) {
52    let lock = AGENT_CACHE.get_or_init(|| Mutex::new(None));
53    let mut cache = lock.lock();
54    *cache = Some(AgentCache {
55        agents: agents.to_vec(),
56        include_deprecated,
57        fetched_at: Instant::now(),
58    });
59}
60
61pub fn invalidate_agent_cache() {
62    if let Some(lock) = AGENT_CACHE.get() {
63        let mut cache = lock.lock();
64        *cache = None;
65    }
66}
67
68/// Space name within the project.
69const AGENT_SPACE_NAME: &str = "AgentPool";
70
71/// Kumiho project used for harness items (agents/teams/workflows), from config.
72fn agent_project(state: &AppState) -> String {
73    state.config.lock().kumiho.harness_project.clone()
74}
75
76/// Full space path for agents, e.g. "/Construct/AgentPool".
77fn agent_space_path(state: &AppState) -> String {
78    format!("/{}/{}", agent_project(state), AGENT_SPACE_NAME)
79}
80
81// ── Query / request types ───────────────────────────────────────────────
82
83#[derive(Deserialize)]
84pub struct AgentListQuery {
85    /// Include deprecated (disabled) agents.
86    #[serde(default)]
87    pub include_deprecated: bool,
88    /// Full-text search query.  When present, uses Kumiho search instead of list.
89    pub q: Option<String>,
90    /// Page number (1-based). Default: 1.
91    pub page: Option<u32>,
92    /// Items per page. Default: 9, max: 50.
93    pub per_page: Option<u32>,
94}
95
96#[derive(Deserialize)]
97pub struct CreateAgentBody {
98    pub name: String,
99    pub identity: String,
100    pub soul: String,
101    #[serde(default)]
102    pub expertise: Vec<String>,
103    #[serde(default)]
104    pub tone: Option<String>,
105    #[serde(default)]
106    pub role: Option<String>,
107    #[serde(default)]
108    pub agent_type: Option<String>,
109    #[serde(default)]
110    pub model: Option<String>,
111    #[serde(default)]
112    pub system_hint: Option<String>,
113}
114
115#[derive(Deserialize)]
116pub struct DeprecateBody {
117    pub kref: String,
118    pub deprecated: bool,
119}
120
121// ── Response types ──────────────────────────────────────────────────────
122
123#[derive(Serialize, Clone)]
124pub struct AgentResponse {
125    pub kref: String,
126    pub name: String,
127    pub kind: String,
128    pub deprecated: bool,
129    pub created_at: Option<String>,
130    // Metadata fields from latest revision
131    pub identity: String,
132    pub soul: String,
133    pub expertise: Vec<String>,
134    pub tone: String,
135    pub role: String,
136    pub agent_type: String,
137    pub model: String,
138    pub system_hint: String,
139    pub revision: Option<i32>,
140}
141
142// ── Helpers ─────────────────────────────────────────────────────────────
143
144/// Build a `KumihoClient` from the current config + env.
145/// Shared Kumiho client — reuses TCP connections and TLS sessions across requests.
146static KUMIHO_CLIENT: std::sync::OnceLock<KumihoClient> = std::sync::OnceLock::new();
147
148pub(super) fn build_kumiho_client(state: &AppState) -> KumihoClient {
149    KUMIHO_CLIENT
150        .get_or_init(|| {
151            let config = state.config.lock();
152            let base_url = config.kumiho.api_url.clone();
153            drop(config);
154            let service_token = std::env::var("KUMIHO_SERVICE_TOKEN").unwrap_or_default();
155            KumihoClient::new(base_url, service_token)
156        })
157        .clone()
158}
159
160/// Convert Kumiho error to an HTTP response.
161fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
162    match &e {
163        KumihoError::Unreachable(_) => (
164            StatusCode::SERVICE_UNAVAILABLE,
165            Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
166        ),
167        KumihoError::Api { status, body } => {
168            // Never forward 401/403 from Kumiho — the browser would confuse them
169            // with Construct pairing auth failures and force a re-pair.
170            let code = if *status == 401 || *status == 403 {
171                StatusCode::BAD_GATEWAY
172            } else {
173                StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
174            };
175            (
176                code,
177                Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
178            )
179        }
180        KumihoError::Decode(msg) => (
181            StatusCode::BAD_GATEWAY,
182            Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
183        ),
184    }
185}
186
187/// Build metadata `HashMap` from the create/update body.
188fn agent_metadata(body: &CreateAgentBody) -> HashMap<String, String> {
189    let mut meta = HashMap::new();
190    meta.insert("display_name".to_string(), body.name.clone());
191    meta.insert("identity".to_string(), body.identity.clone());
192    meta.insert("soul".to_string(), body.soul.clone());
193    if !body.expertise.is_empty() {
194        meta.insert("expertise".to_string(), body.expertise.join(","));
195    }
196    if let Some(ref tone) = body.tone {
197        meta.insert("tone".to_string(), tone.clone());
198    }
199    if let Some(ref role) = body.role {
200        meta.insert("role".to_string(), role.clone());
201    }
202    if let Some(ref agent_type) = body.agent_type {
203        meta.insert("agent_type".to_string(), agent_type.clone());
204    }
205    if let Some(ref model) = body.model {
206        meta.insert("model".to_string(), model.clone());
207    }
208    if let Some(ref hint) = body.system_hint {
209        meta.insert("system_hint".to_string(), hint.clone());
210    }
211    meta
212}
213
214/// Build an `AgentResponse` from an item + its latest revision metadata.
215fn to_agent_response(item: &ItemResponse, rev: Option<&RevisionResponse>) -> AgentResponse {
216    let meta = rev.map(|r| &r.metadata);
217    let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
218    let expertise_str = get("expertise");
219    let expertise: Vec<String> = if expertise_str.is_empty() {
220        Vec::new()
221    } else {
222        expertise_str
223            .split(',')
224            .map(|s| s.trim().to_string())
225            .collect()
226    };
227
228    let display_name = {
229        let n = get("display_name");
230        if n.is_empty() {
231            item.item_name.clone()
232        } else {
233            n
234        }
235    };
236
237    AgentResponse {
238        kref: item.kref.clone(),
239        name: display_name,
240        kind: item.kind.clone(),
241        deprecated: item.deprecated,
242        created_at: item.created_at.clone(),
243        identity: get("identity"),
244        soul: get("soul"),
245        expertise,
246        tone: get("tone"),
247        role: get("role"),
248        agent_type: get("agent_type"),
249        model: get("model"),
250        system_hint: get("system_hint"),
251        revision: rev.map(|r| r.number),
252    }
253}
254
255/// Fetch published (or latest) revision for each item and build responses.
256///
257/// Uses batch API for a single HTTP call instead of N parallel requests.
258/// Falls back to parallel individual calls if the batch endpoint is unavailable.
259async fn enrich_items(client: &KumihoClient, items: Vec<ItemResponse>) -> Vec<AgentResponse> {
260    if items.is_empty() {
261        return Vec::new();
262    }
263
264    let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
265
266    // Try batch fetch (published tag first, then latest as fallback)
267    if let Ok(rev_map) = client.batch_get_revisions(&krefs, "published").await {
268        // Find items missing a published revision and fetch latest for those
269        let missing: Vec<String> = krefs
270            .iter()
271            .filter(|k| !rev_map.contains_key(*k))
272            .cloned()
273            .collect();
274        let latest_map = if !missing.is_empty() {
275            client
276                .batch_get_revisions(&missing, "latest")
277                .await
278                .unwrap_or_default()
279        } else {
280            std::collections::HashMap::new()
281        };
282
283        return items
284            .iter()
285            .map(|item| {
286                let rev = rev_map
287                    .get(&item.kref)
288                    .or_else(|| latest_map.get(&item.kref));
289                to_agent_response(item, rev)
290            })
291            .collect();
292    }
293
294    // Fallback: parallel individual calls
295    let handles: Vec<_> = items
296        .iter()
297        .map(|item| {
298            let kref = item.kref.clone();
299            let client = client.clone();
300            tokio::spawn(async move { client.get_published_or_latest(&kref).await.ok() })
301        })
302        .collect();
303    let mut agents = Vec::with_capacity(items.len());
304    for (item, handle) in items.iter().zip(handles) {
305        let rev = handle.await.ok().flatten();
306        agents.push(to_agent_response(item, rev.as_ref()));
307    }
308    agents
309}
310
311// ── Handlers ────────────────────────────────────────────────────────────
312
313/// GET /api/agents
314pub async fn handle_list_agents(
315    State(state): State<AppState>,
316    headers: HeaderMap,
317    Query(query): Query<AgentListQuery>,
318) -> impl IntoResponse {
319    if let Err(e) = require_auth(&state, &headers) {
320        return e.into_response();
321    }
322
323    let client = build_kumiho_client(&state);
324
325    let project_name = agent_project(&state);
326    let space_path = agent_space_path(&state);
327
328    // Search or list
329    let items_result = if let Some(ref q) = query.q {
330        client
331            .search_items(q, &project_name, "agent", query.include_deprecated)
332            .await
333            .map(|results| results.into_iter().map(|sr| sr.item).collect::<Vec<_>>())
334    } else {
335        client
336            .list_items(&space_path, query.include_deprecated)
337            .await
338    };
339
340    // Pagination parameters
341    let per_page = query.per_page.unwrap_or(9).min(50).max(1);
342    let page = query.page.unwrap_or(1).max(1);
343
344    // Check cache for non-search list requests
345    if query.q.is_none() {
346        if let Some(cached) = get_cached_agents(query.include_deprecated) {
347            let total_count = cached.len() as u32;
348            let skip = ((page - 1) * per_page) as usize;
349            let agents: Vec<_> = cached
350                .into_iter()
351                .skip(skip)
352                .take(per_page as usize)
353                .collect();
354            return Json(serde_json::json!({
355                "agents": agents,
356                "total_count": total_count,
357                "page": page,
358                "per_page": per_page,
359            }))
360            .into_response();
361        }
362    }
363
364    match items_result {
365        Ok(items) => {
366            let agents = enrich_items(&client, items).await;
367            // Cache non-search results
368            if query.q.is_none() {
369                set_cached_agents(&agents, query.include_deprecated);
370            }
371            let total_count = agents.len() as u32;
372            let skip = ((page - 1) * per_page) as usize;
373            let agents: Vec<_> = agents
374                .into_iter()
375                .skip(skip)
376                .take(per_page as usize)
377                .collect();
378            Json(serde_json::json!({
379                "agents": agents,
380                "total_count": total_count,
381                "page": page,
382                "per_page": per_page,
383            }))
384            .into_response()
385        }
386        Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
387            // Project or space doesn't exist yet — create them and return empty list.
388            let _ = client.ensure_project(&project_name).await;
389            let _ = client.ensure_space(&project_name, AGENT_SPACE_NAME).await;
390            Json(serde_json::json!({
391                "agents": [],
392                "total_count": 0,
393                "page": page,
394                "per_page": per_page,
395            }))
396            .into_response()
397        }
398        Err(e) => kumiho_err(e).into_response(),
399    }
400}
401
402/// POST /api/agents
403pub async fn handle_create_agent(
404    State(state): State<AppState>,
405    headers: HeaderMap,
406    Json(body): Json<CreateAgentBody>,
407) -> impl IntoResponse {
408    if let Err(e) = require_auth(&state, &headers) {
409        return e.into_response();
410    }
411
412    let client = build_kumiho_client(&state);
413    let project_name = agent_project(&state);
414    let space_path = agent_space_path(&state);
415
416    // 1. Ensure project + space exist (idempotent)
417    if let Err(e) = client.ensure_project(&project_name).await {
418        return kumiho_err(e).into_response();
419    }
420    if let Err(e) = client.ensure_space(&project_name, AGENT_SPACE_NAME).await {
421        return kumiho_err(e).into_response();
422    }
423
424    // 2. Create item (slugify name for kref-safe identifier)
425    let slug = slugify(&body.name);
426    let item = match client
427        .create_item(&space_path, &slug, "agent", HashMap::new())
428        .await
429    {
430        Ok(item) => item,
431        Err(e) => return kumiho_err(e).into_response(),
432    };
433
434    // 3. Create revision with metadata
435    let metadata = agent_metadata(&body);
436    let rev = match client.create_revision(&item.kref, metadata).await {
437        Ok(rev) => rev,
438        Err(e) => return kumiho_err(e).into_response(),
439    };
440
441    // 4. Tag revision as published
442    let _ = client.tag_revision(&rev.kref, "published").await;
443
444    invalidate_agent_cache();
445    let agent = to_agent_response(&item, Some(&rev));
446    (
447        StatusCode::CREATED,
448        Json(serde_json::json!({ "agent": agent })),
449    )
450        .into_response()
451}
452
453/// PUT /api/agents/:kref
454///
455/// The kref is passed as `*kref` to capture the full `kref://...` path.
456pub async fn handle_update_agent(
457    State(state): State<AppState>,
458    headers: HeaderMap,
459    Path(kref): Path<String>,
460    Json(body): Json<CreateAgentBody>,
461) -> impl IntoResponse {
462    if let Err(e) = require_auth(&state, &headers) {
463        return e.into_response();
464    }
465
466    let kref = normalize_kref(&kref);
467    let client = build_kumiho_client(&state);
468    let space_path = agent_space_path(&state);
469
470    // Create new revision on existing item with updated metadata
471    let metadata = agent_metadata(&body);
472    let rev = match client.create_revision(&kref, metadata).await {
473        Ok(rev) => rev,
474        Err(e) => return kumiho_err(e).into_response(),
475    };
476
477    // Tag revision as published
478    let _ = client.tag_revision(&rev.kref, "published").await;
479
480    // Fetch item details for the full response
481    let items = match client.list_items(&space_path, true).await {
482        Ok(items) => items,
483        Err(e) => return kumiho_err(e).into_response(),
484    };
485
486    invalidate_agent_cache();
487    let item = items.iter().find(|i| i.kref == kref);
488    match item {
489        Some(item) => {
490            let agent = to_agent_response(item, Some(&rev));
491            Json(serde_json::json!({ "agent": agent })).into_response()
492        }
493        None => {
494            // Item was found (revision succeeded) but not in list — build a minimal response
495            let fallback = ItemResponse {
496                kref: kref.clone(),
497                name: body.name.clone(),
498                item_name: body.name.clone(),
499                kind: "agent".to_string(),
500                deprecated: false,
501                created_at: None,
502                metadata: HashMap::new(),
503            };
504            let agent = to_agent_response(&fallback, Some(&rev));
505            Json(serde_json::json!({ "agent": agent })).into_response()
506        }
507    }
508}
509
510/// POST /api/agents/deprecate
511pub async fn handle_deprecate_agent(
512    State(state): State<AppState>,
513    headers: HeaderMap,
514    Json(body): Json<DeprecateBody>,
515) -> impl IntoResponse {
516    if let Err(e) = require_auth(&state, &headers) {
517        return e.into_response();
518    }
519
520    let kref = body.kref.clone();
521    let client = build_kumiho_client(&state);
522
523    match client.deprecate_item(&kref, body.deprecated).await {
524        Ok(item) => {
525            invalidate_agent_cache();
526            let rev = client.get_published_or_latest(&kref).await.ok();
527            let agent = to_agent_response(&item, rev.as_ref());
528            Json(serde_json::json!({ "agent": agent })).into_response()
529        }
530        Err(e) => kumiho_err(e).into_response(),
531    }
532}
533
534/// DELETE /api/agents/:kref
535pub async fn handle_delete_agent(
536    State(state): State<AppState>,
537    headers: HeaderMap,
538    Path(kref): Path<String>,
539) -> impl IntoResponse {
540    if let Err(e) = require_auth(&state, &headers) {
541        return e.into_response();
542    }
543
544    let kref = normalize_kref(&kref);
545    let client = build_kumiho_client(&state);
546
547    match client.delete_item(&kref).await {
548        Ok(()) => {
549            invalidate_agent_cache();
550            StatusCode::NO_CONTENT.into_response()
551        }
552        Err(e) => kumiho_err(e).into_response(),
553    }
554}