Skip to main content

smooth_operator_server/
admin.rs

1//! The admin HTTP API (Phase 12, increment 1).
2//!
3//! A REST surface, mounted under `/admin`, that the Next.js management console
4//! (increment 2) consumes: whoami, chat history, indexing status, and document
5//! sets. Everything except `/admin/health` is gated by [`require_role`] and
6//! org-scoped to the caller's [`Principal`].
7//!
8//! ## Routes + role gates
9//!
10//! | route | min role | scope |
11//! | --- | --- | --- |
12//! | `GET /admin/health` | — (public) | liveness only |
13//! | `GET /admin/me` | Basic | the caller's own principal |
14//! | `GET /admin/conversations` | Basic | Admin/Curator: org-wide; Basic: own only |
15//! | `GET /admin/conversations/{id}/messages` | Basic | role-scoped (Basic must own the convo) |
16//! | `GET /admin/indexing/runs` | Curator | org connectors |
17//! | `GET /admin/document-sets` | Curator | org document sets |
18//!
19//! ## Org-scoping + "Basic sees own"
20//!
21//! Every read filters to `principal.org_id` (the storage adapter's
22//! `list_conversations_by_org`). For a **Basic** caller, the result is further
23//! narrowed to conversations the caller *owns*: a conversation is owned when one
24//! of its `User` participants carries `external_id == principal.user_id`. An
25//! Admin or Curator sees the whole org. This mirrors the document-level
26//! [`AccessContext`](smooth_operator::access_control::AccessContext) model RBAC
27//! sits on top of.
28//!
29//! ## Errors
30//!
31//! Auth failures map to clean status codes (401 unauthenticated / invalid token /
32//! missing role; 403 insufficient role) with the protocol's `error` envelope
33//! shape (`{ code, message }`) reused for the body. Never leaks a token.
34
35use axum::extract::{Path, Query, State};
36use axum::http::request::Parts;
37use axum::http::StatusCode;
38use axum::response::{IntoResponse, Response};
39use axum::routing::{get, post};
40use axum::{Json, Router};
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43
44use smooth_operator::auth::{AuthError, Principal, Role};
45use smooth_operator::backplane::Target;
46use smooth_operator::connector_config::{ConnectorConfig, ConnectorKind};
47use smooth_operator::domain::ParticipantType;
48use smooth_operator::settings::AgentSettings;
49
50use smooth_operator_ingestion::{
51    Chunker, Connector, FileConnector, GithubAuth, GithubConnector, GithubConnectorConfig,
52    GithubVisibility, IndexingService, WebConnector,
53};
54
55use crate::embedder::{build_embedder, EmbedderConfig};
56use crate::protocol;
57use crate::state::{scoped_connector_key, AppState};
58
59/// Build the `/admin` router over the shared [`AppState`].
60pub fn router() -> Router<AppState> {
61    Router::new()
62        .route("/admin/health", get(health))
63        .route("/admin/me", get(me))
64        .route("/admin/conversations", get(list_conversations))
65        .route(
66            "/admin/conversations/{id}/messages",
67            get(conversation_messages),
68        )
69        .route("/admin/indexing/runs", get(indexing_runs))
70        .route("/admin/document-sets", get(document_sets))
71        // Per-model pricing for cost badges (Smooth Modes). Intentionally
72        // UNGATED (like `/admin/health`): gateway pricing is not org-sensitive,
73        // and the UI must be able to render badges even on a tokenless local
74        // connection. Degrades to an empty object on any gateway error.
75        .route("/admin/model-costs", get(model_costs))
76        // Write API (Phase 12, increment 3) — connector CRUD, index trigger,
77        // settings. RBAC: list/get are Curator; create/update/delete are Admin;
78        // index trigger is Curator; settings read is Curator, write is Admin.
79        .route(
80            "/admin/connectors",
81            get(list_connectors).post(create_connector),
82        )
83        .route(
84            "/admin/connectors/{id}",
85            get(get_connector)
86                .put(update_connector)
87                .delete(delete_connector),
88        )
89        .route("/admin/connectors/{id}/index", post(index_connector))
90        .route("/admin/settings", get(get_settings).put(put_settings))
91        // Realtime publish (Phase: backplane) — push an event to a backplane
92        // target over the WebSocket fleet. The plug point for non-AI publishers
93        // (job status, ingestion progress, notifications). Admin-gated.
94        .route("/admin/publish", post(publish_event))
95        // CORS for the `/admin` surface only (NOT `/ws` — a WebSocket handshake
96        // isn't subject to the CORS preflight). The local-flavor daemon serves its
97        // smooth-web SPA same-origin, but in dev the SPA runs on the Vite origin
98        // (http://localhost:3100) and does a best-effort cross-origin
99        // `GET /admin/me` to populate the live model/identity in its header — which
100        // the browser blocks without these headers. The local flavor is
101        // loopback/tailnet-only and every `/admin` route is token-authed
102        // ([`require_role`]), so a permissive CORS here doesn't widen the trust
103        // boundary: a cross-origin caller still needs a valid bearer token.
104        .layer(admin_cors())
105}
106
107/// The permissive [`CorsLayer`](tower_http::cors::CorsLayer) applied to `/admin`.
108///
109/// Allows any origin with the verbs the admin API exposes (GET/POST/PUT/DELETE,
110/// plus the implicit `OPTIONS` preflight) and the only request headers the SPA
111/// sends (`authorization` for the bearer token, `content-type` for JSON bodies).
112/// Auth is unaffected — every route still runs [`require_role`], so this only
113/// relaxes the browser's same-origin policy, not the server's authorization.
114pub(crate) fn admin_cors() -> tower_http::cors::CorsLayer {
115    use axum::http::{header, Method};
116    tower_http::cors::CorsLayer::new()
117        .allow_origin(tower_http::cors::Any)
118        .allow_methods([
119            Method::GET,
120            Method::POST,
121            Method::PUT,
122            Method::DELETE,
123            Method::OPTIONS,
124        ])
125        .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE])
126}
127
128// ---------------------------------------------------------------------------
129// Auth extractor — `require_role`
130// ---------------------------------------------------------------------------
131
132/// An authenticated [`Principal`] guaranteed to hold at least role `MIN`.
133///
134/// Used as an axum extractor: it reads `Authorization: Bearer <token>`, verifies
135/// it via the configured [`AuthVerifier`](smooth_operator::auth::AuthVerifier) in
136/// [`AppState`], and rejects with 401/403 if the token is missing/invalid or the
137/// role is insufficient — *before* the handler body runs. `MIN` is a const role
138/// rank: `0 = Basic`, `1 = Curator`, `2 = Admin`.
139pub struct RequireRole<const MIN: u8>(pub Principal);
140
141/// Map a [`Role`] to the const rank used by [`RequireRole`].
142const fn role_rank(role: Role) -> u8 {
143    match role {
144        Role::Basic => 0,
145        Role::Curator => 1,
146        Role::Admin => 2,
147    }
148}
149
150/// The minimum [`Role`] a const rank denotes (for error messages).
151const fn rank_role(min: u8) -> Role {
152    match min {
153        0 => Role::Basic,
154        1 => Role::Curator,
155        _ => Role::Admin,
156    }
157}
158
159impl<const MIN: u8> axum::extract::FromRequestParts<AppState> for RequireRole<MIN> {
160    type Rejection = AuthRejection;
161
162    async fn from_request_parts(
163        parts: &mut Parts,
164        state: &AppState,
165    ) -> Result<Self, Self::Rejection> {
166        let token = bearer_token(parts).ok_or(AuthRejection(AuthError::Unauthenticated))?;
167        let principal = state.auth.verify(&token).map_err(AuthRejection)?;
168        if role_rank(principal.role) < MIN {
169            return Err(AuthRejection(AuthError::Forbidden {
170                required: rank_role(MIN),
171                actual: principal.role,
172            }));
173        }
174        Ok(RequireRole(principal))
175    }
176}
177
178/// Extract the raw bearer token (without the `Bearer ` prefix) from the
179/// `Authorization` header. Returns `None` when absent or not a bearer scheme.
180fn bearer_token(parts: &Parts) -> Option<String> {
181    let header = parts.headers.get(axum::http::header::AUTHORIZATION)?;
182    let value = header.to_str().ok()?;
183    let rest = value
184        .strip_prefix("Bearer ")
185        .or_else(|| value.strip_prefix("bearer "))?;
186    let trimmed = rest.trim();
187    if trimmed.is_empty() {
188        None
189    } else {
190        Some(trimmed.to_string())
191    }
192}
193
194/// An auth/authorization rejection rendered as the protocol's `error` envelope
195/// with the right HTTP status.
196pub struct AuthRejection(AuthError);
197
198impl IntoResponse for AuthRejection {
199    fn into_response(self) -> Response {
200        let (status, code) = match &self.0 {
201            AuthError::Unauthenticated => (StatusCode::UNAUTHORIZED, "UNAUTHENTICATED"),
202            AuthError::InvalidToken(_) => (StatusCode::UNAUTHORIZED, "INVALID_TOKEN"),
203            AuthError::MissingRole(_) => (StatusCode::UNAUTHORIZED, "MISSING_ROLE"),
204            AuthError::Forbidden { .. } => (StatusCode::FORBIDDEN, "FORBIDDEN"),
205            // A misconfigured verifier is a server error, surfaced as 500 with a
206            // non-leaking message.
207            AuthError::Misconfigured(_) => {
208                (StatusCode::INTERNAL_SERVER_ERROR, "AUTH_MISCONFIGURED")
209            }
210        };
211        let body = protocol::error(None, code, &self.0.to_string());
212        (status, Json(body)).into_response()
213    }
214}
215
216/// An error from a handler body (storage failure, etc.) rendered as a 500 with
217/// the protocol error shape.
218struct AdminError(StatusCode, String, &'static str);
219
220impl IntoResponse for AdminError {
221    fn into_response(self) -> Response {
222        let body = protocol::error(None, self.2, &self.1);
223        (self.0, Json(body)).into_response()
224    }
225}
226
227impl AdminError {
228    fn internal(msg: impl Into<String>) -> Self {
229        Self(
230            StatusCode::INTERNAL_SERVER_ERROR,
231            msg.into(),
232            "INTERNAL_ERROR",
233        )
234    }
235
236    fn forbidden(msg: impl Into<String>) -> Self {
237        Self(StatusCode::FORBIDDEN, msg.into(), "FORBIDDEN")
238    }
239
240    fn not_found(msg: impl Into<String>) -> Self {
241        Self(StatusCode::NOT_FOUND, msg.into(), "NOT_FOUND")
242    }
243
244    /// A 400 with the protocol's `VALIDATION_ERROR` code — used for unknown
245    /// connector kinds, malformed config payloads, and unresolvable `auth_ref`s.
246    fn validation(msg: impl Into<String>) -> Self {
247        Self(StatusCode::BAD_REQUEST, msg.into(), "VALIDATION_ERROR")
248    }
249}
250
251// ---------------------------------------------------------------------------
252// Handlers
253// ---------------------------------------------------------------------------
254
255/// `GET /admin/health` — unauthenticated liveness probe.
256async fn health() -> Json<Value> {
257    Json(serde_json::json!({ "status": "ok" }))
258}
259
260/// `GET /admin/me` — whoami. Returns the authenticated principal (any role).
261async fn me(RequireRole::<0>(principal): RequireRole<0>) -> Json<Principal> {
262    Json(principal)
263}
264
265/// Query params for `GET /admin/conversations`.
266#[derive(Debug, Deserialize)]
267struct ConversationsQuery {
268    /// Max conversations to return (defaults to 50, capped at 200).
269    limit: Option<usize>,
270    /// Opaque cursor: the index to start from (simple offset paging over the
271    /// org-scoped, newest-first list). `None` ⇒ start at the beginning.
272    cursor: Option<usize>,
273}
274
275/// A conversation row in the admin list response.
276#[derive(Debug, Serialize)]
277#[serde(rename_all = "camelCase")]
278struct ConversationRow {
279    id: String,
280    name: String,
281    platform: String,
282    created_at: chrono::DateTime<chrono::Utc>,
283    updated_at: chrono::DateTime<chrono::Utc>,
284}
285
286/// The `GET /admin/conversations` response envelope.
287#[derive(Debug, Serialize)]
288#[serde(rename_all = "camelCase")]
289struct ConversationsResponse {
290    conversations: Vec<ConversationRow>,
291    /// Opaque cursor for the next page, or `null` when exhausted.
292    next_cursor: Option<usize>,
293}
294
295/// `GET /admin/conversations` — chat history, org-scoped. Admin/Curator see the
296/// whole org; Basic sees only conversations they own.
297async fn list_conversations(
298    RequireRole::<0>(principal): RequireRole<0>,
299    State(state): State<AppState>,
300    Query(q): Query<ConversationsQuery>,
301) -> Result<Json<ConversationsResponse>, AdminError> {
302    let limit = q.limit.unwrap_or(50).clamp(1, 200);
303    let offset = q.cursor.unwrap_or(0);
304
305    let all = state
306        .storage
307        .list_conversations_by_org(&principal.org_id)
308        .await
309        .map_err(|e| AdminError::internal(format!("list conversations failed: {e}")))?;
310
311    // Basic callers only see conversations they own.
312    let visible: Vec<_> = if principal.role >= Role::Curator {
313        all
314    } else {
315        let mut owned = Vec::new();
316        for conv in all {
317            if conversation_owned_by(&state, &conv.id, &principal.user_id).await {
318                owned.push(conv);
319            }
320        }
321        owned
322    };
323
324    let total = visible.len();
325    let page: Vec<ConversationRow> = visible
326        .into_iter()
327        .skip(offset)
328        .take(limit)
329        .map(|c| ConversationRow {
330            id: c.id,
331            name: c.name,
332            platform: format!("{:?}", c.platform).to_lowercase(),
333            created_at: c.created_at,
334            updated_at: c.updated_at,
335        })
336        .collect();
337
338    let next = offset + page.len();
339    let next_cursor = if next < total { Some(next) } else { None };
340
341    Ok(Json(ConversationsResponse {
342        conversations: page,
343        next_cursor,
344    }))
345}
346
347/// `GET /admin/conversations/{id}/messages` — messages for one conversation,
348/// role-scoped (a Basic caller must own the conversation).
349async fn conversation_messages(
350    RequireRole::<0>(principal): RequireRole<0>,
351    State(state): State<AppState>,
352    Path(conversation_id): Path<String>,
353) -> Result<Json<Value>, AdminError> {
354    // The conversation must exist + belong to the caller's org.
355    let conv = state
356        .storage
357        .get_conversation(&conversation_id)
358        .await
359        .map_err(|e| AdminError::internal(format!("get conversation failed: {e}")))?
360        .ok_or_else(|| {
361            AdminError::not_found(format!("conversation '{conversation_id}' not found"))
362        })?;
363
364    if conv.organization_id != principal.org_id {
365        // Don't leak existence across orgs — 404, not 403.
366        return Err(AdminError::not_found(format!(
367            "conversation '{conversation_id}' not found"
368        )));
369    }
370
371    // Basic callers may only read conversations they own.
372    if principal.role < Role::Curator
373        && !conversation_owned_by(&state, &conversation_id, &principal.user_id).await
374    {
375        return Err(AdminError::forbidden(
376            "you do not have access to this conversation",
377        ));
378    }
379
380    let query = smooth_operator::adapter::MessageQuery::new(&conversation_id, 200);
381    let page = state
382        .storage
383        .list_messages_by_conversation(query)
384        .await
385        .map_err(|e| AdminError::internal(format!("list messages failed: {e}")))?;
386
387    Ok(Json(serde_json::json!({
388        "conversationId": conversation_id,
389        "messages": page.messages,
390        "nextCursor": page.next_cursor,
391    })))
392}
393
394/// `GET /admin/indexing/runs` — indexing-run status across **the caller's org's**
395/// connectors. Curator+ only.
396///
397/// Org-scoped two ways (cross-org leak fix): we iterate only the principal's
398/// org's connectors, and we query the indexing store under the **org-namespaced**
399/// key ([`scoped_connector_key`]) so a same-named connector in another org can't
400/// surface its runs here. The reported `connectorName` is the un-scoped display
401/// name (the namespace is an internal storage key, never exposed).
402async fn indexing_runs(
403    RequireRole::<1>(principal): RequireRole<1>,
404    State(state): State<AppState>,
405) -> Json<Value> {
406    let mut runs = Vec::new();
407    for connector in state.connectors(&principal.org_id) {
408        let key = scoped_connector_key(&principal.org_id, &connector);
409        for run in state.indexing.list_runs(&key) {
410            runs.push(serde_json::json!({
411                "id": run.id,
412                // Report the display name, never the internal org-namespaced key.
413                "connectorName": connector,
414                "status": format!("{:?}", run.status).to_lowercase(),
415                "startedAt": run.started_at,
416                "finishedAt": run.finished_at,
417                "documentsSeen": run.documents_seen,
418                "chunksIndexed": run.chunks_indexed,
419                "documentsSkipped": run.documents_skipped,
420                "cursor": run.cursor,
421                "error": run.error,
422            }));
423        }
424    }
425    Json(serde_json::json!({ "runs": runs }))
426}
427
428/// A document-set row.
429#[derive(Debug, Serialize)]
430#[serde(rename_all = "camelCase")]
431struct DocumentSetRow {
432    name: String,
433    document_count: usize,
434}
435
436/// `GET /admin/document-sets` — distinct document-set names + doc counts for
437/// **the caller's org**. Curator+ only. Org-scoped so org A's document sets are
438/// never reported to an org-B caller (cross-org leak fix).
439async fn document_sets(
440    RequireRole::<1>(principal): RequireRole<1>,
441    State(state): State<AppState>,
442) -> Json<Value> {
443    let sets: Vec<DocumentSetRow> = state
444        .document_sets(&principal.org_id)
445        .into_iter()
446        .map(|(name, document_count)| DocumentSetRow {
447            name,
448            document_count,
449        })
450        .collect();
451    Json(serde_json::json!({ "documentSets": sets }))
452}
453
454// ---------------------------------------------------------------------------
455// Model costs — `GET /admin/model-costs`
456// ---------------------------------------------------------------------------
457
458/// `GET /admin/model-costs` — per-model gateway pricing, keyed by gateway model
459/// id, for the UI's per-mode cost badges (Smooth Modes). Shape:
460///
461/// ```json
462/// { "<modelId>": { "inputCostPerToken": <number|null>,
463///                  "outputCostPerToken": <number|null>,
464///                  "tier": "<model_tier|null>",
465///                  "useCases": [<string>] } }
466/// ```
467///
468/// Ungated (see the route registration): pricing isn't org-sensitive and the
469/// badge must render even on a tokenless local connection. The gateway's
470/// `/v1/model/info` is fetched at most once per process (cached in
471/// [`AppState::model_costs_cache`]); on **any** gateway error this returns an
472/// empty object with status 200 — never a 500 — so the UI degrades to no-badge.
473async fn model_costs(State(state): State<AppState>) -> Json<Value> {
474    // Reuse the cached pricing if a prior request already fetched it.
475    if let Some(cached) = state.model_costs_cache.get() {
476        return Json(cached.clone());
477    }
478    match fetch_model_costs(&state.config).await {
479        Ok(map) => {
480            // Cache the first success; a lost race (another request set it first)
481            // is harmless — both computed the same stable pricing.
482            let _ = state.model_costs_cache.set(map.clone());
483            Json(map)
484        }
485        // Degrade to no-badge on any gateway/transport error — never surface a
486        // 500. NOT cached, so the next request retries.
487        Err(_) => Json(serde_json::json!({})),
488    }
489}
490
491/// Fetch the gateway's `/v1/model/info` (using the server's configured gateway
492/// base url + key — the same creds the turns use) and map it to the
493/// `/admin/model-costs` response shape via [`map_model_info`].
494///
495/// # Errors
496/// Returns an error on any transport / non-2xx / decode failure; the caller maps
497/// that to an empty object (UI degrades gracefully).
498async fn fetch_model_costs(config: &crate::config::ServerConfig) -> anyhow::Result<Value> {
499    // `gateway_url` already ends in `/v1` (e.g. `https://llm.smoo.ai/v1`), so the
500    // model-info endpoint is `{gateway_url}/model/info`.
501    let url = format!("{}/model/info", config.gateway_url.trim_end_matches('/'));
502    let client = reqwest::Client::new();
503    let mut req = client.get(&url);
504    if let Some(key) = config.gateway_key.as_deref() {
505        req = req.bearer_auth(key);
506    }
507    let payload: Value = req.send().await?.error_for_status()?.json().await?;
508    Ok(map_model_info(&payload))
509}
510
511/// Map the gateway's `/v1/model/info` payload
512/// (`{ data: [{ model_name, model_info: { input_cost_per_token,
513/// output_cost_per_token, model_tier, use_cases } }] }`) to the
514/// `/admin/model-costs` response object, keyed by `model_name`. Missing numeric
515/// fields become `null`, a missing tier becomes `null`, and a missing
516/// `use_cases` becomes `[]`. Entries without a `model_name` are skipped. Pure +
517/// network-free so it's unit-testable on a sample payload.
518fn map_model_info(payload: &Value) -> Value {
519    let mut out = serde_json::Map::new();
520    let Some(entries) = payload.get("data").and_then(Value::as_array) else {
521        return Value::Object(out);
522    };
523    for entry in entries {
524        let Some(name) = entry.get("model_name").and_then(Value::as_str) else {
525            continue;
526        };
527        let info = entry.get("model_info");
528        let input = info
529            .and_then(|i| i.get("input_cost_per_token"))
530            .and_then(Value::as_f64);
531        let output = info
532            .and_then(|i| i.get("output_cost_per_token"))
533            .and_then(Value::as_f64);
534        let tier = info
535            .and_then(|i| i.get("model_tier"))
536            .and_then(Value::as_str);
537        let use_cases = info
538            .and_then(|i| i.get("use_cases"))
539            .and_then(Value::as_array)
540            .cloned()
541            .unwrap_or_default();
542        out.insert(
543            name.to_string(),
544            serde_json::json!({
545                "inputCostPerToken": input,
546                "outputCostPerToken": output,
547                "tier": tier,
548                "useCases": use_cases,
549            }),
550        );
551    }
552    Value::Object(out)
553}
554
555// ---------------------------------------------------------------------------
556// Connector config CRUD (Phase 12, increment 3)
557// ---------------------------------------------------------------------------
558
559/// The wire body for create/update of a connector. `kind` is validated against
560/// [`ConnectorKind`]; `config` is the kind-specific free-form payload (may carry
561/// an `auth_ref` naming a secret — never the secret itself).
562#[derive(Debug, Deserialize)]
563struct ConnectorWrite {
564    name: String,
565    kind: String,
566    #[serde(default)]
567    config: Value,
568    #[serde(default = "default_enabled")]
569    enabled: bool,
570}
571
572const fn default_enabled() -> bool {
573    true
574}
575
576/// Serialize a [`ConnectorConfig`] for an API response under a `connector` key.
577///
578/// The stored `config` is echoed as-is — it only ever holds an `auth_ref` *name*,
579/// never a secret value, so this can never leak a credential.
580fn connector_json(cfg: &ConnectorConfig) -> Value {
581    serde_json::json!({
582        "connector": {
583            "id": cfg.id,
584            "name": cfg.name,
585            "kind": cfg.kind.as_str(),
586            "config": cfg.config,
587            "enabled": cfg.enabled,
588            "createdAt": cfg.created_at,
589            "updatedAt": cfg.updated_at,
590        }
591    })
592}
593
594/// Validate `(kind, config)` and surface a clean 400 on an unknown kind or a
595/// payload missing the fields that kind needs to build a connector.
596fn validate_connector(kind: ConnectorKind, config: &Value) -> Result<(), AdminError> {
597    let missing = |field: &str| {
598        AdminError::validation(format!(
599            "{} connector config requires a '{field}' field",
600            kind.as_str()
601        ))
602    };
603    match kind {
604        ConnectorKind::Github => {
605            if config.get("owner").and_then(Value::as_str).is_none() {
606                return Err(missing("owner"));
607            }
608            if config.get("repo").and_then(Value::as_str).is_none() {
609                return Err(missing("repo"));
610            }
611        }
612        ConnectorKind::Web => {
613            if config.get("url").and_then(Value::as_str).is_none() {
614                return Err(missing("url"));
615            }
616        }
617        ConnectorKind::File => {
618            if config.get("path").and_then(Value::as_str).is_none() {
619                return Err(missing("path"));
620            }
621        }
622    }
623    Ok(())
624}
625
626/// `GET /admin/connectors` — list this org's connectors (Curator+).
627async fn list_connectors(
628    RequireRole::<1>(principal): RequireRole<1>,
629    State(state): State<AppState>,
630) -> Json<Value> {
631    let connectors: Vec<Value> = state
632        .connector_configs
633        .list(&principal.org_id)
634        .iter()
635        .map(|c| connector_json(c)["connector"].clone())
636        .collect();
637    Json(serde_json::json!({ "connectors": connectors }))
638}
639
640/// `GET /admin/connectors/{id}` — one connector, org-scoped (Curator+). A
641/// cross-org / unknown id is a 404 (existence not leaked across orgs).
642async fn get_connector(
643    RequireRole::<1>(principal): RequireRole<1>,
644    State(state): State<AppState>,
645    Path(id): Path<String>,
646) -> Result<Json<Value>, AdminError> {
647    let cfg = state
648        .connector_configs
649        .get(&principal.org_id, &id)
650        .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
651    Ok(Json(connector_json(&cfg)))
652}
653
654/// `POST /admin/connectors` — create a connector (Admin only). Returns 201 with
655/// the created connector (a fresh uuid id, org from the principal).
656async fn create_connector(
657    RequireRole::<2>(principal): RequireRole<2>,
658    State(state): State<AppState>,
659    Json(body): Json<ConnectorWrite>,
660) -> Result<Response, AdminError> {
661    let kind = ConnectorKind::parse(&body.kind)
662        .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
663    validate_connector(kind, &body.config)?;
664
665    let now = chrono::Utc::now();
666    let cfg = ConnectorConfig {
667        id: uuid::Uuid::new_v4().to_string(),
668        org_id: principal.org_id.clone(),
669        name: body.name,
670        kind,
671        config: body.config,
672        enabled: body.enabled,
673        created_at: now,
674        updated_at: now,
675    };
676    state.connector_configs.upsert(cfg.clone());
677    // Record the connector name under the caller's org so its runs are listed by
678    // /admin/indexing/runs — and ONLY for this org (cross-org scoping).
679    state.record_connector(principal.org_id.clone(), cfg.name.clone());
680    Ok((StatusCode::CREATED, Json(connector_json(&cfg))).into_response())
681}
682
683/// `PUT /admin/connectors/{id}` — update a connector (Admin only). The id +
684/// `created_at` are preserved; a cross-org / unknown id is a 404.
685async fn update_connector(
686    RequireRole::<2>(principal): RequireRole<2>,
687    State(state): State<AppState>,
688    Path(id): Path<String>,
689    Json(body): Json<ConnectorWrite>,
690) -> Result<Json<Value>, AdminError> {
691    let existing = state
692        .connector_configs
693        .get(&principal.org_id, &id)
694        .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
695
696    let kind = ConnectorKind::parse(&body.kind)
697        .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
698    validate_connector(kind, &body.config)?;
699
700    let cfg = ConnectorConfig {
701        id: existing.id,
702        org_id: existing.org_id,
703        name: body.name,
704        kind,
705        config: body.config,
706        enabled: body.enabled,
707        created_at: existing.created_at,
708        updated_at: chrono::Utc::now(),
709    };
710    state.connector_configs.upsert(cfg.clone());
711    state.record_connector(principal.org_id.clone(), cfg.name.clone());
712    Ok(Json(connector_json(&cfg)))
713}
714
715/// `DELETE /admin/connectors/{id}` — remove a connector (Admin only). 204 on
716/// success; a cross-org / unknown id is a 404.
717async fn delete_connector(
718    RequireRole::<2>(principal): RequireRole<2>,
719    State(state): State<AppState>,
720    Path(id): Path<String>,
721) -> Result<Response, AdminError> {
722    if state.connector_configs.delete(&principal.org_id, &id) {
723        Ok(StatusCode::NO_CONTENT.into_response())
724    } else {
725        Err(AdminError::not_found(format!("connector '{id}' not found")))
726    }
727}
728
729/// `POST /admin/connectors/{id}/index` — build the connector from its stored
730/// config and run one indexing pass (Curator+).
731///
732/// For `github`, the token is resolved from `auth_ref` → env at *this* moment
733/// (never persisted). An unresolvable `auth_ref` returns a clean 400 *before*
734/// any GitHub call — no panic, no network. The resulting [`IndexingRun`] is
735/// recorded in the shared `IndexingStore` (so it also shows in
736/// `GET /admin/indexing/runs`) and returned.
737async fn index_connector(
738    RequireRole::<1>(principal): RequireRole<1>,
739    State(state): State<AppState>,
740    Path(id): Path<String>,
741) -> Result<Json<Value>, AdminError> {
742    let cfg = state
743        .connector_configs
744        .get(&principal.org_id, &id)
745        .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
746
747    // Build the live connector from the stored config (resolving any secret from
748    // env at this moment). A validation failure here is a clean 400.
749    //
750    // The connector is named with an ORG-NAMESPACED key so the indexing run is
751    // recorded in the store under `IXCONN#<org>...<name>` — a same-named
752    // connector in another org records + lists separately (cross-org scoping).
753    // The display name (`cfg.name`) is rewritten back into the response below.
754    let scoped_key = scoped_connector_key(&principal.org_id, &cfg.name);
755    let connector = build_connector(&cfg, &scoped_key)?;
756
757    let service = IndexingService::new(principal.org_id.clone());
758    let chunker = Chunker::default();
759    // Select the embedder from config: the real semantic GatewayEmbedder (1536-d)
760    // when the gateway is keyed, else the network-free DeterministicEmbedder
761    // (1024-d) with a loud warning. The knowledge store the docs land in was
762    // created with this same embedder's dim by the storage-backend wiring
763    // (`build_state_from_env_async`), so document and query vectors agree.
764    let embedder = build_embedder(&EmbedderConfig::from_server_config(&state.config));
765    let knowledge = state.storage.knowledge();
766
767    let run = service
768        .run_once(
769            connector.as_ref(),
770            state.indexing.as_ref(),
771            &chunker,
772            embedder.as_ref(),
773            knowledge,
774        )
775        .await
776        .map_err(|e| AdminError::internal(format!("indexing failed: {e}")))?;
777
778    // Surface the connector for /admin/indexing/runs listing (org-scoped).
779    state.record_connector(principal.org_id.clone(), cfg.name.clone());
780
781    Ok(Json(serde_json::json!({
782        "run": {
783            "id": run.id,
784            // Report the display name, never the internal org-namespaced key.
785            "connectorName": cfg.name,
786            "status": format!("{:?}", run.status).to_lowercase(),
787            "startedAt": run.started_at,
788            "finishedAt": run.finished_at,
789            "documentsSeen": run.documents_seen,
790            "chunksIndexed": run.chunks_indexed,
791            "documentsSkipped": run.documents_skipped,
792            "cursor": run.cursor,
793            "error": run.error,
794        }
795    })))
796}
797
798/// Build a live [`Connector`] from a stored [`ConnectorConfig`], resolving any
799/// secret named by `auth_ref` from the environment *now* (never persisted).
800///
801/// `connector_name` is the name stamped onto the built connector — the caller
802/// passes an **org-namespaced** key so the indexing run is recorded per-org
803/// (cross-org scoping), keeping the human display name out of the storage key.
804///
805/// Returns a clean 400 ([`AdminError::validation`]) — never a panic — for a
806/// malformed config or an unresolvable `auth_ref`, so `/index` can surface the
807/// problem without touching the network.
808fn build_connector(
809    cfg: &ConnectorConfig,
810    connector_name: &str,
811) -> Result<Box<dyn Connector>, AdminError> {
812    let connector_name = connector_name.to_string();
813    match cfg.kind {
814        ConnectorKind::Web => {
815            let url = cfg
816                .config
817                .get("url")
818                .and_then(Value::as_str)
819                .ok_or_else(|| AdminError::validation("web connector requires a 'url'"))?;
820            Ok(Box::new(NamedConnector::new(
821                connector_name,
822                WebConnector::new(url),
823            )))
824        }
825        ConnectorKind::File => {
826            let path = cfg
827                .config
828                .get("path")
829                .and_then(Value::as_str)
830                .ok_or_else(|| AdminError::validation("file connector requires a 'path'"))?;
831            Ok(Box::new(NamedConnector::new(
832                connector_name,
833                FileConnector::new(path),
834            )))
835        }
836        ConnectorKind::Github => {
837            let owner = cfg
838                .config
839                .get("owner")
840                .and_then(Value::as_str)
841                .ok_or_else(|| AdminError::validation("github connector requires an 'owner'"))?;
842            let repo = cfg
843                .config
844                .get("repo")
845                .and_then(Value::as_str)
846                .ok_or_else(|| AdminError::validation("github connector requires a 'repo'"))?;
847
848            // Resolve the token from auth_ref→env. A private repo MUST have a
849            // resolvable token; a public repo may index unauthenticated.
850            let visibility = match cfg.config.get("visibility").and_then(Value::as_str) {
851                Some("private") => GithubVisibility::Private,
852                _ => GithubVisibility::Public,
853            };
854            let auth = resolve_github_auth(cfg, visibility)?;
855
856            let mut gh = GithubConnectorConfig::new(owner, repo, auth).visibility(visibility);
857            if let Some(r) = cfg.config.get("ref").and_then(Value::as_str) {
858                gh = gh.at_ref(r);
859            }
860            Ok(Box::new(NamedConnector::new(
861                connector_name,
862                GithubConnector::new(gh),
863            )))
864        }
865    }
866}
867
868/// Resolve a [`GithubAuth`] from the connector's `auth_ref` → env var.
869///
870/// - `auth_ref` set + env present ⇒ `Token`.
871/// - `auth_ref` set but env **missing/empty** ⇒ a clean 400 (no GitHub call).
872/// - no `auth_ref`: a **public** repo indexes `Unauthenticated`; a **private**
873///   repo is a 400 (a private repo needs a credential).
874fn resolve_github_auth(
875    cfg: &ConnectorConfig,
876    visibility: GithubVisibility,
877) -> Result<GithubAuth, AdminError> {
878    match cfg.auth_ref() {
879        Some(name) => match std::env::var(name) {
880            Ok(token) if !token.trim().is_empty() => Ok(GithubAuth::Token(token)),
881            _ => Err(AdminError::validation(format!(
882                "github connector auth_ref '{name}' did not resolve to a token \
883                 (set the named secret/env var); refusing to index"
884            ))),
885        },
886        None => match visibility {
887            GithubVisibility::Public => Ok(GithubAuth::Unauthenticated),
888            GithubVisibility::Private => Err(AdminError::validation(
889                "github connector for a private repo requires an 'auth_ref' \
890                 naming a token secret",
891            )),
892        },
893    }
894}
895
896/// Wraps a connector to override its `name()` with the configured connector
897/// name, so the indexing run + its `/admin/indexing/runs` row are keyed by the
898/// human-chosen connector name (not the generic `"web"` / `"file"` / `"github"`
899/// kind label). Delegates `pull` unchanged.
900struct NamedConnector<C: Connector> {
901    name: String,
902    inner: C,
903}
904
905impl<C: Connector> NamedConnector<C> {
906    fn new(name: String, inner: C) -> Self {
907        Self { name, inner }
908    }
909}
910
911#[async_trait::async_trait]
912impl<C: Connector> Connector for NamedConnector<C> {
913    fn name(&self) -> &str {
914        &self.name
915    }
916
917    async fn pull(
918        &self,
919        since: Option<smooth_operator_ingestion::Timestamp>,
920    ) -> anyhow::Result<Vec<smooth_operator_ingestion::RawDocument>> {
921        self.inner.pull(since).await
922    }
923}
924
925// ---------------------------------------------------------------------------
926// Agent settings (Phase 12, increment 3)
927// ---------------------------------------------------------------------------
928
929/// The wire body for `PUT /admin/settings`.
930#[derive(Debug, Deserialize)]
931#[serde(rename_all = "camelCase")]
932struct SettingsWrite {
933    model: String,
934    system_prompt: String,
935    /// Optional per-org agent persona override (see [`AgentSettings::persona`]).
936    /// Omitted ⇒ `None` ⇒ the runner stays on its built-in const prompt.
937    #[serde(default)]
938    persona: Option<String>,
939    #[serde(default)]
940    default_tools: Vec<String>,
941}
942
943/// Serialize [`AgentSettings`] under a `settings` key.
944fn settings_json(s: &AgentSettings) -> Value {
945    serde_json::json!({
946        "settings": {
947            "orgId": s.org_id,
948            "model": s.model,
949            "systemPrompt": s.system_prompt,
950            "persona": s.persona,
951            "defaultTools": s.default_tools,
952            "updatedAt": s.updated_at,
953        }
954    })
955}
956
957/// `GET /admin/settings` — the org's agent settings (defaults if unset). Curator+.
958async fn get_settings(
959    RequireRole::<1>(principal): RequireRole<1>,
960    State(state): State<AppState>,
961) -> Json<Value> {
962    let settings = state.settings.get(&principal.org_id);
963    Json(settings_json(&settings))
964}
965
966/// `PUT /admin/settings` — replace the org's agent settings (Admin only).
967async fn put_settings(
968    RequireRole::<2>(principal): RequireRole<2>,
969    State(state): State<AppState>,
970    Json(body): Json<SettingsWrite>,
971) -> Json<Value> {
972    let settings = AgentSettings {
973        org_id: principal.org_id.clone(),
974        model: body.model,
975        system_prompt: body.system_prompt,
976        persona: body.persona,
977        default_tools: body.default_tools,
978        updated_at: chrono::Utc::now(),
979    };
980    state.settings.put(settings.clone());
981    Json(settings_json(&settings))
982}
983
984// ---------------------------------------------------------------------------
985// Helpers
986// ---------------------------------------------------------------------------
987
988/// Whether `user_id` owns the conversation — true when a `User` participant in
989/// the conversation carries `external_id == user_id`.
990async fn conversation_owned_by(state: &AppState, conversation_id: &str, user_id: &str) -> bool {
991    match state
992        .storage
993        .list_participants_by_conversation(conversation_id)
994        .await
995    {
996        Ok(parts) => parts.iter().any(|p| {
997            p.participant_type == ParticipantType::User && p.external_id.as_deref() == Some(user_id)
998        }),
999        Err(_) => false,
1000    }
1001}
1002
1003// ---------------------------------------------------------------------------
1004// Realtime publish — `POST /admin/publish`
1005// ---------------------------------------------------------------------------
1006
1007/// A delivery target in the publish request, in a friendlier `{type, id}` shape
1008/// than [`Target`]'s default enum serialization.
1009#[derive(Deserialize)]
1010#[serde(tag = "type", content = "id", rename_all = "snake_case")]
1011enum PublishTarget {
1012    Connection(String),
1013    Session(String),
1014    User(String),
1015    Org(String),
1016    Agent(String),
1017}
1018
1019impl From<PublishTarget> for Target {
1020    fn from(t: PublishTarget) -> Self {
1021        match t {
1022            PublishTarget::Connection(id) => Target::Connection(id),
1023            PublishTarget::Session(id) => Target::Session(id),
1024            PublishTarget::User(id) => Target::User(id),
1025            PublishTarget::Org(id) => Target::Org(id),
1026            PublishTarget::Agent(id) => Target::Agent(id),
1027        }
1028    }
1029}
1030
1031/// `POST /admin/publish` body: the [`PublishTarget`] and the event payload to
1032/// deliver verbatim to every connection for that target.
1033#[derive(Deserialize)]
1034struct PublishRequest {
1035    target: PublishTarget,
1036    event: Value,
1037}
1038
1039/// `POST /admin/publish` response.
1040#[derive(Serialize)]
1041struct PublishResponse {
1042    /// Connections this **pod** delivered to. With a distributed backplane the
1043    /// event also fans out to connections on other pods, which this count omits
1044    /// (each pod delivers to its own sockets) — so `0` here does NOT mean
1045    /// "delivered to nobody", only "nobody on the pod that served this request".
1046    delivered: usize,
1047}
1048
1049/// Push a realtime event to a backplane target over the WebSocket fleet — the
1050/// plug point for **non-AI publishers** (job status, ingestion progress,
1051/// notifications, billing): any service can deliver to a connected client
1052/// without going through an agent turn.
1053///
1054/// Admin-gated. Targets are opaque ids matched against the backplane's
1055/// connection registry; this layer does not org-validate session/user/agent ids
1056/// (the backplane is an id-routing layer, not an authz layer). A host that needs
1057/// hard tenant isolation namespaces those ids in its own wrapper before they
1058/// reach the backplane. Callers are trusted internal services holding an Admin
1059/// credential.
1060async fn publish_event(
1061    RequireRole::<2>(_principal): RequireRole<2>,
1062    State(state): State<AppState>,
1063    Json(body): Json<PublishRequest>,
1064) -> Json<PublishResponse> {
1065    let delivered = state
1066        .backplane
1067        .publish(body.target.into(), body.event)
1068        .await;
1069    Json(PublishResponse { delivered })
1070}
1071
1072#[cfg(test)]
1073mod tests {
1074    use super::*;
1075
1076    #[test]
1077    fn map_model_info_maps_sample_payload() {
1078        // A representative `/v1/model/info` payload from the LiteLLM gateway.
1079        let payload = serde_json::json!({
1080            "data": [
1081                {
1082                    "model_name": "claude-opus-4-8",
1083                    "model_info": {
1084                        "input_cost_per_token": 0.000015,
1085                        "output_cost_per_token": 0.000075,
1086                        "model_tier": "frontier",
1087                        "use_cases": ["reasoning", "coding"]
1088                    }
1089                },
1090                {
1091                    "model_name": "claude-haiku-4-5",
1092                    "model_info": {
1093                        "input_cost_per_token": 0.0000008,
1094                        "output_cost_per_token": 0.000004,
1095                        "model_tier": "fast",
1096                        "use_cases": ["chat"]
1097                    }
1098                }
1099            ]
1100        });
1101
1102        let out = map_model_info(&payload);
1103        let opus = &out["claude-opus-4-8"];
1104        assert!((opus["inputCostPerToken"].as_f64().unwrap() - 0.000015).abs() < 1e-12);
1105        assert!((opus["outputCostPerToken"].as_f64().unwrap() - 0.000075).abs() < 1e-12);
1106        assert_eq!(opus["tier"], "frontier");
1107        assert_eq!(opus["useCases"], serde_json::json!(["reasoning", "coding"]));
1108
1109        let haiku = &out["claude-haiku-4-5"];
1110        assert_eq!(haiku["tier"], "fast");
1111        assert_eq!(haiku["useCases"], serde_json::json!(["chat"]));
1112    }
1113
1114    #[test]
1115    fn map_model_info_tolerates_missing_fields() {
1116        // Missing model_info / cost / tier / use_cases → nulls + empty array,
1117        // and an entry with no model_name is skipped.
1118        let payload = serde_json::json!({
1119            "data": [
1120                { "model_name": "bare", "model_info": {} },
1121                { "model_info": { "model_tier": "x" } }
1122            ]
1123        });
1124        let out = map_model_info(&payload);
1125        let obj = out.as_object().unwrap();
1126        assert_eq!(obj.len(), 1, "the model_name-less entry is skipped");
1127        let bare = &out["bare"];
1128        assert!(bare["inputCostPerToken"].is_null());
1129        assert!(bare["outputCostPerToken"].is_null());
1130        assert!(bare["tier"].is_null());
1131        assert_eq!(bare["useCases"], serde_json::json!([]));
1132    }
1133
1134    #[test]
1135    fn map_model_info_empty_on_missing_data() {
1136        // A payload with no `data` array maps to an empty object (UI no-badge).
1137        assert_eq!(
1138            map_model_info(&serde_json::json!({})),
1139            serde_json::json!({})
1140        );
1141        assert_eq!(
1142            map_model_info(&serde_json::json!({ "data": "nope" })),
1143            serde_json::json!({})
1144        );
1145    }
1146}