1use axum::extract::{Path, Query, State};
36use axum::http::request::Parts;
37use axum::http::StatusCode;
38use axum::response::{IntoResponse, Response};
39use axum::routing::{get, post};
40use axum::{Json, Router};
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43
44use smooth_operator::auth::{AuthError, Principal, Role};
45use smooth_operator::backplane::Target;
46use smooth_operator::connector_config::{ConnectorConfig, ConnectorKind};
47use smooth_operator::domain::ParticipantType;
48use smooth_operator::settings::AgentSettings;
49
50use smooth_operator_ingestion::{
51 Chunker, Connector, FileConnector, GithubAuth, GithubConnector, GithubConnectorConfig,
52 GithubVisibility, IndexingService, WebConnector,
53};
54
55use crate::embedder::{build_embedder, EmbedderConfig};
56use crate::protocol;
57use crate::state::{scoped_connector_key, AppState};
58
59pub fn router() -> Router<AppState> {
61 Router::new()
62 .route("/admin/health", get(health))
63 .route("/admin/me", get(me))
64 .route("/admin/conversations", get(list_conversations))
65 .route(
66 "/admin/conversations/{id}/messages",
67 get(conversation_messages),
68 )
69 .route("/admin/indexing/runs", get(indexing_runs))
70 .route("/admin/document-sets", get(document_sets))
71 .route("/admin/model-costs", get(model_costs))
76 .route(
80 "/admin/connectors",
81 get(list_connectors).post(create_connector),
82 )
83 .route(
84 "/admin/connectors/{id}",
85 get(get_connector)
86 .put(update_connector)
87 .delete(delete_connector),
88 )
89 .route("/admin/connectors/{id}/index", post(index_connector))
90 .route("/admin/settings", get(get_settings).put(put_settings))
91 .route("/admin/publish", post(publish_event))
95 .layer(admin_cors())
105}
106
107pub(crate) fn admin_cors() -> tower_http::cors::CorsLayer {
115 use axum::http::{header, Method};
116 tower_http::cors::CorsLayer::new()
117 .allow_origin(tower_http::cors::Any)
118 .allow_methods([
119 Method::GET,
120 Method::POST,
121 Method::PUT,
122 Method::DELETE,
123 Method::OPTIONS,
124 ])
125 .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE])
126}
127
128pub struct RequireRole<const MIN: u8>(pub Principal);
140
141const fn role_rank(role: Role) -> u8 {
143 match role {
144 Role::Basic => 0,
145 Role::Curator => 1,
146 Role::Admin => 2,
147 }
148}
149
150const fn rank_role(min: u8) -> Role {
152 match min {
153 0 => Role::Basic,
154 1 => Role::Curator,
155 _ => Role::Admin,
156 }
157}
158
159impl<const MIN: u8> axum::extract::FromRequestParts<AppState> for RequireRole<MIN> {
160 type Rejection = AuthRejection;
161
162 async fn from_request_parts(
163 parts: &mut Parts,
164 state: &AppState,
165 ) -> Result<Self, Self::Rejection> {
166 let token = bearer_token(parts).ok_or(AuthRejection(AuthError::Unauthenticated))?;
167 let principal = state.auth.verify(&token).map_err(AuthRejection)?;
168 if role_rank(principal.role) < MIN {
169 return Err(AuthRejection(AuthError::Forbidden {
170 required: rank_role(MIN),
171 actual: principal.role,
172 }));
173 }
174 Ok(RequireRole(principal))
175 }
176}
177
178fn bearer_token(parts: &Parts) -> Option<String> {
181 let header = parts.headers.get(axum::http::header::AUTHORIZATION)?;
182 let value = header.to_str().ok()?;
183 let rest = value
184 .strip_prefix("Bearer ")
185 .or_else(|| value.strip_prefix("bearer "))?;
186 let trimmed = rest.trim();
187 if trimmed.is_empty() {
188 None
189 } else {
190 Some(trimmed.to_string())
191 }
192}
193
194pub struct AuthRejection(AuthError);
197
198impl IntoResponse for AuthRejection {
199 fn into_response(self) -> Response {
200 let (status, code) = match &self.0 {
201 AuthError::Unauthenticated => (StatusCode::UNAUTHORIZED, "UNAUTHENTICATED"),
202 AuthError::InvalidToken(_) => (StatusCode::UNAUTHORIZED, "INVALID_TOKEN"),
203 AuthError::MissingRole(_) => (StatusCode::UNAUTHORIZED, "MISSING_ROLE"),
204 AuthError::Forbidden { .. } => (StatusCode::FORBIDDEN, "FORBIDDEN"),
205 AuthError::Misconfigured(_) => {
208 (StatusCode::INTERNAL_SERVER_ERROR, "AUTH_MISCONFIGURED")
209 }
210 };
211 let body = protocol::error(None, code, &self.0.to_string());
212 (status, Json(body)).into_response()
213 }
214}
215
216struct AdminError(StatusCode, String, &'static str);
219
220impl IntoResponse for AdminError {
221 fn into_response(self) -> Response {
222 let body = protocol::error(None, self.2, &self.1);
223 (self.0, Json(body)).into_response()
224 }
225}
226
227impl AdminError {
228 fn internal(msg: impl Into<String>) -> Self {
229 Self(
230 StatusCode::INTERNAL_SERVER_ERROR,
231 msg.into(),
232 "INTERNAL_ERROR",
233 )
234 }
235
236 fn forbidden(msg: impl Into<String>) -> Self {
237 Self(StatusCode::FORBIDDEN, msg.into(), "FORBIDDEN")
238 }
239
240 fn not_found(msg: impl Into<String>) -> Self {
241 Self(StatusCode::NOT_FOUND, msg.into(), "NOT_FOUND")
242 }
243
244 fn validation(msg: impl Into<String>) -> Self {
247 Self(StatusCode::BAD_REQUEST, msg.into(), "VALIDATION_ERROR")
248 }
249}
250
251async fn health() -> Json<Value> {
257 Json(serde_json::json!({ "status": "ok" }))
258}
259
260async fn me(RequireRole::<0>(principal): RequireRole<0>) -> Json<Principal> {
262 Json(principal)
263}
264
265#[derive(Debug, Deserialize)]
267struct ConversationsQuery {
268 limit: Option<usize>,
270 cursor: Option<usize>,
273}
274
275#[derive(Debug, Serialize)]
277#[serde(rename_all = "camelCase")]
278struct ConversationRow {
279 id: String,
280 name: String,
281 platform: String,
282 created_at: chrono::DateTime<chrono::Utc>,
283 updated_at: chrono::DateTime<chrono::Utc>,
284}
285
286#[derive(Debug, Serialize)]
288#[serde(rename_all = "camelCase")]
289struct ConversationsResponse {
290 conversations: Vec<ConversationRow>,
291 next_cursor: Option<usize>,
293}
294
295async fn list_conversations(
298 RequireRole::<0>(principal): RequireRole<0>,
299 State(state): State<AppState>,
300 Query(q): Query<ConversationsQuery>,
301) -> Result<Json<ConversationsResponse>, AdminError> {
302 let limit = q.limit.unwrap_or(50).clamp(1, 200);
303 let offset = q.cursor.unwrap_or(0);
304
305 let all = state
306 .storage
307 .list_conversations_by_org(&principal.org_id)
308 .await
309 .map_err(|e| AdminError::internal(format!("list conversations failed: {e}")))?;
310
311 let visible: Vec<_> = if principal.role >= Role::Curator {
313 all
314 } else {
315 let mut owned = Vec::new();
316 for conv in all {
317 if conversation_owned_by(&state, &conv.id, &principal.user_id).await {
318 owned.push(conv);
319 }
320 }
321 owned
322 };
323
324 let total = visible.len();
325 let page: Vec<ConversationRow> = visible
326 .into_iter()
327 .skip(offset)
328 .take(limit)
329 .map(|c| ConversationRow {
330 id: c.id,
331 name: c.name,
332 platform: format!("{:?}", c.platform).to_lowercase(),
333 created_at: c.created_at,
334 updated_at: c.updated_at,
335 })
336 .collect();
337
338 let next = offset + page.len();
339 let next_cursor = if next < total { Some(next) } else { None };
340
341 Ok(Json(ConversationsResponse {
342 conversations: page,
343 next_cursor,
344 }))
345}
346
347async fn conversation_messages(
350 RequireRole::<0>(principal): RequireRole<0>,
351 State(state): State<AppState>,
352 Path(conversation_id): Path<String>,
353) -> Result<Json<Value>, AdminError> {
354 let conv = state
356 .storage
357 .get_conversation(&conversation_id)
358 .await
359 .map_err(|e| AdminError::internal(format!("get conversation failed: {e}")))?
360 .ok_or_else(|| {
361 AdminError::not_found(format!("conversation '{conversation_id}' not found"))
362 })?;
363
364 if conv.organization_id != principal.org_id {
365 return Err(AdminError::not_found(format!(
367 "conversation '{conversation_id}' not found"
368 )));
369 }
370
371 if principal.role < Role::Curator
373 && !conversation_owned_by(&state, &conversation_id, &principal.user_id).await
374 {
375 return Err(AdminError::forbidden(
376 "you do not have access to this conversation",
377 ));
378 }
379
380 let query = smooth_operator::adapter::MessageQuery::new(&conversation_id, 200);
381 let page = state
382 .storage
383 .list_messages_by_conversation(query)
384 .await
385 .map_err(|e| AdminError::internal(format!("list messages failed: {e}")))?;
386
387 Ok(Json(serde_json::json!({
388 "conversationId": conversation_id,
389 "messages": page.messages,
390 "nextCursor": page.next_cursor,
391 })))
392}
393
394async fn indexing_runs(
403 RequireRole::<1>(principal): RequireRole<1>,
404 State(state): State<AppState>,
405) -> Json<Value> {
406 let mut runs = Vec::new();
407 for connector in state.connectors(&principal.org_id) {
408 let key = scoped_connector_key(&principal.org_id, &connector);
409 for run in state.indexing.list_runs(&key) {
410 runs.push(serde_json::json!({
411 "id": run.id,
412 "connectorName": connector,
414 "status": format!("{:?}", run.status).to_lowercase(),
415 "startedAt": run.started_at,
416 "finishedAt": run.finished_at,
417 "documentsSeen": run.documents_seen,
418 "chunksIndexed": run.chunks_indexed,
419 "documentsSkipped": run.documents_skipped,
420 "cursor": run.cursor,
421 "error": run.error,
422 }));
423 }
424 }
425 Json(serde_json::json!({ "runs": runs }))
426}
427
428#[derive(Debug, Serialize)]
430#[serde(rename_all = "camelCase")]
431struct DocumentSetRow {
432 name: String,
433 document_count: usize,
434}
435
436async fn document_sets(
440 RequireRole::<1>(principal): RequireRole<1>,
441 State(state): State<AppState>,
442) -> Json<Value> {
443 let sets: Vec<DocumentSetRow> = state
444 .document_sets(&principal.org_id)
445 .into_iter()
446 .map(|(name, document_count)| DocumentSetRow {
447 name,
448 document_count,
449 })
450 .collect();
451 Json(serde_json::json!({ "documentSets": sets }))
452}
453
454async fn model_costs(State(state): State<AppState>) -> Json<Value> {
474 if let Some(cached) = state.model_costs_cache.get() {
476 return Json(cached.clone());
477 }
478 match fetch_model_costs(&state.config).await {
479 Ok(map) => {
480 let _ = state.model_costs_cache.set(map.clone());
483 Json(map)
484 }
485 Err(_) => Json(serde_json::json!({})),
488 }
489}
490
491async fn fetch_model_costs(config: &crate::config::ServerConfig) -> anyhow::Result<Value> {
499 let url = format!("{}/model/info", config.gateway_url.trim_end_matches('/'));
502 let client = reqwest::Client::new();
503 let mut req = client.get(&url);
504 if let Some(key) = config.gateway_key.as_deref() {
505 req = req.bearer_auth(key);
506 }
507 let payload: Value = req.send().await?.error_for_status()?.json().await?;
508 Ok(map_model_info(&payload))
509}
510
511fn map_model_info(payload: &Value) -> Value {
519 let mut out = serde_json::Map::new();
520 let Some(entries) = payload.get("data").and_then(Value::as_array) else {
521 return Value::Object(out);
522 };
523 for entry in entries {
524 let Some(name) = entry.get("model_name").and_then(Value::as_str) else {
525 continue;
526 };
527 let info = entry.get("model_info");
528 let input = info
529 .and_then(|i| i.get("input_cost_per_token"))
530 .and_then(Value::as_f64);
531 let output = info
532 .and_then(|i| i.get("output_cost_per_token"))
533 .and_then(Value::as_f64);
534 let tier = info
535 .and_then(|i| i.get("model_tier"))
536 .and_then(Value::as_str);
537 let use_cases = info
538 .and_then(|i| i.get("use_cases"))
539 .and_then(Value::as_array)
540 .cloned()
541 .unwrap_or_default();
542 out.insert(
543 name.to_string(),
544 serde_json::json!({
545 "inputCostPerToken": input,
546 "outputCostPerToken": output,
547 "tier": tier,
548 "useCases": use_cases,
549 }),
550 );
551 }
552 Value::Object(out)
553}
554
555#[derive(Debug, Deserialize)]
563struct ConnectorWrite {
564 name: String,
565 kind: String,
566 #[serde(default)]
567 config: Value,
568 #[serde(default = "default_enabled")]
569 enabled: bool,
570}
571
572const fn default_enabled() -> bool {
573 true
574}
575
576fn connector_json(cfg: &ConnectorConfig) -> Value {
581 serde_json::json!({
582 "connector": {
583 "id": cfg.id,
584 "name": cfg.name,
585 "kind": cfg.kind.as_str(),
586 "config": cfg.config,
587 "enabled": cfg.enabled,
588 "createdAt": cfg.created_at,
589 "updatedAt": cfg.updated_at,
590 }
591 })
592}
593
594fn validate_connector(kind: ConnectorKind, config: &Value) -> Result<(), AdminError> {
597 let missing = |field: &str| {
598 AdminError::validation(format!(
599 "{} connector config requires a '{field}' field",
600 kind.as_str()
601 ))
602 };
603 match kind {
604 ConnectorKind::Github => {
605 if config.get("owner").and_then(Value::as_str).is_none() {
606 return Err(missing("owner"));
607 }
608 if config.get("repo").and_then(Value::as_str).is_none() {
609 return Err(missing("repo"));
610 }
611 }
612 ConnectorKind::Web => {
613 if config.get("url").and_then(Value::as_str).is_none() {
614 return Err(missing("url"));
615 }
616 }
617 ConnectorKind::File => {
618 if config.get("path").and_then(Value::as_str).is_none() {
619 return Err(missing("path"));
620 }
621 }
622 }
623 Ok(())
624}
625
626async fn list_connectors(
628 RequireRole::<1>(principal): RequireRole<1>,
629 State(state): State<AppState>,
630) -> Json<Value> {
631 let connectors: Vec<Value> = state
632 .connector_configs
633 .list(&principal.org_id)
634 .iter()
635 .map(|c| connector_json(c)["connector"].clone())
636 .collect();
637 Json(serde_json::json!({ "connectors": connectors }))
638}
639
640async fn get_connector(
643 RequireRole::<1>(principal): RequireRole<1>,
644 State(state): State<AppState>,
645 Path(id): Path<String>,
646) -> Result<Json<Value>, AdminError> {
647 let cfg = state
648 .connector_configs
649 .get(&principal.org_id, &id)
650 .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
651 Ok(Json(connector_json(&cfg)))
652}
653
654async fn create_connector(
657 RequireRole::<2>(principal): RequireRole<2>,
658 State(state): State<AppState>,
659 Json(body): Json<ConnectorWrite>,
660) -> Result<Response, AdminError> {
661 let kind = ConnectorKind::parse(&body.kind)
662 .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
663 validate_connector(kind, &body.config)?;
664
665 let now = chrono::Utc::now();
666 let cfg = ConnectorConfig {
667 id: uuid::Uuid::new_v4().to_string(),
668 org_id: principal.org_id.clone(),
669 name: body.name,
670 kind,
671 config: body.config,
672 enabled: body.enabled,
673 created_at: now,
674 updated_at: now,
675 };
676 state.connector_configs.upsert(cfg.clone());
677 state.record_connector(principal.org_id.clone(), cfg.name.clone());
680 Ok((StatusCode::CREATED, Json(connector_json(&cfg))).into_response())
681}
682
683async fn update_connector(
686 RequireRole::<2>(principal): RequireRole<2>,
687 State(state): State<AppState>,
688 Path(id): Path<String>,
689 Json(body): Json<ConnectorWrite>,
690) -> Result<Json<Value>, AdminError> {
691 let existing = state
692 .connector_configs
693 .get(&principal.org_id, &id)
694 .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
695
696 let kind = ConnectorKind::parse(&body.kind)
697 .map_err(|bad| AdminError::validation(format!("unknown connector kind '{bad}'")))?;
698 validate_connector(kind, &body.config)?;
699
700 let cfg = ConnectorConfig {
701 id: existing.id,
702 org_id: existing.org_id,
703 name: body.name,
704 kind,
705 config: body.config,
706 enabled: body.enabled,
707 created_at: existing.created_at,
708 updated_at: chrono::Utc::now(),
709 };
710 state.connector_configs.upsert(cfg.clone());
711 state.record_connector(principal.org_id.clone(), cfg.name.clone());
712 Ok(Json(connector_json(&cfg)))
713}
714
715async fn delete_connector(
718 RequireRole::<2>(principal): RequireRole<2>,
719 State(state): State<AppState>,
720 Path(id): Path<String>,
721) -> Result<Response, AdminError> {
722 if state.connector_configs.delete(&principal.org_id, &id) {
723 Ok(StatusCode::NO_CONTENT.into_response())
724 } else {
725 Err(AdminError::not_found(format!("connector '{id}' not found")))
726 }
727}
728
729async fn index_connector(
738 RequireRole::<1>(principal): RequireRole<1>,
739 State(state): State<AppState>,
740 Path(id): Path<String>,
741) -> Result<Json<Value>, AdminError> {
742 let cfg = state
743 .connector_configs
744 .get(&principal.org_id, &id)
745 .ok_or_else(|| AdminError::not_found(format!("connector '{id}' not found")))?;
746
747 let scoped_key = scoped_connector_key(&principal.org_id, &cfg.name);
755 let connector = build_connector(&cfg, &scoped_key)?;
756
757 let service = IndexingService::new(principal.org_id.clone());
758 let chunker = Chunker::default();
759 let embedder = build_embedder(&EmbedderConfig::from_server_config(&state.config));
765 let knowledge = state.storage.knowledge();
766
767 let run = service
768 .run_once(
769 connector.as_ref(),
770 state.indexing.as_ref(),
771 &chunker,
772 embedder.as_ref(),
773 knowledge,
774 )
775 .await
776 .map_err(|e| AdminError::internal(format!("indexing failed: {e}")))?;
777
778 state.record_connector(principal.org_id.clone(), cfg.name.clone());
780
781 Ok(Json(serde_json::json!({
782 "run": {
783 "id": run.id,
784 "connectorName": cfg.name,
786 "status": format!("{:?}", run.status).to_lowercase(),
787 "startedAt": run.started_at,
788 "finishedAt": run.finished_at,
789 "documentsSeen": run.documents_seen,
790 "chunksIndexed": run.chunks_indexed,
791 "documentsSkipped": run.documents_skipped,
792 "cursor": run.cursor,
793 "error": run.error,
794 }
795 })))
796}
797
798fn build_connector(
809 cfg: &ConnectorConfig,
810 connector_name: &str,
811) -> Result<Box<dyn Connector>, AdminError> {
812 let connector_name = connector_name.to_string();
813 match cfg.kind {
814 ConnectorKind::Web => {
815 let url = cfg
816 .config
817 .get("url")
818 .and_then(Value::as_str)
819 .ok_or_else(|| AdminError::validation("web connector requires a 'url'"))?;
820 Ok(Box::new(NamedConnector::new(
821 connector_name,
822 WebConnector::new(url),
823 )))
824 }
825 ConnectorKind::File => {
826 let path = cfg
827 .config
828 .get("path")
829 .and_then(Value::as_str)
830 .ok_or_else(|| AdminError::validation("file connector requires a 'path'"))?;
831 Ok(Box::new(NamedConnector::new(
832 connector_name,
833 FileConnector::new(path),
834 )))
835 }
836 ConnectorKind::Github => {
837 let owner = cfg
838 .config
839 .get("owner")
840 .and_then(Value::as_str)
841 .ok_or_else(|| AdminError::validation("github connector requires an 'owner'"))?;
842 let repo = cfg
843 .config
844 .get("repo")
845 .and_then(Value::as_str)
846 .ok_or_else(|| AdminError::validation("github connector requires a 'repo'"))?;
847
848 let visibility = match cfg.config.get("visibility").and_then(Value::as_str) {
851 Some("private") => GithubVisibility::Private,
852 _ => GithubVisibility::Public,
853 };
854 let auth = resolve_github_auth(cfg, visibility)?;
855
856 let mut gh = GithubConnectorConfig::new(owner, repo, auth).visibility(visibility);
857 if let Some(r) = cfg.config.get("ref").and_then(Value::as_str) {
858 gh = gh.at_ref(r);
859 }
860 Ok(Box::new(NamedConnector::new(
861 connector_name,
862 GithubConnector::new(gh),
863 )))
864 }
865 }
866}
867
868fn resolve_github_auth(
875 cfg: &ConnectorConfig,
876 visibility: GithubVisibility,
877) -> Result<GithubAuth, AdminError> {
878 match cfg.auth_ref() {
879 Some(name) => match std::env::var(name) {
880 Ok(token) if !token.trim().is_empty() => Ok(GithubAuth::Token(token)),
881 _ => Err(AdminError::validation(format!(
882 "github connector auth_ref '{name}' did not resolve to a token \
883 (set the named secret/env var); refusing to index"
884 ))),
885 },
886 None => match visibility {
887 GithubVisibility::Public => Ok(GithubAuth::Unauthenticated),
888 GithubVisibility::Private => Err(AdminError::validation(
889 "github connector for a private repo requires an 'auth_ref' \
890 naming a token secret",
891 )),
892 },
893 }
894}
895
896struct NamedConnector<C: Connector> {
901 name: String,
902 inner: C,
903}
904
905impl<C: Connector> NamedConnector<C> {
906 fn new(name: String, inner: C) -> Self {
907 Self { name, inner }
908 }
909}
910
911#[async_trait::async_trait]
912impl<C: Connector> Connector for NamedConnector<C> {
913 fn name(&self) -> &str {
914 &self.name
915 }
916
917 async fn pull(
918 &self,
919 since: Option<smooth_operator_ingestion::Timestamp>,
920 ) -> anyhow::Result<Vec<smooth_operator_ingestion::RawDocument>> {
921 self.inner.pull(since).await
922 }
923}
924
925#[derive(Debug, Deserialize)]
931#[serde(rename_all = "camelCase")]
932struct SettingsWrite {
933 model: String,
934 system_prompt: String,
935 #[serde(default)]
938 persona: Option<String>,
939 #[serde(default)]
940 default_tools: Vec<String>,
941}
942
943fn settings_json(s: &AgentSettings) -> Value {
945 serde_json::json!({
946 "settings": {
947 "orgId": s.org_id,
948 "model": s.model,
949 "systemPrompt": s.system_prompt,
950 "persona": s.persona,
951 "defaultTools": s.default_tools,
952 "updatedAt": s.updated_at,
953 }
954 })
955}
956
957async fn get_settings(
959 RequireRole::<1>(principal): RequireRole<1>,
960 State(state): State<AppState>,
961) -> Json<Value> {
962 let settings = state.settings.get(&principal.org_id);
963 Json(settings_json(&settings))
964}
965
966async fn put_settings(
968 RequireRole::<2>(principal): RequireRole<2>,
969 State(state): State<AppState>,
970 Json(body): Json<SettingsWrite>,
971) -> Json<Value> {
972 let settings = AgentSettings {
973 org_id: principal.org_id.clone(),
974 model: body.model,
975 system_prompt: body.system_prompt,
976 persona: body.persona,
977 default_tools: body.default_tools,
978 updated_at: chrono::Utc::now(),
979 };
980 state.settings.put(settings.clone());
981 Json(settings_json(&settings))
982}
983
984async fn conversation_owned_by(state: &AppState, conversation_id: &str, user_id: &str) -> bool {
991 match state
992 .storage
993 .list_participants_by_conversation(conversation_id)
994 .await
995 {
996 Ok(parts) => parts.iter().any(|p| {
997 p.participant_type == ParticipantType::User && p.external_id.as_deref() == Some(user_id)
998 }),
999 Err(_) => false,
1000 }
1001}
1002
1003#[derive(Deserialize)]
1010#[serde(tag = "type", content = "id", rename_all = "snake_case")]
1011enum PublishTarget {
1012 Connection(String),
1013 Session(String),
1014 User(String),
1015 Org(String),
1016 Agent(String),
1017}
1018
1019impl From<PublishTarget> for Target {
1020 fn from(t: PublishTarget) -> Self {
1021 match t {
1022 PublishTarget::Connection(id) => Target::Connection(id),
1023 PublishTarget::Session(id) => Target::Session(id),
1024 PublishTarget::User(id) => Target::User(id),
1025 PublishTarget::Org(id) => Target::Org(id),
1026 PublishTarget::Agent(id) => Target::Agent(id),
1027 }
1028 }
1029}
1030
1031#[derive(Deserialize)]
1034struct PublishRequest {
1035 target: PublishTarget,
1036 event: Value,
1037}
1038
1039#[derive(Serialize)]
1041struct PublishResponse {
1042 delivered: usize,
1047}
1048
1049async fn publish_event(
1061 RequireRole::<2>(_principal): RequireRole<2>,
1062 State(state): State<AppState>,
1063 Json(body): Json<PublishRequest>,
1064) -> Json<PublishResponse> {
1065 let delivered = state
1066 .backplane
1067 .publish(body.target.into(), body.event)
1068 .await;
1069 Json(PublishResponse { delivered })
1070}
1071
1072#[cfg(test)]
1073mod tests {
1074 use super::*;
1075
1076 #[test]
1077 fn map_model_info_maps_sample_payload() {
1078 let payload = serde_json::json!({
1080 "data": [
1081 {
1082 "model_name": "claude-opus-4-8",
1083 "model_info": {
1084 "input_cost_per_token": 0.000015,
1085 "output_cost_per_token": 0.000075,
1086 "model_tier": "frontier",
1087 "use_cases": ["reasoning", "coding"]
1088 }
1089 },
1090 {
1091 "model_name": "claude-haiku-4-5",
1092 "model_info": {
1093 "input_cost_per_token": 0.0000008,
1094 "output_cost_per_token": 0.000004,
1095 "model_tier": "fast",
1096 "use_cases": ["chat"]
1097 }
1098 }
1099 ]
1100 });
1101
1102 let out = map_model_info(&payload);
1103 let opus = &out["claude-opus-4-8"];
1104 assert!((opus["inputCostPerToken"].as_f64().unwrap() - 0.000015).abs() < 1e-12);
1105 assert!((opus["outputCostPerToken"].as_f64().unwrap() - 0.000075).abs() < 1e-12);
1106 assert_eq!(opus["tier"], "frontier");
1107 assert_eq!(opus["useCases"], serde_json::json!(["reasoning", "coding"]));
1108
1109 let haiku = &out["claude-haiku-4-5"];
1110 assert_eq!(haiku["tier"], "fast");
1111 assert_eq!(haiku["useCases"], serde_json::json!(["chat"]));
1112 }
1113
1114 #[test]
1115 fn map_model_info_tolerates_missing_fields() {
1116 let payload = serde_json::json!({
1119 "data": [
1120 { "model_name": "bare", "model_info": {} },
1121 { "model_info": { "model_tier": "x" } }
1122 ]
1123 });
1124 let out = map_model_info(&payload);
1125 let obj = out.as_object().unwrap();
1126 assert_eq!(obj.len(), 1, "the model_name-less entry is skipped");
1127 let bare = &out["bare"];
1128 assert!(bare["inputCostPerToken"].is_null());
1129 assert!(bare["outputCostPerToken"].is_null());
1130 assert!(bare["tier"].is_null());
1131 assert_eq!(bare["useCases"], serde_json::json!([]));
1132 }
1133
1134 #[test]
1135 fn map_model_info_empty_on_missing_data() {
1136 assert_eq!(
1138 map_model_info(&serde_json::json!({})),
1139 serde_json::json!({})
1140 );
1141 assert_eq!(
1142 map_model_info(&serde_json::json!({ "data": "nope" })),
1143 serde_json::json!({})
1144 );
1145 }
1146}