use axum::extract::{Path, Query, State};
use axum::http::request::Parts;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use smooth_operator::auth::{AuthError, Principal, Role};
use smooth_operator::backplane::Target;
use smooth_operator::connector_config::{ConnectorConfig, ConnectorKind};
use smooth_operator::domain::ParticipantType;
use smooth_operator::settings::AgentSettings;
use smooth_operator_ingestion::{
Chunker, Connector, FileConnector, GithubAuth, GithubConnector, GithubConnectorConfig,
GithubVisibility, IndexingService, WebConnector,
};
use crate::embedder::{build_embedder, EmbedderConfig};
use crate::protocol;
use crate::state::{scoped_connector_key, AppState};
pub fn router() -> Router<AppState> {
Router::new()
.route("/admin/health", get(health))
.route("/admin/me", get(me))
.route("/admin/conversations", get(list_conversations))
.route(
"/admin/conversations/{id}/messages",
get(conversation_messages),
)
.route("/admin/indexing/runs", get(indexing_runs))
.route("/admin/document-sets", get(document_sets))
.route("/admin/model-costs", get(model_costs))
.route(
"/admin/connectors",
get(list_connectors).post(create_connector),
)
.route(
"/admin/connectors/{id}",
get(get_connector)
.put(update_connector)
.delete(delete_connector),
)
.route("/admin/connectors/{id}/index", post(index_connector))
.route("/admin/settings", get(get_settings).put(put_settings))
.route("/admin/publish", post(publish_event))
.layer(admin_cors())
}
pub(crate) fn admin_cors() -> tower_http::cors::CorsLayer {
use axum::http::{header, Method};
tower_http::cors::CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::DELETE,
Method::OPTIONS,
])
.allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE])
}
pub struct RequireRole<const MIN: u8>(pub Principal);
const fn role_rank(role: Role) -> u8 {
match role {
Role::Basic => 0,
Role::Curator => 1,
Role::Admin => 2,
}
}
const fn rank_role(min: u8) -> Role {
match min {
0 => Role::Basic,
1 => Role::Curator,
_ => Role::Admin,
}
}
impl<const MIN: u8> axum::extract::FromRequestParts<AppState> for RequireRole<MIN> {
type Rejection = AuthRejection;
async fn from_request_parts(
parts: &mut Parts,
state: &AppState,
) -> Result<Self, Self::Rejection> {
let token = bearer_token(parts).ok_or(AuthRejection(AuthError::Unauthenticated))?;
let principal = state.auth.verify(&token).map_err(AuthRejection)?;
if role_rank(principal.role) < MIN {
return Err(AuthRejection(AuthError::Forbidden {
required: rank_role(MIN),
actual: principal.role,
}));
}
Ok(RequireRole(principal))
}
}
fn bearer_token(parts: &Parts) -> Option<String> {
let header = parts.headers.get(axum::http::header::AUTHORIZATION)?;
let value = header.to_str().ok()?;
let rest = value
.strip_prefix("Bearer ")
.or_else(|| value.strip_prefix("bearer "))?;
let trimmed = rest.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}
pub struct AuthRejection(AuthError);
impl IntoResponse for AuthRejection {
fn into_response(self) -> Response {
let (status, code) = match &self.0 {
AuthError::Unauthenticated => (StatusCode::UNAUTHORIZED, "UNAUTHENTICATED"),
AuthError::InvalidToken(_) => (StatusCode::UNAUTHORIZED, "INVALID_TOKEN"),
AuthError::MissingRole(_) => (StatusCode::UNAUTHORIZED, "MISSING_ROLE"),
AuthError::Forbidden { .. } => (StatusCode::FORBIDDEN, "FORBIDDEN"),
AuthError::Misconfigured(_) => {
(StatusCode::INTERNAL_SERVER_ERROR, "AUTH_MISCONFIGURED")
}
};
let body = protocol::error(None, code, &self.0.to_string());
(status, Json(body)).into_response()
}
}
struct AdminError(StatusCode, String, &'static str);
impl IntoResponse for AdminError {
fn into_response(self) -> Response {
let body = protocol::error(None, self.2, &self.1);
(self.0, Json(body)).into_response()
}
}
impl AdminError {
fn internal(msg: impl Into<String>) -> Self {
Self(
StatusCode::INTERNAL_SERVER_ERROR,
msg.into(),
"INTERNAL_ERROR",
)
}
fn forbidden(msg: impl Into<String>) -> Self {
Self(StatusCode::FORBIDDEN, msg.into(), "FORBIDDEN")
}
fn not_found(msg: impl Into<String>) -> Self {
Self(StatusCode::NOT_FOUND, msg.into(), "NOT_FOUND")
}
fn validation(msg: impl Into<String>) -> Self {
Self(StatusCode::BAD_REQUEST, msg.into(), "VALIDATION_ERROR")
}
}
async fn health() -> Json<Value> {
Json(serde_json::json!({ "status": "ok" }))
}
async fn me(RequireRole::<0>(principal): RequireRole<0>) -> Json<Principal> {
Json(principal)
}
#[derive(Debug, Deserialize)]
struct ConversationsQuery {
limit: Option<usize>,
cursor: Option<usize>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationRow {
id: String,
name: String,
platform: String,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ConversationsResponse {
conversations: Vec<ConversationRow>,
next_cursor: Option<usize>,
}
async fn list_conversations(
RequireRole::<0>(principal): RequireRole<0>,
State(state): State<AppState>,
Query(q): Query<ConversationsQuery>,
) -> Result<Json<ConversationsResponse>, AdminError> {
let limit = q.limit.unwrap_or(50).clamp(1, 200);
let offset = q.cursor.unwrap_or(0);
let all = state
.storage
.list_conversations_by_org(&principal.org_id)
.await
.map_err(|e| AdminError::internal(format!("list conversations failed: {e}")))?;
let visible: Vec<_> = if principal.role >= Role::Curator {
all
} else {
let mut owned = Vec::new();
for conv in all {
if conversation_owned_by(&state, &conv.id, &principal.user_id).await {
owned.push(conv);
}
}
owned
};
let total = visible.len();
let page: Vec<ConversationRow> = visible
.into_iter()
.skip(offset)
.take(limit)
.map(|c| ConversationRow {
id: c.id,
name: c.name,
platform: format!("{:?}", c.platform).to_lowercase(),
created_at: c.created_at,
updated_at: c.updated_at,
})
.collect();
let next = offset + page.len();
let next_cursor = if next < total { Some(next) } else { None };
Ok(Json(ConversationsResponse {
conversations: page,
next_cursor,
}))
}
async fn conversation_messages(
RequireRole::<0>(principal): RequireRole<0>,
State(state): State<AppState>,
Path(conversation_id): Path<String>,
) -> Result<Json<Value>, AdminError> {
let conv = state
.storage
.get_conversation(&conversation_id)
.await
.map_err(|e| AdminError::internal(format!("get conversation failed: {e}")))?
.ok_or_else(|| {
AdminError::not_found(format!("conversation '{conversation_id}' not found"))
})?;
if conv.organization_id != principal.org_id {
return Err(AdminError::not_found(format!(
"conversation '{conversation_id}' not found"
)));
}
if principal.role < Role::Curator
&& !conversation_owned_by(&state, &conversation_id, &principal.user_id).await
{
return Err(AdminError::forbidden(
"you do not have access to this conversation",
));
}
let query = smooth_operator::adapter::MessageQuery::new(&conversation_id, 200);
let page = state
.storage
.list_messages_by_conversation(query)
.await
.map_err(|e| AdminError::internal(format!("list messages failed: {e}")))?;
Ok(Json(serde_json::json!({
"conversationId": conversation_id,
"messages": page.messages,
"nextCursor": page.next_cursor,
})))
}
async fn indexing_runs(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let mut runs = Vec::new();
for connector in state.connectors(&principal.org_id) {
let key = scoped_connector_key(&principal.org_id, &connector);
for run in state.indexing.list_runs(&key) {
runs.push(serde_json::json!({
"id": run.id,
"connectorName": connector,
"status": format!("{:?}", run.status).to_lowercase(),
"startedAt": run.started_at,
"finishedAt": run.finished_at,
"documentsSeen": run.documents_seen,
"chunksIndexed": run.chunks_indexed,
"documentsSkipped": run.documents_skipped,
"cursor": run.cursor,
"error": run.error,
}));
}
}
Json(serde_json::json!({ "runs": runs }))
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct DocumentSetRow {
name: String,
document_count: usize,
}
async fn document_sets(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let sets: Vec<DocumentSetRow> = state
.document_sets(&principal.org_id)
.into_iter()
.map(|(name, document_count)| DocumentSetRow {
name,
document_count,
})
.collect();
Json(serde_json::json!({ "documentSets": sets }))
}
async fn model_costs(State(state): State<AppState>) -> Json<Value> {
if let Some(cached) = state.model_costs_cache.get() {
return Json(cached.clone());
}
match fetch_model_costs(&state.config).await {
Ok(map) => {
let _ = state.model_costs_cache.set(map.clone());
Json(map)
}
Err(_) => Json(serde_json::json!({})),
}
}
async fn fetch_model_costs(config: &crate::config::ServerConfig) -> anyhow::Result<Value> {
let url = format!("{}/model/info", config.gateway_url.trim_end_matches('/'));
let client = reqwest::Client::new();
let mut req = client.get(&url);
if let Some(key) = config.gateway_key.as_deref() {
req = req.bearer_auth(key);
}
let payload: Value = req.send().await?.error_for_status()?.json().await?;
Ok(map_model_info(&payload))
}
fn map_model_info(payload: &Value) -> Value {
let mut out = serde_json::Map::new();
let Some(entries) = payload.get("data").and_then(Value::as_array) else {
return Value::Object(out);
};
for entry in entries {
let Some(name) = entry.get("model_name").and_then(Value::as_str) else {
continue;
};
let info = entry.get("model_info");
let input = info
.and_then(|i| i.get("input_cost_per_token"))
.and_then(Value::as_f64);
let output = info
.and_then(|i| i.get("output_cost_per_token"))
.and_then(Value::as_f64);
let tier = info
.and_then(|i| i.get("model_tier"))
.and_then(Value::as_str);
let use_cases = info
.and_then(|i| i.get("use_cases"))
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
out.insert(
name.to_string(),
serde_json::json!({
"inputCostPerToken": input,
"outputCostPerToken": output,
"tier": tier,
"useCases": use_cases,
}),
);
}
Value::Object(out)
}
#[derive(Debug, Deserialize)]
struct ConnectorWrite {
name: String,
kind: String,
#[serde(default)]
config: Value,
#[serde(default = "default_enabled")]
enabled: bool,
}
const fn default_enabled() -> bool {
true
}
fn connector_json(cfg: &ConnectorConfig) -> Value {
serde_json::json!({
"connector": {
"id": cfg.id,
"name": cfg.name,
"kind": cfg.kind.as_str(),
"config": cfg.config,
"enabled": cfg.enabled,
"createdAt": cfg.created_at,
"updatedAt": cfg.updated_at,
}
})
}
fn validate_connector(kind: ConnectorKind, config: &Value) -> Result<(), AdminError> {
let missing = |field: &str| {
AdminError::validation(format!(
"{} connector config requires a '{field}' field",
kind.as_str()
))
};
match kind {
ConnectorKind::Github => {
if config.get("owner").and_then(Value::as_str).is_none() {
return Err(missing("owner"));
}
if config.get("repo").and_then(Value::as_str).is_none() {
return Err(missing("repo"));
}
}
ConnectorKind::Web => {
if config.get("url").and_then(Value::as_str).is_none() {
return Err(missing("url"));
}
}
ConnectorKind::File => {
if config.get("path").and_then(Value::as_str).is_none() {
return Err(missing("path"));
}
}
}
Ok(())
}
async fn list_connectors(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let connectors: Vec<Value> = state
.connector_configs
.list(&principal.org_id)
.iter()
.map(|c| connector_json(c)["connector"].clone())
.collect();
Json(serde_json::json!({ "connectors": connectors }))
}
async fn get_connector(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<Value>, AdminError> {
let cfg = state
.connector_configs
.get(&principal.org_id, &id)
.ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
Ok(Json(connector_json(&cfg)))
}
async fn create_connector(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Json(body): Json<ConnectorWrite>,
) -> Result<Response, AdminError> {
let kind = ConnectorKind::parse(&body.kind)
.map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
validate_connector(kind, &body.config)?;
let now = chrono::Utc::now();
let cfg = ConnectorConfig {
id: uuid::Uuid::new_v4().to_string(),
org_id: principal.org_id.clone(),
name: body.name,
kind,
config: body.config,
enabled: body.enabled,
created_at: now,
updated_at: now,
};
state.connector_configs.upsert(cfg.clone());
state.record_connector(principal.org_id.clone(), cfg.name.clone());
Ok((StatusCode::CREATED, Json(connector_json(&cfg))).into_response())
}
async fn update_connector(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Path(id): Path<String>,
Json(body): Json<ConnectorWrite>,
) -> Result<Json<Value>, AdminError> {
let existing = state
.connector_configs
.get(&principal.org_id, &id)
.ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
let kind = ConnectorKind::parse(&body.kind)
.map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
validate_connector(kind, &body.config)?;
let cfg = ConnectorConfig {
id: existing.id,
org_id: existing.org_id,
name: body.name,
kind,
config: body.config,
enabled: body.enabled,
created_at: existing.created_at,
updated_at: chrono::Utc::now(),
};
state.connector_configs.upsert(cfg.clone());
state.record_connector(principal.org_id.clone(), cfg.name.clone());
Ok(Json(connector_json(&cfg)))
}
async fn delete_connector(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Response, AdminError> {
if state.connector_configs.delete(&principal.org_id, &id) {
Ok(StatusCode::NO_CONTENT.into_response())
} else {
Err(AdminError::not_found(format!("connector '{id}' not found")))
}
}
async fn index_connector(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<Value>, AdminError> {
let cfg = state
.connector_configs
.get(&principal.org_id, &id)
.ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
let scoped_key = scoped_connector_key(&principal.org_id, &cfg.name);
let connector = build_connector(&cfg, &scoped_key)?;
let service = IndexingService::new(principal.org_id.clone());
let chunker = Chunker::default();
let embedder = build_embedder(&EmbedderConfig::from_server_config(&state.config));
let knowledge = state.storage.knowledge();
let run = service
.run_once(
connector.as_ref(),
state.indexing.as_ref(),
&chunker,
embedder.as_ref(),
knowledge,
)
.await
.map_err(|e| AdminError::internal(format!("indexing failed: {e}")))?;
state.record_connector(principal.org_id.clone(), cfg.name.clone());
Ok(Json(serde_json::json!({
"run": {
"id": run.id,
"connectorName": cfg.name,
"status": format!("{:?}", run.status).to_lowercase(),
"startedAt": run.started_at,
"finishedAt": run.finished_at,
"documentsSeen": run.documents_seen,
"chunksIndexed": run.chunks_indexed,
"documentsSkipped": run.documents_skipped,
"cursor": run.cursor,
"error": run.error,
}
})))
}
fn build_connector(
cfg: &ConnectorConfig,
connector_name: &str,
) -> Result<Box<dyn Connector>, AdminError> {
let connector_name = connector_name.to_string();
match cfg.kind {
ConnectorKind::Web => {
let url = cfg
.config
.get("url")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("web connector requires a 'url'"))?;
Ok(Box::new(NamedConnector::new(
connector_name,
WebConnector::new(url),
)))
}
ConnectorKind::File => {
let path = cfg
.config
.get("path")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("file connector requires a 'path'"))?;
Ok(Box::new(NamedConnector::new(
connector_name,
FileConnector::new(path),
)))
}
ConnectorKind::Github => {
let owner = cfg
.config
.get("owner")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("github connector requires an 'owner'"))?;
let repo = cfg
.config
.get("repo")
.and_then(Value::as_str)
.ok_or_else(|| AdminError::validation("github connector requires a 'repo'"))?;
let visibility = match cfg.config.get("visibility").and_then(Value::as_str) {
Some("private") => GithubVisibility::Private,
_ => GithubVisibility::Public,
};
let auth = resolve_github_auth(cfg, visibility)?;
let mut gh = GithubConnectorConfig::new(owner, repo, auth).visibility(visibility);
if let Some(r) = cfg.config.get("ref").and_then(Value::as_str) {
gh = gh.at_ref(r);
}
Ok(Box::new(NamedConnector::new(
connector_name,
GithubConnector::new(gh),
)))
}
}
}
fn resolve_github_auth(
cfg: &ConnectorConfig,
visibility: GithubVisibility,
) -> Result<GithubAuth, AdminError> {
match cfg.auth_ref() {
Some(name) => match std::env::var(name) {
Ok(token) if !token.trim().is_empty() => Ok(GithubAuth::Token(token)),
_ => Err(AdminError::validation(format!(
"github connector auth_ref '{name}' did not resolve to a token \
(set the named secret/env var); refusing to index"
))),
},
None => match visibility {
GithubVisibility::Public => Ok(GithubAuth::Unauthenticated),
GithubVisibility::Private => Err(AdminError::validation(
"github connector for a private repo requires an 'auth_ref' \
naming a token secret",
)),
},
}
}
struct NamedConnector<C: Connector> {
name: String,
inner: C,
}
impl<C: Connector> NamedConnector<C> {
fn new(name: String, inner: C) -> Self {
Self { name, inner }
}
}
#[async_trait::async_trait]
impl<C: Connector> Connector for NamedConnector<C> {
fn name(&self) -> &str {
&self.name
}
async fn pull(
&self,
since: Option<smooth_operator_ingestion::Timestamp>,
) -> anyhow::Result<Vec<smooth_operator_ingestion::RawDocument>> {
self.inner.pull(since).await
}
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SettingsWrite {
model: String,
system_prompt: String,
#[serde(default)]
persona: Option<String>,
#[serde(default)]
default_tools: Vec<String>,
}
fn settings_json(s: &AgentSettings) -> Value {
serde_json::json!({
"settings": {
"orgId": s.org_id,
"model": s.model,
"systemPrompt": s.system_prompt,
"persona": s.persona,
"defaultTools": s.default_tools,
"updatedAt": s.updated_at,
}
})
}
async fn get_settings(
RequireRole::<1>(principal): RequireRole<1>,
State(state): State<AppState>,
) -> Json<Value> {
let settings = state.settings.get(&principal.org_id);
Json(settings_json(&settings))
}
async fn put_settings(
RequireRole::<2>(principal): RequireRole<2>,
State(state): State<AppState>,
Json(body): Json<SettingsWrite>,
) -> Json<Value> {
let settings = AgentSettings {
org_id: principal.org_id.clone(),
model: body.model,
system_prompt: body.system_prompt,
persona: body.persona,
default_tools: body.default_tools,
updated_at: chrono::Utc::now(),
};
state.settings.put(settings.clone());
Json(settings_json(&settings))
}
async fn conversation_owned_by(state: &AppState, conversation_id: &str, user_id: &str) -> bool {
match state
.storage
.list_participants_by_conversation(conversation_id)
.await
{
Ok(parts) => parts.iter().any(|p| {
p.participant_type == ParticipantType::User && p.external_id.as_deref() == Some(user_id)
}),
Err(_) => false,
}
}
#[derive(Deserialize)]
#[serde(tag = "type", content = "id", rename_all = "snake_case")]
enum PublishTarget {
Connection(String),
Session(String),
User(String),
Org(String),
Agent(String),
}
impl From<PublishTarget> for Target {
fn from(t: PublishTarget) -> Self {
match t {
PublishTarget::Connection(id) => Target::Connection(id),
PublishTarget::Session(id) => Target::Session(id),
PublishTarget::User(id) => Target::User(id),
PublishTarget::Org(id) => Target::Org(id),
PublishTarget::Agent(id) => Target::Agent(id),
}
}
}
#[derive(Deserialize)]
struct PublishRequest {
target: PublishTarget,
event: Value,
}
#[derive(Serialize)]
struct PublishResponse {
delivered: usize,
}
async fn publish_event(
RequireRole::<2>(_principal): RequireRole<2>,
State(state): State<AppState>,
Json(body): Json<PublishRequest>,
) -> Json<PublishResponse> {
let delivered = state
.backplane
.publish(body.target.into(), body.event)
.await;
Json(PublishResponse { delivered })
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn map_model_info_maps_sample_payload() {
let payload = serde_json::json!({
"data": [
{
"model_name": "claude-opus-4-8",
"model_info": {
"input_cost_per_token": 0.000015,
"output_cost_per_token": 0.000075,
"model_tier": "frontier",
"use_cases": ["reasoning", "coding"]
}
},
{
"model_name": "claude-haiku-4-5",
"model_info": {
"input_cost_per_token": 0.0000008,
"output_cost_per_token": 0.000004,
"model_tier": "fast",
"use_cases": ["chat"]
}
}
]
});
let out = map_model_info(&payload);
let opus = &out["claude-opus-4-8"];
assert!((opus["inputCostPerToken"].as_f64().unwrap() - 0.000015).abs() < 1e-12);
assert!((opus["outputCostPerToken"].as_f64().unwrap() - 0.000075).abs() < 1e-12);
assert_eq!(opus["tier"], "frontier");
assert_eq!(opus["useCases"], serde_json::json!(["reasoning", "coding"]));
let haiku = &out["claude-haiku-4-5"];
assert_eq!(haiku["tier"], "fast");
assert_eq!(haiku["useCases"], serde_json::json!(["chat"]));
}
#[test]
fn map_model_info_tolerates_missing_fields() {
let payload = serde_json::json!({
"data": [
{ "model_name": "bare", "model_info": {} },
{ "model_info": { "model_tier": "x" } }
]
});
let out = map_model_info(&payload);
let obj = out.as_object().unwrap();
assert_eq!(obj.len(), 1, "the model_name-less entry is skipped");
let bare = &out["bare"];
assert!(bare["inputCostPerToken"].is_null());
assert!(bare["outputCostPerToken"].is_null());
assert!(bare["tier"].is_null());
assert_eq!(bare["useCases"], serde_json::json!([]));
}
#[test]
fn map_model_info_empty_on_missing_data() {
assert_eq!(
map_model_info(&serde_json::json!({})),
serde_json::json!({})
);
assert_eq!(
map_model_info(&serde_json::json!({ "data": "nope" })),
serde_json::json!({})
);
}
}