Skip to main content

smooth_operator_server/
admin.rs

1//! The admin HTTP API (Phase 12, increment 1).
2//!
3//! A REST surface, mounted under `/admin`, that the Next.js management console
4//! (increment 2) consumes: whoami, chat history, indexing status, and document
5//! sets. Everything except `/admin/health` is gated by [`require_role`] and
6//! org-scoped to the caller's [`Principal`].
7//!
8//! ## Routes + role gates
9//!
10//! | route | min role | scope |
11//! | --- | --- | --- |
12//! | `GET /admin/health` | — (public) | liveness only |
13//! | `GET /admin/me` | Basic | the caller's own principal |
14//! | `GET /admin/conversations` | Basic | Admin/Curator: org-wide; Basic: own only |
15//! | `GET /admin/conversations/{id}/messages` | Basic | role-scoped (Basic must own the convo) |
16//! | `GET /admin/indexing/runs` | Curator | org connectors |
17//! | `GET /admin/document-sets` | Curator | org document sets |
18//!
19//! ## Org-scoping + "Basic sees own"
20//!
21//! Every read filters to `principal.org_id` (the storage adapter's
22//! `list_conversations_by_org`). For a **Basic** caller, the result is further
23//! narrowed to conversations the caller *owns*: a conversation is owned when one
24//! of its `User` participants carries `external_id == principal.user_id`. An
25//! Admin or Curator sees the whole org. This mirrors the document-level
26//! [`AccessContext`](smooth_operator::access_control::AccessContext) model RBAC
27//! sits on top of.
28//!
29//! ## Errors
30//!
31//! Auth failures map to clean status codes (401 unauthenticated / invalid token /
32//! missing role; 403 insufficient role) with the protocol's `error` envelope
33//! shape (`{ code, message }`) reused for the body. Never leaks a token.
34
35use axum::extract::{Path, Query, State};
36use axum::http::request::Parts;
37use axum::http::StatusCode;
38use axum::response::{IntoResponse, Response};
39use axum::routing::{get, post};
40use axum::{Json, Router};
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43
44use smooth_operator::auth::{AuthError, Principal, Role};
45use smooth_operator::backplane::Target;
46use smooth_operator::connector_config::{ConnectorConfig, ConnectorKind};
47use smooth_operator::domain::ParticipantType;
48use smooth_operator::settings::AgentSettings;
49
50use smooth_operator_ingestion::{
51    Chunker, Connector, FileConnector, GithubAuth, GithubConnector, GithubConnectorConfig,
52    GithubVisibility, IndexingService, WebConnector,
53};
54
55use crate::embedder::{build_embedder, EmbedderConfig};
56use crate::protocol;
57use crate::state::{scoped_connector_key, AppState};
58
59/// Build the `/admin` router over the shared [`AppState`].
60pub fn router() -> Router<AppState> {
61    Router::new()
62        .route("/admin/health", get(health))
63        .route("/admin/me", get(me))
64        .route("/admin/conversations", get(list_conversations))
65        .route(
66            "/admin/conversations/{id}/messages",
67            get(conversation_messages),
68        )
69        .route("/admin/indexing/runs", get(indexing_runs))
70        .route("/admin/document-sets", get(document_sets))
71        // Write API (Phase 12, increment 3) — connector CRUD, index trigger,
72        // settings. RBAC: list/get are Curator; create/update/delete are Admin;
73        // index trigger is Curator; settings read is Curator, write is Admin.
74        .route(
75            "/admin/connectors",
76            get(list_connectors).post(create_connector),
77        )
78        .route(
79            "/admin/connectors/{id}",
80            get(get_connector)
81                .put(update_connector)
82                .delete(delete_connector),
83        )
84        .route("/admin/connectors/{id}/index", post(index_connector))
85        .route("/admin/settings", get(get_settings).put(put_settings))
86        // Realtime publish (Phase: backplane) — push an event to a backplane
87        // target over the WebSocket fleet. The plug point for non-AI publishers
88        // (job status, ingestion progress, notifications). Admin-gated.
89        .route("/admin/publish", post(publish_event))
90}
91
92// ---------------------------------------------------------------------------
93// Auth extractor — `require_role`
94// ---------------------------------------------------------------------------
95
/// An authenticated [`Principal`] guaranteed to hold at least role `MIN`.
///
/// Used as an axum extractor: it reads `Authorization: Bearer <token>`, verifies
/// it via the configured [`AuthVerifier`](smooth_operator::auth::AuthVerifier) in
/// [`AppState`], and rejects with 401/403 if the token is missing/invalid or the
/// role is insufficient — *before* the handler body runs.
///
/// `MIN` is a const role rank: `0 = Basic`, `1 = Curator`, `2 = Admin`. The
/// wrapped principal is public so handlers can destructure it directly in
/// their parameter list.
pub struct RequireRole<const MIN: u8>(pub Principal);
104
105/// Map a [`Role`] to the const rank used by [`RequireRole`].
106const fn role_rank(role: Role) -> u8 {
107    match role {
108        Role::Basic => 0,
109        Role::Curator => 1,
110        Role::Admin => 2,
111    }
112}
113
114/// The minimum [`Role`] a const rank denotes (for error messages).
115const fn rank_role(min: u8) -> Role {
116    match min {
117        0 => Role::Basic,
118        1 => Role::Curator,
119        _ => Role::Admin,
120    }
121}
122
123impl<const MIN: u8> axum::extract::FromRequestParts<AppState> for RequireRole<MIN> {
124    type Rejection = AuthRejection;
125
126    async fn from_request_parts(
127        parts: &mut Parts,
128        state: &AppState,
129    ) -> Result<Self, Self::Rejection> {
130        let token = bearer_token(parts).ok_or(AuthRejection(AuthError::Unauthenticated))?;
131        let principal = state.auth.verify(&token).map_err(AuthRejection)?;
132        if role_rank(principal.role) < MIN {
133            return Err(AuthRejection(AuthError::Forbidden {
134                required: rank_role(MIN),
135                actual: principal.role,
136            }));
137        }
138        Ok(RequireRole(principal))
139    }
140}
141
142/// Extract the raw bearer token (without the `Bearer ` prefix) from the
143/// `Authorization` header. Returns `None` when absent or not a bearer scheme.
144fn bearer_token(parts: &Parts) -> Option<String> {
145    let header = parts.headers.get(axum::http::header::AUTHORIZATION)?;
146    let value = header.to_str().ok()?;
147    let rest = value
148        .strip_prefix("Bearer ")
149        .or_else(|| value.strip_prefix("bearer "))?;
150    let trimmed = rest.trim();
151    if trimmed.is_empty() {
152        None
153    } else {
154        Some(trimmed.to_string())
155    }
156}
157
158/// An auth/authorization rejection rendered as the protocol's `error` envelope
159/// with the right HTTP status.
160pub struct AuthRejection(AuthError);
161
162impl IntoResponse for AuthRejection {
163    fn into_response(self) -> Response {
164        let (status, code) = match &self.0 {
165            AuthError::Unauthenticated => (StatusCode::UNAUTHORIZED, "UNAUTHENTICATED"),
166            AuthError::InvalidToken(_) => (StatusCode::UNAUTHORIZED, "INVALID_TOKEN"),
167            AuthError::MissingRole(_) => (StatusCode::UNAUTHORIZED, "MISSING_ROLE"),
168            AuthError::Forbidden { .. } => (StatusCode::FORBIDDEN, "FORBIDDEN"),
169            // A misconfigured verifier is a server error, surfaced as 500 with a
170            // non-leaking message.
171            AuthError::Misconfigured(_) => {
172                (StatusCode::INTERNAL_SERVER_ERROR, "AUTH_MISCONFIGURED")
173            }
174        };
175        let body = protocol::error(None, code, &self.0.to_string());
176        (status, Json(body)).into_response()
177    }
178}
179
180/// An error from a handler body (storage failure, etc.) rendered as a 500 with
181/// the protocol error shape.
182struct AdminError(StatusCode, String, &'static str);
183
184impl IntoResponse for AdminError {
185    fn into_response(self) -> Response {
186        let body = protocol::error(None, self.2, &self.1);
187        (self.0, Json(body)).into_response()
188    }
189}
190
191impl AdminError {
192    fn internal(msg: impl Into<String>) -> Self {
193        Self(
194            StatusCode::INTERNAL_SERVER_ERROR,
195            msg.into(),
196            "INTERNAL_ERROR",
197        )
198    }
199
200    fn forbidden(msg: impl Into<String>) -> Self {
201        Self(StatusCode::FORBIDDEN, msg.into(), "FORBIDDEN")
202    }
203
204    fn not_found(msg: impl Into<String>) -> Self {
205        Self(StatusCode::NOT_FOUND, msg.into(), "NOT_FOUND")
206    }
207
208    /// A 400 with the protocol's `VALIDATION_ERROR` code — used for unknown
209    /// connector kinds, malformed config payloads, and unresolvable `auth_ref`s.
210    fn validation(msg: impl Into<String>) -> Self {
211        Self(StatusCode::BAD_REQUEST, msg.into(), "VALIDATION_ERROR")
212    }
213}
214
215// ---------------------------------------------------------------------------
216// Handlers
217// ---------------------------------------------------------------------------
218
219/// `GET /admin/health` — unauthenticated liveness probe.
220async fn health() -> Json<Value> {
221    Json(serde_json::json!({ "status": "ok" }))
222}
223
224/// `GET /admin/me` — whoami. Returns the authenticated principal (any role).
225async fn me(RequireRole::<0>(principal): RequireRole<0>) -> Json<Principal> {
226    Json(principal)
227}
228
/// Query params for `GET /admin/conversations`. Both are optional.
#[derive(Debug, Deserialize)]
struct ConversationsQuery {
    /// Max conversations to return. Defaults to 50 and is clamped to the range
    /// 1..=200 by the handler.
    limit: Option<usize>,
    /// Opaque cursor: the index to start from (simple offset paging over the
    /// org-scoped list). `None` ⇒ start at the beginning. Callers should pass
    /// back the `nextCursor` value from the previous page unchanged.
    cursor: Option<usize>,
}
238
/// A conversation row in the admin list response. Serialized with camelCase
/// keys (`createdAt`, `updatedAt`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationRow {
    /// The conversation id.
    id: String,
    /// The conversation's display name.
    name: String,
    /// The platform, rendered by the handler as the lowercased `Debug` name of
    /// the platform value.
    platform: String,
    /// Creation timestamp (UTC).
    created_at: chrono::DateTime<chrono::Utc>,
    /// Last-update timestamp (UTC).
    updated_at: chrono::DateTime<chrono::Utc>,
}
249
/// The `GET /admin/conversations` response envelope. Serialized with camelCase
/// keys (`nextCursor`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationsResponse {
    /// The rows of the current page, after role scoping.
    conversations: Vec<ConversationRow>,
    /// Opaque cursor for the next page, or `null` when exhausted.
    next_cursor: Option<usize>,
}
258
259/// `GET /admin/conversations` — chat history, org-scoped. Admin/Curator see the
260/// whole org; Basic sees only conversations they own.
261async fn list_conversations(
262    RequireRole::<0>(principal): RequireRole<0>,
263    State(state): State<AppState>,
264    Query(q): Query<ConversationsQuery>,
265) -> Result<Json<ConversationsResponse>, AdminError> {
266    let limit = q.limit.unwrap_or(50).clamp(1, 200);
267    let offset = q.cursor.unwrap_or(0);
268
269    let all = state
270        .storage
271        .list_conversations_by_org(&principal.org_id)
272        .await
273        .map_err(|e| AdminError::internal(format!("list conversations failed: {e}")))?;
274
275    // Basic callers only see conversations they own.
276    let visible: Vec<_> = if principal.role >= Role::Curator {
277        all
278    } else {
279        let mut owned = Vec::new();
280        for conv in all {
281            if conversation_owned_by(&state, &conv.id, &principal.user_id).await {
282                owned.push(conv);
283            }
284        }
285        owned
286    };
287
288    let total = visible.len();
289    let page: Vec<ConversationRow> = visible
290        .into_iter()
291        .skip(offset)
292        .take(limit)
293        .map(|c| ConversationRow {
294            id: c.id,
295            name: c.name,
296            platform: format!("{:?}", c.platform).to_lowercase(),
297            created_at: c.created_at,
298            updated_at: c.updated_at,
299        })
300        .collect();
301
302    let next = offset + page.len();
303    let next_cursor = if next < total { Some(next) } else { None };
304
305    Ok(Json(ConversationsResponse {
306        conversations: page,
307        next_cursor,
308    }))
309}
310
311/// `GET /admin/conversations/{id}/messages` — messages for one conversation,
312/// role-scoped (a Basic caller must own the conversation).
313async fn conversation_messages(
314    RequireRole::<0>(principal): RequireRole<0>,
315    State(state): State<AppState>,
316    Path(conversation_id): Path<String>,
317) -> Result<Json<Value>, AdminError> {
318    // The conversation must exist + belong to the caller's org.
319    let conv = state
320        .storage
321        .get_conversation(&conversation_id)
322        .await
323        .map_err(|e| AdminError::internal(format!("get conversation failed: {e}")))?
324        .ok_or_else(|| {
325            AdminError::not_found(format!("conversation '{conversation_id}' not found"))
326        })?;
327
328    if conv.organization_id != principal.org_id {
329        // Don't leak existence across orgs — 404, not 403.
330        return Err(AdminError::not_found(format!(
331            "conversation '{conversation_id}' not found"
332        )));
333    }
334
335    // Basic callers may only read conversations they own.
336    if principal.role < Role::Curator
337        && !conversation_owned_by(&state, &conversation_id, &principal.user_id).await
338    {
339        return Err(AdminError::forbidden(
340            "you do not have access to this conversation",
341        ));
342    }
343
344    let query = smooth_operator::adapter::MessageQuery::new(&conversation_id, 200);
345    let page = state
346        .storage
347        .list_messages_by_conversation(query)
348        .await
349        .map_err(|e| AdminError::internal(format!("list messages failed: {e}")))?;
350
351    Ok(Json(serde_json::json!({
352        "conversationId": conversation_id,
353        "messages": page.messages,
354        "nextCursor": page.next_cursor,
355    })))
356}
357
358/// `GET /admin/indexing/runs` — indexing-run status across **the caller's org's**
359/// connectors. Curator+ only.
360///
361/// Org-scoped two ways (cross-org leak fix): we iterate only the principal's
362/// org's connectors, and we query the indexing store under the **org-namespaced**
363/// key ([`scoped_connector_key`]) so a same-named connector in another org can't
364/// surface its runs here. The reported `connectorName` is the un-scoped display
365/// name (the namespace is an internal storage key, never exposed).
366async fn indexing_runs(
367    RequireRole::<1>(principal): RequireRole<1>,
368    State(state): State<AppState>,
369) -> Json<Value> {
370    let mut runs = Vec::new();
371    for connector in state.connectors(&principal.org_id) {
372        let key = scoped_connector_key(&principal.org_id, &connector);
373        for run in state.indexing.list_runs(&key) {
374            runs.push(serde_json::json!({
375                "id": run.id,
376                // Report the display name, never the internal org-namespaced key.
377                "connectorName": connector,
378                "status": format!("{:?}", run.status).to_lowercase(),
379                "startedAt": run.started_at,
380                "finishedAt": run.finished_at,
381                "documentsSeen": run.documents_seen,
382                "chunksIndexed": run.chunks_indexed,
383                "documentsSkipped": run.documents_skipped,
384                "cursor": run.cursor,
385                "error": run.error,
386            }));
387        }
388    }
389    Json(serde_json::json!({ "runs": runs }))
390}
391
/// A document-set row in the `GET /admin/document-sets` response. Serialized
/// with camelCase keys (`documentCount`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct DocumentSetRow {
    /// The document set's name.
    name: String,
    /// The count reported alongside the name by `AppState::document_sets`.
    document_count: usize,
}
399
400/// `GET /admin/document-sets` — distinct document-set names + doc counts for
401/// **the caller's org**. Curator+ only. Org-scoped so org A's document sets are
402/// never reported to an org-B caller (cross-org leak fix).
403async fn document_sets(
404    RequireRole::<1>(principal): RequireRole<1>,
405    State(state): State<AppState>,
406) -> Json<Value> {
407    let sets: Vec<DocumentSetRow> = state
408        .document_sets(&principal.org_id)
409        .into_iter()
410        .map(|(name, document_count)| DocumentSetRow {
411            name,
412            document_count,
413        })
414        .collect();
415    Json(serde_json::json!({ "documentSets": sets }))
416}
417
418// ---------------------------------------------------------------------------
419// Connector config CRUD (Phase 12, increment 3)
420// ---------------------------------------------------------------------------
421
/// The wire body for create/update of a connector. `kind` is validated against
/// [`ConnectorKind`]; `config` is the kind-specific free-form payload (may carry
/// an `auth_ref` naming a secret — never the secret itself).
#[derive(Debug, Deserialize)]
struct ConnectorWrite {
    /// The connector's display name.
    name: String,
    /// The connector kind as a string; parsed with `ConnectorKind::parse` by
    /// the create/update handlers.
    kind: String,
    /// Kind-specific settings. Defaults to JSON `null` when omitted.
    #[serde(default)]
    config: Value,
    /// Whether the connector is enabled. Defaults to `true` when omitted.
    #[serde(default = "default_enabled")]
    enabled: bool,
}

