1use axum::extract::{Path, Query, State};
36use axum::http::request::Parts;
37use axum::http::StatusCode;
38use axum::response::{IntoResponse, Response};
39use axum::routing::{get, post};
40use axum::{Json, Router};
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43
44use smooth_operator::auth::{AuthError, Principal, Role};
45use smooth_operator::backplane::Target;
46use smooth_operator::connector_config::{ConnectorConfig, ConnectorKind};
47use smooth_operator::domain::ParticipantType;
48use smooth_operator::settings::AgentSettings;
49
50use smooth_operator_ingestion::{
51 Chunker, Connector, FileConnector, GithubAuth, GithubConnector, GithubConnectorConfig,
52 GithubVisibility, IndexingService, WebConnector,
53};
54
55use crate::embedder::{build_embedder, EmbedderConfig};
56use crate::protocol;
57use crate::state::{scoped_connector_key, AppState};
58
59pub fn router() -> Router<AppState> {
61 Router::new()
62 .route("/admin/health", get(health))
63 .route("/admin/me", get(me))
64 .route("/admin/conversations", get(list_conversations))
65 .route(
66 "/admin/conversations/{id}/messages",
67 get(conversation_messages),
68 )
69 .route("/admin/indexing/runs", get(indexing_runs))
70 .route("/admin/document-sets", get(document_sets))
71 .route(
75 "/admin/connectors",
76 get(list_connectors).post(create_connector),
77 )
78 .route(
79 "/admin/connectors/{id}",
80 get(get_connector)
81 .put(update_connector)
82 .delete(delete_connector),
83 )
84 .route("/admin/connectors/{id}/index", post(index_connector))
85 .route("/admin/settings", get(get_settings).put(put_settings))
86 .route("/admin/publish", post(publish_event))
90}
91
/// Extractor that authenticates the request and requires the caller's role to rank at
/// least `MIN`.
///
/// `MIN` uses the numeric ranks defined by `role_rank`: `0` = `Role::Basic`,
/// `1` = `Role::Curator`, `2` = `Role::Admin`. On success the extractor wraps the verified
/// [`Principal`]; on failure the request is rejected with an [`AuthRejection`].
pub struct RequireRole<const MIN: u8>(pub Principal);
104
105const fn role_rank(role: Role) -> u8 {
107 match role {
108 Role::Basic => 0,
109 Role::Curator => 1,
110 Role::Admin => 2,
111 }
112}
113
114const fn rank_role(min: u8) -> Role {
116 match min {
117 0 => Role::Basic,
118 1 => Role::Curator,
119 _ => Role::Admin,
120 }
121}
122
123impl<const MIN: u8> axum::extract::FromRequestParts<AppState> for RequireRole<MIN> {
124 type Rejection = AuthRejection;
125
126 async fn from_request_parts(
127 parts: &mut Parts,
128 state: &AppState,
129 ) -> Result<Self, Self::Rejection> {
130 let token = bearer_token(parts).ok_or(AuthRejection(AuthError::Unauthenticated))?;
131 let principal = state.auth.verify(&token).map_err(AuthRejection)?;
132 if role_rank(principal.role) < MIN {
133 return Err(AuthRejection(AuthError::Forbidden {
134 required: rank_role(MIN),
135 actual: principal.role,
136 }));
137 }
138 Ok(RequireRole(principal))
139 }
140}
141
142fn bearer_token(parts: &Parts) -> Option<String> {
145 let header = parts.headers.get(axum::http::header::AUTHORIZATION)?;
146 let value = header.to_str().ok()?;
147 let rest = value
148 .strip_prefix("Bearer ")
149 .or_else(|| value.strip_prefix("bearer "))?;
150 let trimmed = rest.trim();
151 if trimmed.is_empty() {
152 None
153 } else {
154 Some(trimmed.to_string())
155 }
156}
157
158pub struct AuthRejection(AuthError);
161
162impl IntoResponse for AuthRejection {
163 fn into_response(self) -> Response {
164 let (status, code) = match &self.0 {
165 AuthError::Unauthenticated => (StatusCode::UNAUTHORIZED, "UNAUTHENTICATED"),
166 AuthError::InvalidToken(_) => (StatusCode::UNAUTHORIZED, "INVALID_TOKEN"),
167 AuthError::MissingRole(_) => (StatusCode::UNAUTHORIZED, "MISSING_ROLE"),
168 AuthError::Forbidden { .. } => (StatusCode::FORBIDDEN, "FORBIDDEN"),
169 AuthError::Misconfigured(_) => {
172 (StatusCode::INTERNAL_SERVER_ERROR, "AUTH_MISCONFIGURED")
173 }
174 };
175 let body = protocol::error(None, code, &self.0.to_string());
176 (status, Json(body)).into_response()
177 }
178}
179
180struct AdminError(StatusCode, String, &'static str);
183
184impl IntoResponse for AdminError {
185 fn into_response(self) -> Response {
186 let body = protocol::error(None, self.2, &self.1);
187 (self.0, Json(body)).into_response()
188 }
189}
190
191impl AdminError {
192 fn internal(msg: impl Into<String>) -> Self {
193 Self(
194 StatusCode::INTERNAL_SERVER_ERROR,
195 msg.into(),
196 "INTERNAL_ERROR",
197 )
198 }
199
200 fn forbidden(msg: impl Into<String>) -> Self {
201 Self(StatusCode::FORBIDDEN, msg.into(), "FORBIDDEN")
202 }
203
204 fn not_found(msg: impl Into<String>) -> Self {
205 Self(StatusCode::NOT_FOUND, msg.into(), "NOT_FOUND")
206 }
207
208 fn validation(msg: impl Into<String>) -> Self {
211 Self(StatusCode::BAD_REQUEST, msg.into(), "VALIDATION_ERROR")
212 }
213}
214
215async fn health() -> Json<Value> {
221 Json(serde_json::json!({ "status": "ok" }))
222}
223
224async fn me(RequireRole::<0>(principal): RequireRole<0>) -> Json<Principal> {
226 Json(principal)
227}
228
/// Query-string parameters accepted by `list_conversations`.
#[derive(Debug, Deserialize)]
struct ConversationsQuery {
    /// Requested page size. `list_conversations` defaults it to 50 and clamps it to
    /// `1..=200`.
    limit: Option<usize>,
    /// Offset of the first conversation to return. `list_conversations` defaults it to 0.
    cursor: Option<usize>,
}
238
/// One conversation as returned by `list_conversations`; serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationRow {
    /// Conversation identifier.
    id: String,
    /// Conversation name.
    name: String,
    /// Lower-cased `Debug` rendering of the stored conversation's platform value.
    platform: String,
    /// Creation timestamp (UTC).
    created_at: chrono::DateTime<chrono::Utc>,
    /// Last-update timestamp (UTC).
    updated_at: chrono::DateTime<chrono::Utc>,
}
249
/// Response body of `list_conversations`; serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationsResponse {
    /// The requested page of conversations.
    conversations: Vec<ConversationRow>,
    /// Offset to pass as `cursor` for the next page, or `None` when this page reached the
    /// end of the visible conversations.
    next_cursor: Option<usize>,
}
258
259async fn list_conversations(
262 RequireRole::<0>(principal): RequireRole<0>,
263 State(state): State<AppState>,
264 Query(q): Query<ConversationsQuery>,
265) -> Result<Json<ConversationsResponse>, AdminError> {
266 let limit = q.limit.unwrap_or(50).clamp(1, 200);
267 let offset = q.cursor.unwrap_or(0);
268
269 let all = state
270 .storage
271 .list_conversations_by_org(&principal.org_id)
272 .await
273 .map_err(|e| AdminError::internal(format!("list conversations failed: {e}")))?;
274
275 let visible: Vec<_> = if principal.role >= Role::Curator {
277 all
278 } else {
279 let mut owned = Vec::new();
280 for conv in all {
281 if conversation_owned_by(&state, &conv.id, &principal.user_id).await {
282 owned.push(conv);
283 }
284 }
285 owned
286 };
287
288 let total = visible.len();
289 let page: Vec<ConversationRow> = visible
290 .into_iter()
291 .skip(offset)
292 .take(limit)
293 .map(|c| ConversationRow {
294 id: c.id,
295 name: c.name,
296 platform: format!("{:?}", c.platform).to_lowercase(),
297 created_at: c.created_at,
298 updated_at: c.updated_at,
299 })
300 .collect();
301
302 let next = offset + page.len();
303 let next_cursor = if next < total { Some(next) } else { None };
304
305 Ok(Json(ConversationsResponse {
306 conversations: page,
307 next_cursor,
308 }))
309}
310
311async fn conversation_messages(
314 RequireRole::<0>(principal): RequireRole<0>,
315 State(state): State<AppState>,
316 Path(conversation_id): Path<String>,
317) -> Result<Json<Value>, AdminError> {
318 let conv = state
320 .storage
321 .get_conversation(&conversation_id)
322 .await
323 .map_err(|e| AdminError::internal(format!("get conversation failed: {e}")))?
324 .ok_or_else(|| {
325 AdminError::not_found(format!("conversation '{conversation_id}' not found"))
326 })?;
327
328 if conv.organization_id != principal.org_id {
329 return Err(AdminError::not_found(format!(
331 "conversation '{conversation_id}' not found"
332 )));
333 }
334
335 if principal.role < Role::Curator
337 && !conversation_owned_by(&state, &conversation_id, &principal.user_id).await
338 {
339 return Err(AdminError::forbidden(
340 "you do not have access to this conversation",
341 ));
342 }
343
344 let query = smooth_operator::adapter::MessageQuery::new(&conversation_id, 200);
345 let page = state
346 .storage
347 .list_messages_by_conversation(query)
348 .await
349 .map_err(|e| AdminError::internal(format!("list messages failed: {e}")))?;
350
351 Ok(Json(serde_json::json!({
352 "conversationId": conversation_id,
353 "messages": page.messages,
354 "nextCursor": page.next_cursor,
355 })))
356}
357
358async fn indexing_runs(
367 RequireRole::<1>(principal): RequireRole<1>,
368 State(state): State<AppState>,
369) -> Json<Value> {
370 let mut runs = Vec::new();
371 for connector in state.connectors(&principal.org_id) {
372 let key = scoped_connector_key(&principal.org_id, &connector);
373 for run in state.indexing.list_runs(&key) {
374 runs.push(serde_json::json!({
375 "id": run.id,
376 "connectorName": connector,
378 "status": format!("{:?}", run.status).to_lowercase(),
379 "startedAt": run.started_at,
380 "finishedAt": run.finished_at,
381 "documentsSeen": run.documents_seen,
382 "chunksIndexed": run.chunks_indexed,
383 "documentsSkipped": run.documents_skipped,
384 "cursor": run.cursor,
385 "error": run.error,
386 }));
387 }
388 }
389 Json(serde_json::json!({ "runs": runs }))
390}
391
/// One entry of the `document_sets` response; serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct DocumentSetRow {
    /// Document set name, as returned by `state.document_sets`.
    name: String,
    /// Count paired with the name by `state.document_sets`.
    document_count: usize,
}
399
400async fn document_sets(
404 RequireRole::<1>(principal): RequireRole<1>,
405 State(state): State<AppState>,
406) -> Json<Value> {
407 let sets: Vec<DocumentSetRow> = state
408 .document_sets(&principal.org_id)
409 .into_iter()
410 .map(|(name, document_count)| DocumentSetRow {
411 name,
412 document_count,
413 })
414 .collect();
415 Json(serde_json::json!({ "documentSets": sets }))
416}
417
/// Request body shared by `create_connector` and `update_connector`.
#[derive(Debug, Deserialize)]
struct ConnectorWrite {
    /// Connector name.
    name: String,
    /// Connector kind as text; handlers parse it with `ConnectorKind::parse`.
    kind: String,
    /// Kind-specific configuration; falls back to `Value::default()` when omitted.
    #[serde(default)]
    config: Value,
    /// Whether the connector is enabled; falls back to `true` when omitted.
    #[serde(default = "default_enabled")]
    enabled: bool,
}
434
/// Serde default for `ConnectorWrite::enabled`: connectors are enabled unless the request
/// says otherwise.
const fn default_enabled() -> bool {
    true
}
438
439fn connector_json(cfg: &ConnectorConfig) -> Value {
444 serde_json::json!({
445 "connector": {
446 "id": cfg.id,
447 "name": cfg.name,
448 "kind": cfg.kind.as_str(),
449 "config": cfg.config,
450 "enabled": cfg.enabled,
451 "createdAt": cfg.created_at,
452 "updatedAt": cfg.updated_at,
453 }
454 })
455}
456
457fn validate_connector(kind: ConnectorKind, config: &Value) -> Result<(), AdminError> {
460 let missing = |field: &str| {
461 AdminError::validation(format!(
462 "{} connector config requires a '{field}' field",
463 kind.as_str()
464 ))
465 };
466 match kind {
467 ConnectorKind::Github => {
468 if config.get("owner").and_then(Value::as_str).is_none() {
469 return Err(missing("owner"));
470 }
471 if config.get("repo").and_then(Value::as_str).is_none() {
472 return Err(missing("repo"));
473 }
474 }
475 ConnectorKind::Web => {
476 if config.get("url").and_then(Value::as_str).is_none() {
477 return Err(missing("url"));
478 }
479 }
480 ConnectorKind::File => {
481 if config.get("path").and_then(Value::as_str).is_none() {
482 return Err(missing("path"));
483 }
484 }
485 }
486 Ok(())
487}
488
489async fn list_connectors(
491 RequireRole::<1>(principal): RequireRole<1>,
492 State(state): State<AppState>,
493) -> Json<Value> {
494 let connectors: Vec<Value> = state
495 .connector_configs
496 .list(&principal.org_id)
497 .iter()
498 .map(|c| connector_json(c)["connector"].clone())
499 .collect();
500 Json(serde_json::json!({ "connectors": connectors }))
501}
502
503async fn get_connector(
506 RequireRole::<1>(principal): RequireRole<1>,
507 State(state): State<AppState>,
508 Path(id): Path<String>,
509) -> Result<Json<Value>, AdminError> {
510 let cfg = state
511 .connector_configs
512 .get(&principal.org_id, &id)
513 .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
514 Ok(Json(connector_json(&cfg)))
515}
516
517async fn create_connector(
520 RequireRole::<2>(principal): RequireRole<2>,
521 State(state): State<AppState>,
522 Json(body): Json<ConnectorWrite>,
523) -> Result<Response, AdminError> {
524 let kind = ConnectorKind::parse(&body.kind)
525 .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
526 validate_connector(kind, &body.config)?;
527
528 let now = chrono::Utc::now();
529 let cfg = ConnectorConfig {
530 id: uuid::Uuid::new_v4().to_string(),
531 org_id: principal.org_id.clone(),
532 name: body.name,
533 kind,
534 config: body.config,
535 enabled: body.enabled,
536 created_at: now,
537 updated_at: now,
538 };
539 state.connector_configs.upsert(cfg.clone());
540 state.record_connector(principal.org_id.clone(), cfg.name.clone());
543 Ok((StatusCode::CREATED, Json(connector_json(&cfg))).into_response())
544}
545
546async fn update_connector(
549 RequireRole::<2>(principal): RequireRole<2>,
550 State(state): State<AppState>,
551 Path(id): Path<String>,
552 Json(body): Json<ConnectorWrite>,
553) -> Result<Json<Value>, AdminError> {
554 let existing = state
555 .connector_configs
556 .get(&principal.org_id, &id)
557 .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
558
559 let kind = ConnectorKind::parse(&body.kind)
560 .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
561 validate_connector(kind, &body.config)?;
562
563 let cfg = ConnectorConfig {
564 id: existing.id,
565 org_id: existing.org_id,
566 name: body.name,
567 kind,
568 config: body.config,
569 enabled: body.enabled,
570 created_at: existing.created_at,
571 updated_at: chrono::Utc::now(),
572 };
573 state.connector_configs.upsert(cfg.clone());
574 state.record_connector(principal.org_id.clone(), cfg.name.clone());
575 Ok(Json(connector_json(&cfg)))
576}
577
578async fn delete_connector(
581 RequireRole::<2>(principal): RequireRole<2>,
582 State(state): State<AppState>,
583 Path(id): Path<String>,
584) -> Result<Response, AdminError> {
585 if state.connector_configs.delete(&principal.org_id, &id) {
586 Ok(StatusCode::NO_CONTENT.into_response())
587 } else {
588 Err(AdminError::not_found(format!("connector '{id}' not found")))
589 }
590}
591
592async fn index_connector(
601 RequireRole::<1>(principal): RequireRole<1>,
602 State(state): State<AppState>,
603 Path(id): Path<String>,
604) -> Result<Json<Value>, AdminError> {
605 let cfg = state
606 .connector_configs
607 .get(&principal.org_id, &id)
608 .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
609
610 let scoped_key = scoped_connector_key(&principal.org_id, &cfg.name);
618 let connector = build_connector(&cfg, &scoped_key)?;
619
620 let service = IndexingService::new(principal.org_id.clone());
621 let chunker = Chunker::default();
622 let embedder = build_embedder(&EmbedderConfig::from_server_config(&state.config));
628 let knowledge = state.storage.knowledge();
629
630 let run = service
631 .run_once(
632 connector.as_ref(),
633 state.indexing.as_ref(),
634 &chunker,
635 embedder.as_ref(),
636 knowledge,
637 )
638 .await
639 .map_err(|e| AdminError::internal(format!("indexing failed: {e}")))?;
640
641 state.record_connector(principal.org_id.clone(), cfg.name.clone());
643
644 Ok(Json(serde_json::json!({
645 "run": {
646 "id": run.id,
647 "connectorName": cfg.name,
649 "status": format!("{:?}", run.status).to_lowercase(),
650 "startedAt": run.started_at,
651 "finishedAt": run.finished_at,
652 "documentsSeen": run.documents_seen,
653 "chunksIndexed": run.chunks_indexed,
654 "documentsSkipped": run.documents_skipped,
655 "cursor": run.cursor,
656 "error": run.error,
657 }
658 })))
659}
660
661fn build_connector(
672 cfg: &ConnectorConfig,
673 connector_name: &str,
674) -> Result<Box<dyn Connector>, AdminError> {
675 let connector_name = connector_name.to_string();
676 match cfg.kind {
677 ConnectorKind::Web => {
678 let url = cfg
679 .config
680 .get("url")
681 .and_then(Value::as_str)
682 .ok_or_else(|| AdminError::validation("web connector requires a 'url'"))?;
683 Ok(Box::new(NamedConnector::new(
684 connector_name,
685 WebConnector::new(url),
686 )))
687 }
688 ConnectorKind::File => {
689 let path = cfg
690 .config
691 .get("path")
692 .and_then(Value::as_str)
693 .ok_or_else(|| AdminError::validation("file connector requires a 'path'"))?;
694 Ok(Box::new(NamedConnector::new(
695 connector_name,
696 FileConnector::new(path),
697 )))
698 }
699 ConnectorKind::Github => {
700 let owner = cfg
701 .config
702 .get("owner")
703 .and_then(Value::as_str)
704 .ok_or_else(|| AdminError::validation("github connector requires an 'owner'"))?;
705 let repo = cfg
706 .config
707 .get("repo")
708 .and_then(Value::as_str)
709 .ok_or_else(|| AdminError::validation("github connector requires a 'repo'"))?;
710
711 let visibility = match cfg.config.get("visibility").and_then(Value::as_str) {
714 Some("private") => GithubVisibility::Private,
715 _ => GithubVisibility::Public,
716 };
717 let auth = resolve_github_auth(cfg, visibility)?;
718
719 let mut gh = GithubConnectorConfig::new(owner, repo, auth).visibility(visibility);
720 if let Some(r) = cfg.config.get("ref").and_then(Value::as_str) {
721 gh = gh.at_ref(r);
722 }
723 Ok(Box::new(NamedConnector::new(
724 connector_name,
725 GithubConnector::new(gh),
726 )))
727 }
728 }
729}
730
731fn resolve_github_auth(
738 cfg: &ConnectorConfig,
739 visibility: GithubVisibility,
740) -> Result<GithubAuth, AdminError> {
741 match cfg.auth_ref() {
742 Some(name) => match std::env::var(name) {
743 Ok(token) if !token.trim().is_empty() => Ok(GithubAuth::Token(token)),
744 _ => Err(AdminError::validation(format!(
745 "github connector auth_ref '{name}' did not resolve to a token \
746 (set the named secret/env var); refusing to index"
747 ))),
748 },
749 None => match visibility {
750 GithubVisibility::Public => Ok(GithubAuth::Unauthenticated),
751 GithubVisibility::Private => Err(AdminError::validation(
752 "github connector for a private repo requires an 'auth_ref' \
753 naming a token secret",
754 )),
755 },
756 }
757}
758
759struct NamedConnector<C: Connector> {
764 name: String,
765 inner: C,
766}
767
768impl<C: Connector> NamedConnector<C> {
769 fn new(name: String, inner: C) -> Self {
770 Self { name, inner }
771 }
772}
773
774#[async_trait::async_trait]
775impl<C: Connector> Connector for NamedConnector<C> {
776 fn name(&self) -> &str {
777 &self.name
778 }
779
780 async fn pull(
781 &self,
782 since: Option<smooth_operator_ingestion::Timestamp>,
783 ) -> anyhow::Result<Vec<smooth_operator_ingestion::RawDocument>> {
784 self.inner.pull(since).await
785 }
786}
787
/// Request body of `put_settings`; deserialized from camelCase keys.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SettingsWrite {
    /// Model value stored in the agent settings.
    model: String,
    /// System prompt stored in the agent settings.
    system_prompt: String,
    /// Optional persona; `None` when omitted.
    #[serde(default)]
    persona: Option<String>,
    /// Default tool names; empty when omitted.
    #[serde(default)]
    default_tools: Vec<String>,
}
805
806fn settings_json(s: &AgentSettings) -> Value {
808 serde_json::json!({
809 "settings": {
810 "orgId": s.org_id,
811 "model": s.model,
812 "systemPrompt": s.system_prompt,
813 "persona": s.persona,
814 "defaultTools": s.default_tools,
815 "updatedAt": s.updated_at,
816 }
817 })
818}
819
820async fn get_settings(
822 RequireRole::<1>(principal): RequireRole<1>,
823 State(state): State<AppState>,
824) -> Json<Value> {
825 let settings = state.settings.get(&principal.org_id);
826 Json(settings_json(&settings))
827}
828
829async fn put_settings(
831 RequireRole::<2>(principal): RequireRole<2>,
832 State(state): State<AppState>,
833 Json(body): Json<SettingsWrite>,
834) -> Json<Value> {
835 let settings = AgentSettings {
836 org_id: principal.org_id.clone(),
837 model: body.model,
838 system_prompt: body.system_prompt,
839 persona: body.persona,
840 default_tools: body.default_tools,
841 updated_at: chrono::Utc::now(),
842 };
843 state.settings.put(settings.clone());
844 Json(settings_json(&settings))
845}
846
847async fn conversation_owned_by(state: &AppState, conversation_id: &str, user_id: &str) -> bool {
854 match state
855 .storage
856 .list_participants_by_conversation(conversation_id)
857 .await
858 {
859 Ok(parts) => parts.iter().any(|p| {
860 p.participant_type == ParticipantType::User && p.external_id.as_deref() == Some(user_id)
861 }),
862 Err(_) => false,
863 }
864}
865
/// Wire form of a publish target.
///
/// Adjacently tagged: the variant is selected by a snake_case `type` field and its string
/// payload is read from `id`, e.g. `{"type": "session", "id": "abc"}`. Each variant maps
/// one-to-one onto the [`Target`] variant of the same name.
#[derive(Deserialize)]
#[serde(tag = "type", content = "id", rename_all = "snake_case")]
enum PublishTarget {
    /// Maps to `Target::Connection`.
    Connection(String),
    /// Maps to `Target::Session`.
    Session(String),
    /// Maps to `Target::User`.
    User(String),
    /// Maps to `Target::Org`.
    Org(String),
    /// Maps to `Target::Agent`.
    Agent(String),
}
881
882impl From<PublishTarget> for Target {
883 fn from(t: PublishTarget) -> Self {
884 match t {
885 PublishTarget::Connection(id) => Target::Connection(id),
886 PublishTarget::Session(id) => Target::Session(id),
887 PublishTarget::User(id) => Target::User(id),
888 PublishTarget::Org(id) => Target::Org(id),
889 PublishTarget::Agent(id) => Target::Agent(id),
890 }
891 }
892}
893
/// Request body of `publish_event`.
#[derive(Deserialize)]
struct PublishRequest {
    /// Where the event should be delivered.
    target: PublishTarget,
    /// Arbitrary JSON payload handed to the backplane as-is.
    event: Value,
}
901
/// Response body of `publish_event`.
#[derive(Serialize)]
struct PublishResponse {
    /// The count returned by `backplane.publish` (presumably the number of recipients the
    /// event reached — confirm against the backplane's documentation).
    delivered: usize,
}
911
912async fn publish_event(
924 RequireRole::<2>(_principal): RequireRole<2>,
925 State(state): State<AppState>,
926 Json(body): Json<PublishRequest>,
927) -> Json<PublishResponse> {
928 let delivered = state
929 .backplane
930 .publish(body.target.into(), body.event)
931 .await;
932 Json(PublishResponse { delivered })
933}