use axum::extract::{Path, Query, State};
use axum::http::request::Parts;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use smooth_operator::auth::{AuthError, Principal, Role};
use smooth_operator::backplane::Target;
use smooth_operator::connector_config::{ConnectorConfig, ConnectorKind};
use smooth_operator::domain::ParticipantType;
use smooth_operator::settings::AgentSettings;
use smooth_operator_ingestion::{
Chunker, Connector, FileConnector, GithubAuth, GithubConnector, GithubConnectorConfig,
GithubVisibility, IndexingService, WebConnector,
};
use crate::embedder::{build_embedder, EmbedderConfig};
use crate::protocol;
use crate::state::{scoped_connector_key, AppState};
pub fn router() -> Router<AppState> {
Router::new()
.route("/admin/health", get(health))
.route("/admin/me", get(me))
.route("/admin/conversations", get(list_conversations))
.route(
"/admin/conversations/{id}/messages",
get(conversation_messages),
)
.route("/admin/indexing/runs", get(indexing_runs))
.route("/admin/document-sets", get(document_sets))
.route(
"/admin/connectors",
get(list_connectors).post(create_connector),
)
.route(
"/admin/connectors/{id}",
get(get_connector)
.put(update_connector)
.delete(delete_connector),
)
.route("/admin/connectors/{id}/index", post(index_connector))
.route("/admin/settings", get(get_settings).put(put_settings))
.route("/admin/publish", post(publish_event))
}
pub struct RequireRole<const MIN: u8>(pub Principal);
const fn role_rank(role: Role) -> u8 {
match role {
Role::Basic => 0,
Role::Curator => 1,
Role::Admin => 2,
}
}
const fn rank_role(min: u8) -> Role {
match min {
0 => Role::Basic,
1 => Role::Curator,
_ => Role::Admin,
}
}
impl<const MIN: u8> axum::extract::FromRequestParts<AppState> for RequireRole<MIN> {
type Rejection = AuthRejection;
async fn from_request_parts(
parts: &mut Parts,
state: &AppState,
) -> Result<Self, Self::Rejection> {
let token = bearer_token(parts).ok_or(AuthRejection(AuthError::Unauthenticated))?;
let principal = state.auth.verify(&token).map_err(AuthRejection)?;
if role_rank(principal.role) < MIN {
return Err(AuthRejection(AuthError::Forbidden {
required: rank_role(MIN),
actual: principal.role,
}));
}
Ok(RequireRole(principal))
}
}
fn bearer_token(parts: &Parts) -> Option<String> {
let header = parts.headers.get(axum::http::header::AUTHORIZATION)?;
let value = header.to_str().ok()?;
let rest = value
.strip_prefix("Bearer ")
.or_else(|| value.strip_prefix("bearer "))?;
let trimmed = rest.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}
pub struct AuthRejection(AuthError);
impl IntoResponse for AuthRejection {
fn into_response(self) -> Response {
let (status, code) = match &self.0 {
AuthError::Unauthenticated => (StatusCode::UNAUTHORIZED, "UNAUTHENTICATED"),
AuthError::InvalidToken(_) => (StatusCode::UNAUTHORIZED, "INVALID_TOKEN"),
AuthError::MissingRole(_) => (StatusCode::UNAUTHORIZED, "MISSING_ROLE"),
AuthError::Forbidden { .. } => (StatusCode::FORBIDDEN, "FORBIDDEN"),
AuthError::Misconfigured(_) => {
(StatusCode::INTERNAL_SERVER_ERROR, "AUTH_MISCONFIGURED")
}
};
let body = protocol::error(None, code, &self.0.to_string());
(status, Json(body)).into_response()
}
}
struct AdminError(StatusCode, String, &'static str);
impl IntoResponse for AdminError {
fn into_response(self) -> Response {
let body = protocol::error(None, self.2, &self.1);
(self.0, Json(body)).into_response()
}
}
impl AdminError {
fn internal(msg: impl Into<String>) -> Self {
Self(
StatusCode::INTERNAL_SERVER_ERROR,
msg.into(),
"INTERNAL_ERROR",
)
}
fn forbidden(msg: impl Into<String>) -> Self {
Self(StatusCode::FORBIDDEN, msg.into(), "FORBIDDEN")
}
fn not_found(msg: impl Into<String>) -> Self {
Self(StatusCode::NOT_FOUND, msg.into(), "NOT_FOUND")
}
fn validation(msg: impl Into<String>) -> Self {
Self(StatusCode::BAD_REQUEST, msg.into(), "VALIDATION_ERROR")
}
}
async fn health() -> Json<Value> {
Json(serde_json::json!({ "status": "ok" }))
}
async fn me(RequireRole::<0>(principal): RequireRole<0>) -> Json<Principal> {
Json(principal)
}
#[derive(Debug, Deserialize)]
struct ConversationsQuery {
limit: Option<usize>,
cursor: Option<usize>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationRow {
id: String,
name: String,
platform: String,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationsResponse {
conversations: Vec<ConversationRow>,
next_cursor: Option<usize>,
}
async fn list_conversations(
RequireRole::<0>(principal): RequireRole<0>,
State(state): State<AppState>,
Query(q): Query<ConversationsQuery>,
) -> Result<Json<ConversationsResponse>, AdminError> {
let limit = q.limit.unwrap_or(50).clamp(1, 200);
let offset = q.cursor.unwrap_or(0);
let all = state
.storage
.list_conversations_by_org(&principal.org_id)
.await
.map_err(|e| AdminError::internal(format!("list conversations failed: {e}")))?;
let visible: Vec<_> = if principal.role >= Role::Curator {
all
} else {
let mut owned = Vec::new();
for conv in all {
if conversation_owned_by(&state, &conv.id, &principal.user_id).await {
owned.push(conv);
}
}
owned
};
let total = visible.len();
let page: Vec<ConversationRow> = visible
.into_iter()
.skip(offset)
.take(limit)
.map(|c| ConversationRow {
id: c.id,
name: c.name,
platform: format!("{:?}", c.platform).to_lowercase(),
created_at: c.created_at,
updated_at: c.updated_at,
})
.collect();
let next = offset + page.len();
let next_cursor = if next < total { Some(next) } else { None };
Ok(Json(ConversationsResponse {
conversations: page,
next_cursor,
}))
}
async fn conversation_messages(
RequireRole::<0>(principal): RequireRole<0>,
State(state): State<AppState>,
Path(conversation_id): Path<String>,
) -> Result<Json<Value>, AdminError> {
let conv = state
.storage
.get_conversation(&conversation_id)
.await
.map_err(|e| AdminError::internal(format!("get conversation failed: {e}")))?
.ok_or_else(|| {
AdminError::not_found(format!("conversation '{conversation_id}' not found"))
})?;
if conv.organization_id != principal.org_id {
return Err(AdminError::not_found(format!(
"conversation '{conversation_id}' not found"
)));
}
if principal.role < Role::Curator
&& !conversation_owned_by(&state, &conversation_id, &principal.user_id).await
{
return Err(AdminError::forbidden(
"you do not have access to this conversation",
));
}
let query = smooth_operator::adapter::MessageQuery::new(&conversation_id, 200);
let page = state
.storage
.list_messages_by_conversation(query)
.await
.map_err(|e| AdminError::internal(format!("list messages failed: {e}")))?;
Ok(Json(serde_json::json!({
"conversationId": conversation_id,
"messages": page.messages,
"nextCursor": page.next_cursor,
})))
}
async fn indexing_runs(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let mut runs = Vec::new();
for connector in state.connectors(&principal.org_id) {
let key = scoped_connector_key(&principal.org_id, &connector);
for run in state.indexing.list_runs(&key) {
runs.push(serde_json::json!({
"id": run.id,
"connectorName": connector,
"status": format!("{:?}", run.status).to_lowercase(),
"startedAt": run.started_at,
"finishedAt": run.finished_at,
"documentsSeen": run.documents_seen,
"chunksIndexed": run.chunks_indexed,
"documentsSkipped": run.documents_skipped,
"cursor": run.cursor,
"error": run.error,
}));
}
}
Json(serde_json::json!({ "runs": runs }))
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct DocumentSetRow {
name: String,
document_count: usize,
}
async fn document_sets(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let sets: Vec<DocumentSetRow> = state
.document_sets(&principal.org_id)
.into_iter()
.map(|(name, document_count)| DocumentSetRow {
name,
document_count,
})
.collect();
Json(serde_json::json!({ "documentSets": sets }))
}
#[derive(Debug, Deserialize)]
struct ConnectorWrite {
name: String,
kind: String,
#[serde(default)]
config: Value,
#[serde(default = "default_enabled")]
enabled: bool,
}
const fn default_enabled() -> bool {
true
}
fn connector_json(cfg: &ConnectorConfig) -> Value {
serde_json::json!({
"connector": {
"id": cfg.id,
"name": cfg.name,
"kind": cfg.kind.as_str(),
"config": cfg.config,
"enabled": cfg.enabled,
"createdAt": cfg.created_at,
"updatedAt": cfg.updated_at,
}
})
}
fn validate_connector(kind: ConnectorKind, config: &Value) -> Result<(), AdminError> {
let missing = |field: &str| {
AdminError::validation(format!(
"{} connector config requires a '{field}' field",
kind.as_str()
))
};
match kind {
ConnectorKind::Github => {
if config.get("owner").and_then(Value::as_str).is_none() {
return Err(missing("owner"));
}
if config.get("repo").and_then(Value::as_str).is_none() {
return Err(missing("repo"));
}
}
ConnectorKind::Web => {
if config.get("url").and_then(Value::as_str).is_none() {
return Err(missing("url"));
}
}
ConnectorKind::File => {
if config.get("path").and_then(Value::as_str).is_none() {
return Err(missing("path"));
}
}
}
Ok(())
}
async fn list_connectors(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let connectors: Vec<Value> = state
.connector_configs
.list(&principal.org_id)
.iter()
.map(|c| connector_json(c)["connector"].clone())
.collect();
Json(serde_json::json!({ "connectors": connectors }))
}
async fn get_connector(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<Value>, AdminError> {
let cfg = state
.connector_configs
.get(&principal.org_id, &id)
.ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
Ok(Json(connector_json(&cfg)))
}
async fn create_connector(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Json(body): Json<ConnectorWrite>,
) -> Result<Response, AdminError> {
let kind = ConnectorKind::parse(&body.kind)
.map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
validate_connector(kind, &body.config)?;
let now = chrono::Utc::now();
let cfg = ConnectorConfig {
id: uuid::Uuid::new_v4().to_string(),
org_id: principal.org_id.clone(),
name: body.name,
kind,
config: body.config,
enabled: body.enabled,
created_at: now,
updated_at: now,
};
state.connector_configs.upsert(cfg.clone());
state.record_connector(principal.org_id.clone(), cfg.name.clone());
Ok((StatusCode::CREATED, Json(connector_json(&cfg))).into_response())
}
async fn update_connector(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Path(id): Path<String>,
Json(body): Json<ConnectorWrite>,
) -> Result<Json<Value>, AdminError> {
let existing = state
.connector_configs
.get(&principal.org_id, &id)
.ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
let kind = ConnectorKind::parse(&body.kind)
.map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
validate_connector(kind, &body.config)?;
let cfg = ConnectorConfig {
id: existing.id,
org_id: existing.org_id,
name: body.name,
kind,
config: body.config,
enabled: body.enabled,
created_at: existing.created_at,
updated_at: chrono::Utc::now(),
};
state.connector_configs.upsert(cfg.clone());
state.record_connector(principal.org_id.clone(), cfg.name.clone());
Ok(Json(connector_json(&cfg)))
}
async fn delete_connector(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Response, AdminError> {
if state.connector_configs.delete(&principal.org_id, &id) {
Ok(StatusCode::NO_CONTENT.into_response())
} else {
Err(AdminError::not_found(format!("connector '{id}' not found")))
}
}
async fn index_connector(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<Value>, AdminError> {
let cfg = state
.connector_configs
.get(&principal.org_id, &id)
.ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
let scoped_key = scoped_connector_key(&principal.org_id, &cfg.name);
let connector = build_connector(&cfg, &scoped_key)?;
let service = IndexingService::new(principal.org_id.clone());
let chunker = Chunker::default();
let embedder = build_embedder(&EmbedderConfig::from_server_config(&state.config));
let knowledge = state.storage.knowledge();
let run = service
.run_once(
connector.as_ref(),
state.indexing.as_ref(),
&chunker,
embedder.as_ref(),
knowledge,
)
.await
.map_err(|e| AdminError::internal(format!("indexing failed: {e}")))?;
state.record_connector(principal.org_id.clone(), cfg.name.clone());
Ok(Json(serde_json::json!({
"run": {
"id": run.id,
"connectorName": cfg.name,
"status": format!("{:?}", run.status).to_lowercase(),
"startedAt": run.started_at,
"finishedAt": run.finished_at,
"documentsSeen": run.documents_seen,
"chunksIndexed": run.chunks_indexed,
"documentsSkipped": run.documents_skipped,
"cursor": run.cursor,
"error": run.error,
}
})))
}
fn build_connector(
cfg: &ConnectorConfig,
connector_name: &str,
) -> Result<Box<dyn Connector>, AdminError> {
let connector_name = connector_name.to_string();
match cfg.kind {
ConnectorKind::Web => {
let url = cfg
.config
.get("url")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("web connector requires a 'url'"))?;
Ok(Box::new(NamedConnector::new(
connector_name,
WebConnector::new(url),
)))
}
ConnectorKind::File => {
let path = cfg
.config
.get("path")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("file connector requires a 'path'"))?;
Ok(Box::new(NamedConnector::new(
connector_name,
FileConnector::new(path),
)))
}
ConnectorKind::Github => {
let owner = cfg
.config
.get("owner")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("github connector requires an 'owner'"))?;
let repo = cfg
.config
.get("repo")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("github connector requires a 'repo'"))?;
let visibility = match cfg.config.get("visibility").and_then(Value::as_str) {
Some("private") => GithubVisibility::Private,
_ => GithubVisibility::Public,
};
let auth = resolve_github_auth(cfg, visibility)?;
let mut gh = GithubConnectorConfig::new(owner, repo, auth).visibility(visibility);
if let Some(r) = cfg.config.get("ref").and_then(Value::as_str) {
gh = gh.at_ref(r);
}
Ok(Box::new(NamedConnector::new(
connector_name,
GithubConnector::new(gh),
)))
}
}
}
fn resolve_github_auth(
cfg: &ConnectorConfig,
visibility: GithubVisibility,
) -> Result<GithubAuth, AdminError> {
match cfg.auth_ref() {
Some(name) => match std::env::var(name) {
Ok(token) if !token.trim().is_empty() => Ok(GithubAuth::Token(token)),
_ => Err(AdminError::validation(format!(
"github connector auth_ref '{name}' did not resolve to a token \
(set the named secret/env var); refusing to index"
))),
},
None => match visibility {
GithubVisibility::Public => Ok(GithubAuth::Unauthenticated),
GithubVisibility::Private => Err(AdminError::validation(
"github connector for a private repo requires an 'auth_ref' \
naming a token secret",
)),
},
}
}
struct NamedConnector<C: Connector> {
name: String,
inner: C,
}
impl<C: Connector> NamedConnector<C> {
fn new(name: String, inner: C) -> Self {
Self { name, inner }
}
}
#[async_trait::async_trait]
impl<C: Connector> Connector for NamedConnector<C> {
fn name(&self) -> &str {
&self.name
}
async fn pull(
&self,
since: Option<smooth_operator_ingestion::Timestamp>,
) -> anyhow::Result<Vec<smooth_operator_ingestion::RawDocument>> {
self.inner.pull(since).await
}
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SettingsWrite {
model: String,
system_prompt: String,
#[serde(default)]
persona: Option<String>,
#[serde(default)]
default_tools: Vec<String>,
}
fn settings_json(s: &AgentSettings) -> Value {
serde_json::json!({
"settings": {
"orgId": s.org_id,
"model": s.model,
"systemPrompt": s.system_prompt,
"persona": s.persona,
"defaultTools": s.default_tools,
"updatedAt": s.updated_at,
}
})
}
async fn get_settings(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let settings = state.settings.get(&principal.org_id);
Json(settings_json(&settings))
}
async fn put_settings(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Json(body): Json<SettingsWrite>,
) -> Json<Value> {
let settings = AgentSettings {
org_id: principal.org_id.clone(),
model: body.model,
system_prompt: body.system_prompt,
persona: body.persona,
default_tools: body.default_tools,
updated_at: chrono::Utc::now(),
};
state.settings.put(settings.clone());
Json(settings_json(&settings))
}
async fn conversation_owned_by(state: &AppState, conversation_id: &str, user_id: &str) -> bool {
match state
.storage
.list_participants_by_conversation(conversation_id)
.await
{
Ok(parts) => parts.iter().any(|p| {
p.participant_type == ParticipantType::User && p.external_id.as_deref() == Some(user_id)
}),
Err(_) => false,
}
}
#[derive(Deserialize)]
#[serde(tag = "type", content = "id", rename_all = "snake_case")]
enum PublishTarget {
Connection(String),
Session(String),
User(String),
Org(String),
Agent(String),
}
impl From<PublishTarget> for Target {
fn from(t: PublishTarget) -> Self {
match t {
PublishTarget::Connection(id) => Target::Connection(id),
PublishTarget::Session(id) => Target::Session(id),
PublishTarget::User(id) => Target::User(id),
PublishTarget::Org(id) => Target::Org(id),
PublishTarget::Agent(id) => Target::Agent(id),
}
}
}
#[derive(Deserialize)]
struct PublishRequest {
target: PublishTarget,
event: Value,
}
#[derive(Serialize)]
struct PublishResponse {
delivered: usize,
}
async fn publish_event(
RequireRole::<2>(_principal): RequireRole<2>,
State(state): State<AppState>,
Json(body): Json<PublishRequest>,
) -> Json<PublishResponse> {
let delivered = state
.backplane
.publish(body.target.into(), body.event)
.await;
Json(PublishResponse { delivered })
}