/// Serde default for [`ConnectorWrite::enabled`]: a connector is enabled unless
/// the request body says otherwise.
const fn default_enabled() -> bool {
    true
}
438
439/// Serialize a [`ConnectorConfig`] for an API response under a `connector` key.
440///
441/// The stored `config` is echoed as-is — it only ever holds an `auth_ref` *name*,
442/// never a secret value, so this can never leak a credential.
443fn connector_json(cfg: &ConnectorConfig) -> Value {
444    serde_json::json!({
445        "connector": {
446            "id": cfg.id,
447            "name": cfg.name,
448            "kind": cfg.kind.as_str(),
449            "config": cfg.config,
450            "enabled": cfg.enabled,
451            "createdAt": cfg.created_at,
452            "updatedAt": cfg.updated_at,
453        }
454    })
455}
456
457/// Validate `(kind, config)` and surface a clean 400 on an unknown kind or a
458/// payload missing the fields that kind needs to build a connector.
459fn validate_connector(kind: ConnectorKind, config: &Value) -> Result<(), AdminError> {
460    let missing = |field: &str| {
461        AdminError::validation(format!(
462            "{} connector config requires a '{field}' field",
463            kind.as_str()
464        ))
465    };
466    match kind {
467        ConnectorKind::Github => {
468            if config.get("owner").and_then(Value::as_str).is_none() {
469                return Err(missing("owner"));
470            }
471            if config.get("repo").and_then(Value::as_str).is_none() {
472                return Err(missing("repo"));
473            }
474        }
475        ConnectorKind::Web => {
476            if config.get("url").and_then(Value::as_str).is_none() {
477                return Err(missing("url"));
478            }
479        }
480        ConnectorKind::File => {
481            if config.get("path").and_then(Value::as_str).is_none() {
482                return Err(missing("path"));
483            }
484        }
485    }
486    Ok(())
487}
488
489/// `GET /admin/connectors` — list this org's connectors (Curator+).
490async fn list_connectors(
491    RequireRole::<1>(principal): RequireRole<1>,
492    State(state): State<AppState>,
493) -> Json<Value> {
494    let connectors: Vec<Value> = state
495        .connector_configs
496        .list(&principal.org_id)
497        .iter()
498        .map(|c| connector_json(c)["connector"].clone())
499        .collect();
500    Json(serde_json::json!({ "connectors": connectors }))
501}
502
503/// `GET /admin/connectors/{id}` — one connector, org-scoped (Curator+). A
504/// cross-org / unknown id is a 404 (existence not leaked across orgs).
505async fn get_connector(
506    RequireRole::<1>(principal): RequireRole<1>,
507    State(state): State<AppState>,
508    Path(id): Path<String>,
509) -> Result<Json<Value>, AdminError> {
510    let cfg = state
511        .connector_configs
512        .get(&principal.org_id, &id)
513        .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
514    Ok(Json(connector_json(&cfg)))
515}
516
517/// `POST /admin/connectors` — create a connector (Admin only). Returns 201 with
518/// the created connector (a fresh uuid id, org from the principal).
519async fn create_connector(
520    RequireRole::<2>(principal): RequireRole<2>,
521    State(state): State<AppState>,
522    Json(body): Json<ConnectorWrite>,
523) -> Result<Response, AdminError> {
524    let kind = ConnectorKind::parse(&body.kind)
525        .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
526    validate_connector(kind, &body.config)?;
527
528    let now = chrono::Utc::now();
529    let cfg = ConnectorConfig {
530        id: uuid::Uuid::new_v4().to_string(),
531        org_id: principal.org_id.clone(),
532        name: body.name,
533        kind,
534        config: body.config,
535        enabled: body.enabled,
536        created_at: now,
537        updated_at: now,
538    };
539    state.connector_configs.upsert(cfg.clone());
540    // Record the connector name under the caller's org so its runs are listed by
541    // /admin/indexing/runs — and ONLY for this org (cross-org scoping).
542    state.record_connector(principal.org_id.clone(), cfg.name.clone());
543    Ok((StatusCode::CREATED, Json(connector_json(&cfg))).into_response())
544}
545
546/// `PUT /admin/connectors/{id}` — update a connector (Admin only). The id +
547/// `created_at` are preserved; a cross-org / unknown id is a 404.
548async fn update_connector(
549    RequireRole::<2>(principal): RequireRole<2>,
550    State(state): State<AppState>,
551    Path(id): Path<String>,
552    Json(body): Json<ConnectorWrite>,
553) -> Result<Json<Value>, AdminError> {
554    let existing = state
555        .connector_configs
556        .get(&principal.org_id, &id)
557        .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
558
559    let kind = ConnectorKind::parse(&body.kind)
560        .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
561    validate_connector(kind, &body.config)?;
562
563    let cfg = ConnectorConfig {
564        id: existing.id,
565        org_id: existing.org_id,
566        name: body.name,
567        kind,
568        config: body.config,
569        enabled: body.enabled,
570        created_at: existing.created_at,
571        updated_at: chrono::Utc::now(),
572    };
573    state.connector_configs.upsert(cfg.clone());
574    state.record_connector(principal.org_id.clone(), cfg.name.clone());
575    Ok(Json(connector_json(&cfg)))
576}
577
578/// `DELETE /admin/connectors/{id}` — remove a connector (Admin only). 204 on
579/// success; a cross-org / unknown id is a 404.
580async fn delete_connector(
581    RequireRole::<2>(principal): RequireRole<2>,
582    State(state): State<AppState>,
583    Path(id): Path<String>,
584) -> Result<Response, AdminError> {
585    if state.connector_configs.delete(&principal.org_id, &id) {
586        Ok(StatusCode::NO_CONTENT.into_response())
587    } else {
588        Err(AdminError::not_found(format!("connector '{id}' not found")))
589    }
590}
591
592/// `POST /admin/connectors/{id}/index` — build the connector from its stored
593/// config and run one indexing pass (Curator+).
594///
595/// For `github`, the token is resolved from `auth_ref` → env at *this* moment
596/// (never persisted). An unresolvable `auth_ref` returns a clean 400 *before*
597/// any GitHub call — no panic, no network. The resulting [`IndexingRun`] is
598/// recorded in the shared `IndexingStore` (so it also shows in
599/// `GET /admin/indexing/runs`) and returned.
600async fn index_connector(
601    RequireRole::<1>(principal): RequireRole<1>,
602    State(state): State<AppState>,
603    Path(id): Path<String>,
604) -> Result<Json<Value>, AdminError> {
605    let cfg = state
606        .connector_configs
607        .get(&principal.org_id, &id)
608        .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
609
610    // Build the live connector from the stored config (resolving any secret from
611    // env at this moment). A validation failure here is a clean 400.
612    //
613    // The connector is named with an ORG-NAMESPACED key so the indexing run is
614    // recorded in the store under `IXCONN#<org>...<name>` — a same-named
615    // connector in another org records + lists separately (cross-org scoping).
616    // The display name (`cfg.name`) is rewritten back into the response below.
617    let scoped_key = scoped_connector_key(&principal.org_id, &cfg.name);
618    let connector = build_connector(&cfg, &scoped_key)?;
619
620    let service = IndexingService::new(principal.org_id.clone());
621    let chunker = Chunker::default();
622    // Select the embedder from config: the real semantic GatewayEmbedder (1536-d)
623    // when the gateway is keyed, else the network-free DeterministicEmbedder
624    // (1024-d) with a loud warning. The knowledge store the docs land in was
625    // created with this same embedder's dim by the storage-backend wiring
626    // (`build_state_from_env_async`), so document and query vectors agree.
627    let embedder = build_embedder(&EmbedderConfig::from_server_config(&state.config));
628    let knowledge = state.storage.knowledge();
629
630    let run = service
631        .run_once(
632            connector.as_ref(),
633            state.indexing.as_ref(),
634            &chunker,
635            embedder.as_ref(),
636            knowledge,
637        )
638        .await
639        .map_err(|e| AdminError::internal(format!("indexing failed: {e}")))?;
640
641    // Surface the connector for /admin/indexing/runs listing (org-scoped).
642    state.record_connector(principal.org_id.clone(), cfg.name.clone());
643
644    Ok(Json(serde_json::json!({
645        "run": {
646            "id": run.id,
647            // Report the display name, never the internal org-namespaced key.
648            "connectorName": cfg.name,
649            "status": format!("{:?}", run.status).to_lowercase(),
650            "startedAt": run.started_at,
651            "finishedAt": run.finished_at,
652            "documentsSeen": run.documents_seen,
653            "chunksIndexed": run.chunks_indexed,
654            "documentsSkipped": run.documents_skipped,
655            "cursor": run.cursor,
656            "error": run.error,
657        }
658    })))
659}
660
661/// Build a live [`Connector`] from a stored [`ConnectorConfig`], resolving any
662/// secret named by `auth_ref` from the environment *now* (never persisted).
663///
664/// `connector_name` is the name stamped onto the built connector — the caller
665/// passes an **org-namespaced** key so the indexing run is recorded per-org
666/// (cross-org scoping), keeping the human display name out of the storage key.
667///
668/// Returns a clean 400 ([`AdminError::validation`]) — never a panic — for a
669/// malformed config or an unresolvable `auth_ref`, so `/index` can surface the
670/// problem without touching the network.
671fn build_connector(
672    cfg: &ConnectorConfig,
673    connector_name: &str,
674) -> Result<Box<dyn Connector>, AdminError> {
675    let connector_name = connector_name.to_string();
676    match cfg.kind {
677        ConnectorKind::Web => {
678            let url = cfg
679                .config
680                .get("url")
681                .and_then(Value::as_str)
682                .ok_or_else(|| AdminError::validation("web connector requires a 'url'"))?;
683            Ok(Box::new(NamedConnector::new(
684                connector_name,
685                WebConnector::new(url),
686            )))
687        }
688        ConnectorKind::File => {
689            let path = cfg
690                .config
691                .get("path")
692                .and_then(Value::as_str)
693                .ok_or_else(|| AdminError::validation("file connector requires a 'path'"))?;
694            Ok(Box::new(NamedConnector::new(
695                connector_name,
696                FileConnector::new(path),
697            )))
698        }
699        ConnectorKind::Github => {
700            let owner = cfg
701                .config
702                .get("owner")
703                .and_then(Value::as_str)
704                .ok_or_else(|| AdminError::validation("github connector requires an 'owner'"))?;
705            let repo = cfg
706                .config
707                .get("repo")
708                .and_then(Value::as_str)
709                .ok_or_else(|| AdminError::validation("github connector requires a 'repo'"))?;
710
711            // Resolve the token from auth_ref→env. A private repo MUST have a
712            // resolvable token; a public repo may index unauthenticated.
713            let visibility = match cfg.config.get("visibility").and_then(Value::as_str) {
714                Some("private") => GithubVisibility::Private,
715                _ => GithubVisibility::Public,
716            };
717            let auth = resolve_github_auth(cfg, visibility)?;
718
719            let mut gh = GithubConnectorConfig::new(owner, repo, auth).visibility(visibility);
720            if let Some(r) = cfg.config.get("ref").and_then(Value::as_str) {
721                gh = gh.at_ref(r);
722            }
723            Ok(Box::new(NamedConnector::new(
724                connector_name,
725                GithubConnector::new(gh),
726            )))
727        }
728    }
729}
730
731/// Resolve a [`GithubAuth`] from the connector's `auth_ref` → env var.
732///
733/// - `auth_ref` set + env present ⇒ `Token`.
734/// - `auth_ref` set but env **missing/empty** ⇒ a clean 400 (no GitHub call).
735/// - no `auth_ref`: a **public** repo indexes `Unauthenticated`; a **private**
736///   repo is a 400 (a private repo needs a credential).
737fn resolve_github_auth(
738    cfg: &ConnectorConfig,
739    visibility: GithubVisibility,
740) -> Result<GithubAuth, AdminError> {
741    match cfg.auth_ref() {
742        Some(name) => match std::env::var(name) {
743            Ok(token) if !token.trim().is_empty() => Ok(GithubAuth::Token(token)),
744            _ => Err(AdminError::validation(format!(
745                "github connector auth_ref '{name}' did not resolve to a token \
746                 (set the named secret/env var); refusing to index"
747            ))),
748        },
749        None => match visibility {
750            GithubVisibility::Public => Ok(GithubAuth::Unauthenticated),
751            GithubVisibility::Private => Err(AdminError::validation(
752                "github connector for a private repo requires an 'auth_ref' \
753                 naming a token secret",
754            )),
755        },
756    }
757}
758
/// Wraps a connector to override its `name()` with a caller-supplied name
/// instead of the inner connector's own label. Delegates `pull` unchanged.
///
/// In this file the supplied name is the **org-namespaced** key that
/// `index_connector` passes to `build_connector`, so indexing runs are
/// recorded per org; the human display name is reported separately in API
/// responses.
struct NamedConnector<C: Connector> {
    /// The name returned by `name()`.
    name: String,
    /// The wrapped connector that does the actual pulling.
    inner: C,
}

