langgraph_tracing/
server.rs1use crate::event_bus::EventBus;
2use crate::store::{TraceFilter, TracingStore};
3use crate::types::*;
4use axum::extract::{Path, Query, State};
5use axum::response::IntoResponse;
6use axum::routing::get;
7use axum::{Json, Router};
8use futures::{SinkExt, StreamExt};
9use serde::Deserialize;
10use std::sync::Arc;
11use tower_http::cors::CorsLayer;
12
13#[derive(Debug, Deserialize)]
15struct ListTracesQuery {
16 status: Option<String>,
17 name: Option<String>,
18 limit: Option<usize>,
19 offset: Option<usize>,
20}
21
22async fn list_traces(
24 State(state): State<AppState>,
25 Query(params): Query<ListTracesQuery>,
26) -> Json<Vec<TraceSummary>> {
27 let filter = TraceFilter {
28 status: params.status.and_then(|s| match s.as_str() {
29 "running" => Some(TraceStatus::Running),
30 "success" => Some(TraceStatus::Success),
31 "error" => Some(TraceStatus::Error),
32 "interrupted" => Some(TraceStatus::Interrupted),
33 _ => None,
34 }),
35 name_contains: params.name,
36 limit: params.limit,
37 offset: params.offset,
38 };
39 Json(state.store.list_traces(&filter))
40}
41
42async fn get_trace(
44 State(state): State<AppState>,
45 Path(trace_id): Path<String>,
46) -> Result<Json<TraceDetail>, axum::http::StatusCode> {
47 state
48 .store
49 .get_trace(&trace_id)
50 .map(Json)
51 .ok_or(axum::http::StatusCode::NOT_FOUND)
52}
53
54async fn clear_traces(State(state): State<AppState>) {
56 state.store.clear();
57}
58
59async fn ws_handler(
61 ws: axum::extract::WebSocketUpgrade,
62 State(state): State<AppState>,
63) -> axum::response::Response {
64 ws.on_upgrade(|socket| handle_ws(socket, state))
65}
66
67async fn handle_ws(socket: axum::extract::ws::WebSocket, state: AppState) {
68 let (mut sender, _receiver) = socket.split();
69 let mut rx = state.event_bus.subscribe();
70
71 while let Ok(event) = rx.recv().await {
72 if let Ok(json) = serde_json::to_string(&event) {
73 let msg: axum::extract::ws::Message = axum::extract::ws::Message::Text(json);
74 if sender.send(msg).await.is_err() {
75 break;
76 }
77 }
78 }
79}
80
81#[derive(Clone)]
82struct AppState {
83 store: Arc<dyn TracingStore>,
84 event_bus: EventBus,
85}
86
87pub async fn start(
92 addr: &str,
93 store: Arc<dyn TracingStore>,
94 event_bus: EventBus,
95 static_dir: Option<&str>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let state = AppState { store, event_bus };
98
99 let api_routes = Router::new()
100 .route("/api/traces", get(list_traces).delete(clear_traces))
101 .route("/api/traces/:trace_id", get(get_trace))
102 .route("/ws", get(ws_handler))
103 .with_state(state);
104
105 let app = if let Some(dir) = static_dir {
106 let dir_owned = dir.to_string();
107 let serve_dir = tower_http::services::ServeDir::new(&dir_owned)
108 .not_found_service(tower_http::services::ServeDir::new(&dir_owned)
109 .fallback(axum::routing::any(move || {
110 let d = dir_owned.clone();
111 async move {
112 match tokio::fs::read_to_string(format!("{}/index.html", d)).await {
113 Ok(html) => axum::response::Html(html).into_response(),
114 Err(_) => axum::http::StatusCode::NOT_FOUND.into_response(),
115 }
116 }
117 })));
118 api_routes.merge(Router::new().fallback_service(serve_dir))
119 } else {
120 api_routes
121 };
122
123 let app = app.layer(CorsLayer::permissive());
124
125 let listener = tokio::net::TcpListener::bind(addr).await?;
126 println!("Tracing UI available at http://{}", addr);
127
128 axum::serve(listener, app).await?;
129 Ok(())
130}