Skip to main content

langgraph_tracing/
server.rs

1use crate::event_bus::EventBus;
2use crate::store::{TraceFilter, TracingStore};
3use crate::types::*;
4use axum::extract::{Path, Query, State};
5use axum::response::IntoResponse;
6use axum::routing::get;
7use axum::{Json, Router};
8use futures::{SinkExt, StreamExt};
9use serde::Deserialize;
10use std::sync::Arc;
11use tower_http::cors::CorsLayer;
12
13/// Query parameters for listing traces
14#[derive(Debug, Deserialize)]
15struct ListTracesQuery {
16    status: Option<String>,
17    name: Option<String>,
18    limit: Option<usize>,
19    offset: Option<usize>,
20}
21
22/// GET /api/traces
23async fn list_traces(
24    State(state): State<AppState>,
25    Query(params): Query<ListTracesQuery>,
26) -> Json<Vec<TraceSummary>> {
27    let filter = TraceFilter {
28        status: params.status.and_then(|s| match s.as_str() {
29            "running" => Some(TraceStatus::Running),
30            "success" => Some(TraceStatus::Success),
31            "error" => Some(TraceStatus::Error),
32            "interrupted" => Some(TraceStatus::Interrupted),
33            _ => None,
34        }),
35        name_contains: params.name,
36        limit: params.limit,
37        offset: params.offset,
38    };
39    Json(state.store.list_traces(&filter))
40}
41
42/// GET /api/traces/:id
43async fn get_trace(
44    State(state): State<AppState>,
45    Path(trace_id): Path<String>,
46) -> Result<Json<TraceDetail>, axum::http::StatusCode> {
47    state
48        .store
49        .get_trace(&trace_id)
50        .map(Json)
51        .ok_or(axum::http::StatusCode::NOT_FOUND)
52}
53
54/// DELETE /api/traces
55async fn clear_traces(State(state): State<AppState>) {
56    state.store.clear();
57}
58
59/// WebSocket handler for real-time events
60async fn ws_handler(
61    ws: axum::extract::WebSocketUpgrade,
62    State(state): State<AppState>,
63) -> axum::response::Response {
64    ws.on_upgrade(|socket| handle_ws(socket, state))
65}
66
67async fn handle_ws(socket: axum::extract::ws::WebSocket, state: AppState) {
68    let (mut sender, _receiver) = socket.split();
69    let mut rx = state.event_bus.subscribe();
70
71    while let Ok(event) = rx.recv().await {
72        if let Ok(json) = serde_json::to_string(&event) {
73            let msg: axum::extract::ws::Message = axum::extract::ws::Message::Text(json);
74            if sender.send(msg).await.is_err() {
75                break;
76            }
77        }
78    }
79}
80
81#[derive(Clone)]
82struct AppState {
83    store: Arc<dyn TracingStore>,
84    event_bus: EventBus,
85}
86
87/// Start the tracing web server on the given address.
88///
89/// `static_dir` is the path to the built frontend assets (e.g., "crates/langgraph-tracing/frontend/dist").
90/// If the directory doesn't exist, only the API and WebSocket endpoints will be available.
91pub async fn start(
92    addr: &str,
93    store: Arc<dyn TracingStore>,
94    event_bus: EventBus,
95    static_dir: Option<&str>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let state = AppState { store, event_bus };
98
99    let api_routes = Router::new()
100        .route("/api/traces", get(list_traces).delete(clear_traces))
101        .route("/api/traces/:trace_id", get(get_trace))
102        .route("/ws", get(ws_handler))
103        .with_state(state);
104
105    let app = if let Some(dir) = static_dir {
106        let dir_owned = dir.to_string();
107        let serve_dir = tower_http::services::ServeDir::new(&dir_owned)
108            .not_found_service(tower_http::services::ServeDir::new(&dir_owned)
109                .fallback(axum::routing::any(move || {
110                    let d = dir_owned.clone();
111                    async move {
112                        match tokio::fs::read_to_string(format!("{}/index.html", d)).await {
113                            Ok(html) => axum::response::Html(html).into_response(),
114                            Err(_) => axum::http::StatusCode::NOT_FOUND.into_response(),
115                        }
116                    }
117                })));
118        api_routes.merge(Router::new().fallback_service(serve_dir))
119    } else {
120        api_routes
121    };
122
123    let app = app.layer(CorsLayer::permissive());
124
125    let listener = tokio::net::TcpListener::bind(addr).await?;
126    println!("Tracing UI available at http://{}", addr);
127
128    axum::serve(listener, app).await?;
129    Ok(())
130}