impl<C: Connector> NamedConnector<C> {
    /// Wrap `inner`, reporting `name` as the connector name.
    fn new(name: String, inner: C) -> Self {
        Self { name, inner }
    }
}
773
774#[async_trait::async_trait]
775impl<C: Connector> Connector for NamedConnector<C> {
776    fn name(&self) -> &str {
777        &self.name
778    }
779
780    async fn pull(
781        &self,
782        since: Option<smooth_operator_ingestion::Timestamp>,
783    ) -> anyhow::Result<Vec<smooth_operator_ingestion::RawDocument>> {
784        self.inner.pull(since).await
785    }
786}
787
788// ---------------------------------------------------------------------------
789// Agent settings (Phase 12, increment 3)
790// ---------------------------------------------------------------------------
791
792/// The wire body for `PUT /admin/settings`.
793#[derive(Debug, Deserialize)]
794#[serde(rename_all = "camelCase")]
795struct SettingsWrite {
796    model: String,
797    system_prompt: String,
798    /// Optional per-org agent persona override (see [`AgentSettings::persona`]).
799    /// Omitted ⇒ `None` ⇒ the runner stays on its built-in const prompt.
800    #[serde(default)]
801    persona: Option<String>,
802    #[serde(default)]
803    default_tools: Vec<String>,
804}
805
806/// Serialize [`AgentSettings`] under a `settings` key.
807fn settings_json(s: &AgentSettings) -> Value {
808    serde_json::json!({
809        "settings": {
810            "orgId": s.org_id,
811            "model": s.model,
812            "systemPrompt": s.system_prompt,
813            "persona": s.persona,
814            "defaultTools": s.default_tools,
815            "updatedAt": s.updated_at,
816        }
817    })
818}
819
820/// `GET /admin/settings` — the org's agent settings (defaults if unset). Curator+.
821async fn get_settings(
822    RequireRole::<1>(principal): RequireRole<1>,
823    State(state): State<AppState>,
824) -> Json<Value> {
825    let settings = state.settings.get(&principal.org_id);
826    Json(settings_json(&settings))
827}
828
829/// `PUT /admin/settings` — replace the org's agent settings (Admin only).
830async fn put_settings(
831    RequireRole::<2>(principal): RequireRole<2>,
832    State(state): State<AppState>,
833    Json(body): Json<SettingsWrite>,
834) -> Json<Value> {
835    let settings = AgentSettings {
836        org_id: principal.org_id.clone(),
837        model: body.model,
838        system_prompt: body.system_prompt,
839        persona: body.persona,
840        default_tools: body.default_tools,
841        updated_at: chrono::Utc::now(),
842    };
843    state.settings.put(settings.clone());
844    Json(settings_json(&settings))
845}
846
847// ---------------------------------------------------------------------------
848// Helpers
849// ---------------------------------------------------------------------------
850
851/// Whether `user_id` owns the conversation — true when a `User` participant in
852/// the conversation carries `external_id == user_id`.
853async fn conversation_owned_by(state: &AppState, conversation_id: &str, user_id: &str) -> bool {
854    match state
855        .storage
856        .list_participants_by_conversation(conversation_id)
857        .await
858    {
859        Ok(parts) => parts.iter().any(|p| {
860            p.participant_type == ParticipantType::User && p.external_id.as_deref() == Some(user_id)
861        }),
862        Err(_) => false,
863    }
864}
865
866// ---------------------------------------------------------------------------
867// Realtime publish — `POST /admin/publish`
868// ---------------------------------------------------------------------------
869
870/// A delivery target in the publish request, in a friendlier `{type, id}` shape
871/// than [`Target`]'s default enum serialization.
872#[derive(Deserialize)]
873#[serde(tag = "type", content = "id", rename_all = "snake_case")]
874enum PublishTarget {
875    Connection(String),
876    Session(String),
877    User(String),
878    Org(String),
879    Agent(String),
880}
881
882impl From<PublishTarget> for Target {
883    fn from(t: PublishTarget) -> Self {
884        match t {
885            PublishTarget::Connection(id) => Target::Connection(id),
886            PublishTarget::Session(id) => Target::Session(id),
887            PublishTarget::User(id) => Target::User(id),
888            PublishTarget::Org(id) => Target::Org(id),
889            PublishTarget::Agent(id) => Target::Agent(id),
890        }
891    }
892}
893
894/// `POST /admin/publish` body: the [`PublishTarget`] and the event payload to
895/// deliver verbatim to every connection for that target.
896#[derive(Deserialize)]
897struct PublishRequest {
898    target: PublishTarget,
899    event: Value,
900}
901
902/// `POST /admin/publish` response.
903#[derive(Serialize)]
904struct PublishResponse {
905    /// Connections this **pod** delivered to. With a distributed backplane the
906    /// event also fans out to connections on other pods, which this count omits
907    /// (each pod delivers to its own sockets) — so `0` here does NOT mean
908    /// "delivered to nobody", only "nobody on the pod that served this request".
909    delivered: usize,
910}
911
912/// Push a realtime event to a backplane target over the WebSocket fleet — the
913/// plug point for **non-AI publishers** (job status, ingestion progress,
914/// notifications, billing): any service can deliver to a connected client
915/// without going through an agent turn.
916///
917/// Admin-gated. Targets are opaque ids matched against the backplane's
918/// connection registry; this layer does not org-validate session/user/agent ids
919/// (the backplane is an id-routing layer, not an authz layer). A host that needs
920/// hard tenant isolation namespaces those ids in its own wrapper before they
921/// reach the backplane. Callers are trusted internal services holding an Admin
922/// credential.
923async fn publish_event(
924    RequireRole::<2>(_principal): RequireRole<2>,
925    State(state): State<AppState>,
926    Json(body): Json<PublishRequest>,
927) -> Json<PublishResponse> {
928    let delivered = state
929        .backplane
930        .publish(body.target.into(), body.event)
931        .await;
932    Json(PublishResponse { delivered })
933}