Skip to main content

aeo_graph_explorer/
app.rs

1//! Axum HTTP layer.
2
3use std::sync::Arc;
4
5use axum::extract::{Path, Query, State};
6use axum::http::StatusCode;
7use axum::response::{IntoResponse, Response};
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use serde::Deserialize;
11use tokio::sync::RwLock;
12
13use crate::error::GraphError;
14use crate::graph::AeoGraph;
15use crate::model::{AeoEntity, AeoNode};
16use crate::query::{find_by_claim, neighbors, shortest_path, ClaimMatch, NeighborView, PathResult};
17
18/// Shared app state — a single graph protected by a `RwLock` so `/ingest`
19/// can replace it atomically without blocking concurrent reads.
20///
21/// When built with the `audit-stream` feature (default), the state also
22/// holds a `reqwest::Client` reused for governance-event emission. The
23/// client itself is cheap; what matters is reusing connections so a
24/// burst of ingests doesn't open one TCP socket per emit.
25#[derive(Clone)]
26pub struct AppState {
27    /// The graph itself.
28    pub graph: Arc<RwLock<AeoGraph>>,
29    /// Shared HTTP client for `audit_stream::emit`. Always present when
30    /// the feature is on, even if `AUDIT_STREAM_URL` is unset (in which
31    /// case `emit` no-ops without using it).
32    #[cfg(feature = "audit-stream")]
33    pub audit_client: reqwest::Client,
34}
35
36impl AppState {
37    /// Construct fresh empty state with a default audit-stream client.
38    pub fn new() -> Self {
39        Self {
40            graph: Arc::new(RwLock::new(AeoGraph::default())),
41            #[cfg(feature = "audit-stream")]
42            audit_client: reqwest::Client::new(),
43        }
44    }
45
46    /// Build state with a caller-provided audit-stream client. Tests use
47    /// this to swap in a wiremock-backed client.
48    #[cfg(feature = "audit-stream")]
49    #[must_use]
50    pub fn with_audit_client(audit_client: reqwest::Client) -> Self {
51        Self {
52            graph: Arc::new(RwLock::new(AeoGraph::default())),
53            audit_client,
54        }
55    }
56}
57
58impl Default for AppState {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64/// Build the `axum` router. Tests use this directly with `tower::ServiceExt`.
65pub fn build_router(state: AppState) -> Router {
66    Router::new()
67        .route("/", get(root))
68        .route("/healthz", get(healthz))
69        .route("/nodes", get(list_nodes))
70        .route("/nodes/:id", get(get_node))
71        .route("/nodes/:id/neighbors", get(get_neighbors))
72        .route("/shortest-path", get(get_shortest_path))
73        .route("/find-by-claim", get(get_find_by_claim))
74        .route("/ingest", post(post_ingest))
75        .route("/stats", get(get_stats))
76        .with_state(state)
77}
78
79// ---------------------------------------------------------------------------
80// Handlers
81// ---------------------------------------------------------------------------
82
83async fn root() -> Json<serde_json::Value> {
84    Json(serde_json::json!({
85        "name": "aeo-graph-explorer",
86        "version": env!("CARGO_PKG_VERSION"),
87        "description": "HTTP graph-query service over AEO Protocol crawls. Layer 5 of the AEO Reference Stack.",
88        "endpoints": {
89            "GET  /healthz": "liveness probe",
90            "GET  /nodes": "list every entity in the graph (summary)",
91            "GET  /nodes/{id}": "fetch one entity's full AEO body",
92            "GET  /nodes/{id}/neighbors": "outbound + inbound neighbours",
93            "GET  /shortest-path?from=&to=": "A* over the graph",
94            "GET  /find-by-claim?predicate=&value=": "linear claim search",
95            "POST /ingest": "load JSONL (one AEO doc per line) and rebuild",
96            "GET  /stats": "node_count + edge_count"
97        }
98    }))
99}
100
101async fn healthz() -> Json<serde_json::Value> {
102    Json(serde_json::json!({"status": "ok"}))
103}
104
105async fn list_nodes(State(state): State<AppState>) -> Json<Vec<AeoEntity>> {
106    let graph = state.graph.read().await;
107    Json(graph.nodes().map(|n| n.entity.clone()).collect())
108}
109
110async fn get_node(
111    State(state): State<AppState>,
112    Path(id): Path<String>,
113) -> Result<Json<AeoNode>, GraphError> {
114    let graph = state.graph.read().await;
115    let node = graph
116        .node(&id)
117        .cloned()
118        .ok_or_else(|| GraphError::NotFound(id.clone()))?;
119    Ok(Json(node))
120}
121
122async fn get_neighbors(
123    State(state): State<AppState>,
124    Path(id): Path<String>,
125) -> Result<Json<NeighborView>, GraphError> {
126    let graph = state.graph.read().await;
127    Ok(Json(neighbors(&graph, &id)?))
128}
129
130#[derive(Deserialize)]
131struct PathParams {
132    from: String,
133    to: String,
134}
135
136async fn get_shortest_path(
137    State(state): State<AppState>,
138    Query(params): Query<PathParams>,
139) -> Result<Json<PathResult>, GraphError> {
140    let graph = state.graph.read().await;
141    Ok(Json(shortest_path(&graph, &params.from, &params.to)?))
142}
143
144#[derive(Deserialize)]
145struct ClaimParams {
146    predicate: Option<String>,
147    value: Option<String>,
148}
149
150async fn get_find_by_claim(
151    State(state): State<AppState>,
152    Query(params): Query<ClaimParams>,
153) -> Result<Json<Vec<ClaimMatch>>, GraphError> {
154    let graph = state.graph.read().await;
155    Ok(Json(find_by_claim(
156        &graph,
157        params.predicate.as_deref(),
158        params.value.as_deref(),
159    )?))
160}
161
162async fn post_ingest(
163    State(state): State<AppState>,
164    body: String,
165) -> Result<Json<serde_json::Value>, GraphError> {
166    let new_graph = match AeoGraph::from_jsonl(&body) {
167        Ok(g) => g,
168        Err(err) => {
169            #[cfg(feature = "audit-stream")]
170            crate::audit_stream::emit(
171                &state.audit_client,
172                "graph_ingest_failed",
173                serde_json::json!({
174                    "reason": err.to_string(),
175                    "input_bytes": body.len(),
176                }),
177            )
178            .await;
179            return Err(err);
180        }
181    };
182    let nodes = new_graph.node_count();
183    let edges = new_graph.edge_count();
184    *state.graph.write().await = new_graph;
185    #[cfg(feature = "audit-stream")]
186    crate::audit_stream::emit(
187        &state.audit_client,
188        "graph_ingested",
189        serde_json::json!({
190            "nodes": nodes,
191            "edges": edges,
192            "input_bytes": body.len(),
193        }),
194    )
195    .await;
196    Ok(Json(serde_json::json!({
197        "status": "ok",
198        "nodes": nodes,
199        "edges": edges,
200    })))
201}
202
203async fn get_stats(State(state): State<AppState>) -> Json<serde_json::Value> {
204    let graph = state.graph.read().await;
205    Json(serde_json::json!({
206        "nodes": graph.node_count(),
207        "edges": graph.edge_count(),
208    }))
209}
210
211// ---------------------------------------------------------------------------
212// GraphError -> HTTP response
213// ---------------------------------------------------------------------------
214
215impl IntoResponse for GraphError {
216    fn into_response(self) -> Response {
217        let (status, message) = match &self {
218            GraphError::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
219            GraphError::JsonLine { .. } | GraphError::UnknownNode(_) | GraphError::EmptyQuery => {
220                (StatusCode::BAD_REQUEST, self.to_string())
221            }
222        };
223        (status, Json(serde_json::json!({"error": message}))).into_response()
224    }
225}