1use anyhow::Result;
2use axum::extract::DefaultBodyLimit;
3use axum::routing::get;
4use axum::{Json, Router};
5use serde_json::json;
6use systemprompt_database::DatabaseQuery;
7use systemprompt_models::api::SingleResponse;
8use systemprompt_models::modules::ApiPaths;
9use systemprompt_models::AppPaths;
10use systemprompt_runtime::AppContext;
11use systemprompt_traits::{StartupEvent, StartupEventExt, StartupEventSender};
12
13use super::routes::configure_routes;
14use crate::models::ServerConfig;
15use crate::services::middleware::{
16 inject_trace_header, remove_trailing_slash, AnalyticsMiddleware, ContextMiddleware,
17 CorsMiddleware, JwtContextExtractor, SessionMiddleware,
18};
19
20const HEALTH_CHECK_QUERY: DatabaseQuery = DatabaseQuery::new("SELECT 1");
21
22#[derive(Debug)]
23pub struct ApiServer {
24 router: Router,
25 _config: ServerConfig,
26 events: Option<StartupEventSender>,
27}
28
29impl ApiServer {
30 pub fn new(router: Router, events: Option<StartupEventSender>) -> Self {
31 Self::with_config(router, ServerConfig::default(), events)
32 }
33
34 pub const fn with_config(
35 router: Router,
36 config: ServerConfig,
37 events: Option<StartupEventSender>,
38 ) -> Self {
39 Self {
40 router,
41 _config: config,
42 events,
43 }
44 }
45
46 pub async fn serve(self, addr: &str) -> Result<()> {
47 if let Some(ref tx) = self.events {
48 if tx
49 .unbounded_send(StartupEvent::ServerBinding {
50 address: addr.to_string(),
51 })
52 .is_err()
53 {
54 tracing::debug!("Startup event receiver dropped");
55 }
56 }
57
58 let listener = self.create_listener(addr).await?;
59
60 if let Some(ref tx) = self.events {
61 tx.server_listening(addr, std::process::id());
62 }
63
64 axum::serve(
65 listener,
66 self.router
67 .into_make_service_with_connect_info::<std::net::SocketAddr>(),
68 )
69 .await?;
70 Ok(())
71 }
72
73 async fn create_listener(&self, addr: &str) -> Result<tokio::net::TcpListener> {
74 tokio::net::TcpListener::bind(addr)
75 .await
76 .map_err(|e| anyhow::anyhow!("Failed to bind to {addr}: {e}"))
77 }
78}
79
80pub fn setup_api_server(ctx: &AppContext, events: Option<StartupEventSender>) -> Result<ApiServer> {
81 let rate_config = &ctx.config().rate_limits;
82
83 if rate_config.disabled {
84 if let Some(ref tx) = events {
85 tx.warning("Rate limiting disabled - development mode only");
86 }
87 }
88
89 let router = configure_routes(ctx, events.as_ref())?;
90 let router = apply_global_middleware(router, ctx)?;
91
92 Ok(ApiServer::new(router, events))
93}
94
95fn apply_global_middleware(router: Router, ctx: &AppContext) -> Result<Router> {
96 let mut router = router;
97
98 router = router.layer(DefaultBodyLimit::max(100 * 1024 * 1024));
99
100 let analytics_middleware = AnalyticsMiddleware::new(ctx);
101 router = router.layer(axum::middleware::from_fn({
102 let middleware = analytics_middleware;
103 move |req, next| {
104 let middleware = middleware.clone();
105 async move { middleware.track_request(req, next).await }
106 }
107 }));
108
109 let jwt_extractor = JwtContextExtractor::new(
110 systemprompt_models::SecretsBootstrap::jwt_secret()?,
111 ctx.db_pool(),
112 );
113 let global_context_middleware = ContextMiddleware::public(jwt_extractor);
114 router = router.layer(axum::middleware::from_fn({
115 let middleware = global_context_middleware;
116 move |req, next| {
117 let middleware = middleware.clone();
118 async move { middleware.handle(req, next).await }
119 }
120 }));
121
122 let session_middleware = SessionMiddleware::new(ctx)?;
123 router = router.layer(axum::middleware::from_fn({
124 let middleware = session_middleware;
125 move |req, next| {
126 let middleware = middleware.clone();
127 async move { middleware.handle(req, next).await }
128 }
129 }));
130
131 let cors = CorsMiddleware::build_layer(ctx.config())?;
132 router = router.layer(cors);
133
134 router = router.layer(axum::middleware::from_fn(remove_trailing_slash));
135
136 router = router.layer(axum::middleware::from_fn(inject_trace_header));
137
138 Ok(router)
139}
140
141pub async fn handle_root_discovery(
142 axum::extract::State(ctx): axum::extract::State<AppContext>,
143) -> impl axum::response::IntoResponse {
144 let base = &ctx.config().api_external_url;
145 let data = json!({
146 "name": format!("{} API", ctx.config().sitename),
147 "version": "1.0.0",
148 "description": "systemprompt.io OS API Gateway",
149 "endpoints": {
150 "health": format!("{}{}", base, ApiPaths::HEALTH),
151 "oauth": {
152 "href": format!("{}{}", base, ApiPaths::OAUTH_BASE),
153 "description": "OAuth2/OIDC authentication and WebAuthn",
154 "endpoints": {
155 "authorize": format!("{}{}", base, ApiPaths::OAUTH_AUTHORIZE),
156 "token": format!("{}{}", base, ApiPaths::OAUTH_TOKEN),
157 "userinfo": format!("{}{}/userinfo", base, ApiPaths::OAUTH_BASE),
158 "introspect": format!("{}{}/introspect", base, ApiPaths::OAUTH_BASE),
159 "revoke": format!("{}{}/revoke", base, ApiPaths::OAUTH_BASE),
160 "webauthn": format!("{}{}/webauthn", base, ApiPaths::OAUTH_BASE)
161 }
162 },
163 "core": {
164 "href": format!("{}{}", base, ApiPaths::CORE_BASE),
165 "description": "Core conversation, task, and artifact management",
166 "endpoints": {
167 "contexts": format!("{}{}", base, ApiPaths::CORE_CONTEXTS),
168 "tasks": format!("{}{}", base, ApiPaths::CORE_TASKS),
169 "artifacts": format!("{}{}", base, ApiPaths::CORE_ARTIFACTS)
170 }
171 },
172 "agents": {
173 "href": format!("{}{}", base, ApiPaths::AGENTS_REGISTRY),
174 "description": "A2A protocol agent registry and proxy",
175 "endpoints": {
176 "registry": format!("{}{}", base, ApiPaths::AGENTS_REGISTRY),
177 "proxy": format!("{}{}{{agent_id}}", base, ApiPaths::AGENTS_BASE)
178 }
179 },
180 "mcp": {
181 "href": format!("{}{}", base, ApiPaths::MCP_REGISTRY),
182 "description": "MCP server registry and lifecycle management",
183 "endpoints": {
184 "registry": format!("{}{}", base, ApiPaths::MCP_REGISTRY),
185 "proxy": format!("{}{}{{server_name}}", base, ApiPaths::MCP_BASE)
186 }
187 },
188 "stream": {
189 "href": format!("{}{}", base, ApiPaths::STREAM_BASE),
190 "description": "Server-Sent Events (SSE) for real-time updates",
191 "endpoints": {
192 "contexts": format!("{}{}", base, ApiPaths::STREAM_CONTEXTS)
193 }
194 }
195 },
196 "wellknown": {
197 "oauth": format!("{}{}", base, ApiPaths::WELLKNOWN_OAUTH_SERVER),
198 "agent": format!("{}{}", base, ApiPaths::WELLKNOWN_AGENT_CARD)
199 }
200 });
201
202 Json(SingleResponse::new(data))
203}
204
205#[cfg(target_os = "linux")]
206fn parse_proc_status_kb(content: &str, key: &str) -> Option<u64> {
207 content
208 .lines()
209 .find(|line| line.starts_with(key))
210 .and_then(|line| {
211 line.split_whitespace()
212 .nth(1)
213 .and_then(|v| v.parse::<u64>().ok())
214 })
215}
216
217#[cfg(target_os = "linux")]
218fn get_process_memory() -> Option<serde_json::Value> {
219 let content = std::fs::read_to_string("/proc/self/status").ok()?;
220
221 let rss_kb = parse_proc_status_kb(&content, "VmRSS:");
222 let virt_kb = parse_proc_status_kb(&content, "VmSize:");
223 let peak_kb = parse_proc_status_kb(&content, "VmPeak:");
224
225 Some(json!({
226 "rss_mb": rss_kb.map(|kb| kb / 1024),
227 "virtual_mb": virt_kb.map(|kb| kb / 1024),
228 "peak_mb": peak_kb.map(|kb| kb / 1024)
229 }))
230}
231
232#[cfg(not(target_os = "linux"))]
233fn get_process_memory() -> Option<serde_json::Value> {
234 None
235}
236
237pub async fn handle_health(
238 axum::extract::State(ctx): axum::extract::State<AppContext>,
239) -> impl axum::response::IntoResponse {
240 use axum::http::StatusCode;
241 use systemprompt_database::{DatabaseProvider, ServiceRepository};
242
243 let start = std::time::Instant::now();
244
245 let (db_status, db_latency_ms) = {
246 let db_start = std::time::Instant::now();
247 let status = match ctx.db_pool().fetch_optional(&HEALTH_CHECK_QUERY, &[]).await {
248 Ok(_) => "healthy",
249 Err(_) => "unhealthy",
250 };
251 (status, db_start.elapsed().as_millis())
252 };
253
254 let service_repo = ServiceRepository::new(ctx.db_pool().clone());
255
256 let (agent_count, agent_status) = match service_repo.count_running_services("agent").await {
257 Ok(count) if count > 0 => (count, "healthy"),
258 Ok(_) => (0, "none"),
259 Err(_) => (0, "error"),
260 };
261
262 let (mcp_count, mcp_status) = match service_repo.count_running_services("mcp").await {
263 Ok(count) if count > 0 => (count, "healthy"),
264 Ok(_) => (0, "none"),
265 Err(_) => (0, "error"),
266 };
267
268 let web_dir = AppPaths::get()
269 .map(|p| p.web().dist().to_path_buf())
270 .unwrap_or_else(|e| {
271 tracing::debug!(error = %e, "Failed to get web dist path, using default");
272 std::path::PathBuf::from("/var/www/html/dist")
273 });
274 let sitemap_exists = web_dir.join("sitemap.xml").exists();
275 let index_exists = web_dir.join("index.html").exists();
276
277 let db_healthy = db_status == "healthy";
278 let services_ok = agent_status != "error" && mcp_status != "error";
279 let content_ok = sitemap_exists && index_exists;
280
281 let (overall_status, http_status) = match (db_healthy, services_ok && content_ok) {
282 (false, _) => ("unhealthy", StatusCode::SERVICE_UNAVAILABLE),
283 (true, false) => ("degraded", StatusCode::OK),
284 (true, true) => ("healthy", StatusCode::OK),
285 };
286
287 let check_duration_ms = start.elapsed().as_millis();
288 let memory = get_process_memory();
289
290 let data = json!({
291 "status": overall_status,
292 "timestamp": chrono::Utc::now().to_rfc3339(),
293 "version": env!("CARGO_PKG_VERSION"),
294 "checks": {
295 "database": {
296 "status": db_status,
297 "latency_ms": db_latency_ms
298 },
299 "agents": {
300 "status": agent_status,
301 "count": agent_count
302 },
303 "mcp": {
304 "status": mcp_status,
305 "count": mcp_count
306 },
307 "static_content": {
308 "status": if content_ok { "healthy" } else { "degraded" },
309 "index_html": index_exists,
310 "sitemap_xml": sitemap_exists
311 }
312 },
313 "memory": memory,
314 "response_time_ms": check_duration_ms
315 });
316
317 (http_status, Json(data))
318}
319
320pub async fn handle_core_discovery(
321 axum::extract::State(ctx): axum::extract::State<AppContext>,
322) -> impl axum::response::IntoResponse {
323 let base = &ctx.config().api_external_url;
324 let data = json!({
325 "name": "Core Services",
326 "description": "Core conversation, task, and artifact management APIs",
327 "endpoints": {
328 "contexts": {
329 "href": format!("{}{}", base, ApiPaths::CORE_CONTEXTS),
330 "description": "Conversation context management",
331 "methods": ["GET", "POST", "DELETE"]
332 },
333 "tasks": {
334 "href": format!("{}{}", base, ApiPaths::CORE_TASKS),
335 "description": "Task management for agent operations",
336 "methods": ["GET", "POST", "PUT", "DELETE"]
337 },
338 "artifacts": {
339 "href": format!("{}{}", base, ApiPaths::CORE_ARTIFACTS),
340 "description": "Artifact storage and retrieval",
341 "methods": ["GET", "POST", "DELETE"]
342 },
343 "oauth": {
344 "href": format!("{}{}", base, ApiPaths::OAUTH_BASE),
345 "description": "OAuth2/OIDC authentication endpoints"
346 }
347 }
348 });
349 Json(SingleResponse::new(data))
350}
351
352pub async fn handle_agents_discovery(
353 axum::extract::State(ctx): axum::extract::State<AppContext>,
354) -> impl axum::response::IntoResponse {
355 let base = &ctx.config().api_external_url;
356 let data = json!({
357 "name": "Agent Services",
358 "description": "A2A protocol agent registry and proxy",
359 "endpoints": {
360 "registry": {
361 "href": format!("{}{}", base, ApiPaths::AGENTS_REGISTRY),
362 "description": "List and discover available agents",
363 "methods": ["GET"]
364 },
365 "proxy": {
366 "href": format!("{}{}/<agent_id>/", base, ApiPaths::AGENTS_BASE),
367 "description": "Proxy requests to specific agents",
368 "methods": ["GET", "POST"]
369 }
370 }
371 });
372 Json(SingleResponse::new(data))
373}
374
375pub async fn handle_mcp_discovery(
376 axum::extract::State(ctx): axum::extract::State<AppContext>,
377) -> impl axum::response::IntoResponse {
378 let base = &ctx.config().api_external_url;
379 let data = json!({
380 "name": "MCP Services",
381 "description": "Model Context Protocol server registry and proxy",
382 "endpoints": {
383 "registry": {
384 "href": format!("{}{}", base, ApiPaths::MCP_REGISTRY),
385 "description": "List and discover available MCP servers",
386 "methods": ["GET"]
387 },
388 "proxy": {
389 "href": format!("{}{}/<server_name>/mcp", base, ApiPaths::MCP_BASE),
390 "description": "Proxy requests to specific MCP servers",
391 "methods": ["GET", "POST"]
392 }
393 }
394 });
395 Json(SingleResponse::new(data))
396}
397
398pub fn discovery_router(ctx: &AppContext) -> Router {
399 Router::new()
400 .route(ApiPaths::DISCOVERY, get(handle_root_discovery))
401 .route(ApiPaths::HEALTH, get(handle_health))
402 .route("/health", get(handle_health))
403 .route(ApiPaths::CORE_BASE, get(handle_core_discovery))
404 .route(ApiPaths::AGENTS_BASE, get(handle_agents_discovery))
405 .route(ApiPaths::MCP_BASE, get(handle_mcp_discovery))
406 .with_state(ctx.clone())
407}