Skip to main content

lora_server/
app.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6    extract::State,
7    http::StatusCode,
8    response::IntoResponse,
9    routing::{get, post},
10    Json, Router,
11};
12use lora_database::{ExecuteOptions, QueryRunner, ResultFormat, SnapshotAdmin};
13use serde::{Deserialize, Serialize};
14
15#[derive(Debug, Deserialize)]
16pub struct QueryRequest {
17    pub query: String,
18    #[serde(default)]
19    pub format: Option<QueryFormat>,
20}
21
22#[derive(Debug, Clone, Copy, Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub enum QueryFormat {
25    Rows,
26    RowArrays,
27    Graph,
28    Combined,
29}
30
31impl From<QueryFormat> for ResultFormat {
32    fn from(value: QueryFormat) -> Self {
33        match value {
34            QueryFormat::Rows => ResultFormat::Rows,
35            QueryFormat::RowArrays => ResultFormat::RowArrays,
36            QueryFormat::Graph => ResultFormat::Graph,
37            QueryFormat::Combined => ResultFormat::Combined,
38        }
39    }
40}
41
42#[derive(Debug, Serialize)]
43pub struct ErrorResponse {
44    pub error: String,
45}
46
47#[derive(Debug, Serialize)]
48pub struct HealthResponse {
49    pub status: &'static str,
50}
51
52pub fn build_app<R>(db: Arc<R>) -> Router
53where
54    R: QueryRunner,
55{
56    Router::new()
57        .route("/health", get(health))
58        .route("/query", post(query::<R>))
59        .with_state(db)
60}
61
62pub async fn serve<R>(listener: tokio::net::TcpListener, db: Arc<R>) -> Result<()>
63where
64    R: QueryRunner,
65{
66    let app = build_app(db);
67    axum::serve(listener, app).await?;
68    Ok(())
69}
70
71/// Configuration for the admin surface. When present, `build_app_with_admin`
72/// (or `serve_with_admin`) mounts `POST /admin/snapshot/save` and
73/// `POST /admin/snapshot/load` that drive the configured admin handle.
74///
75/// The endpoints are intentionally opt-in: exposing them without
76/// authentication on a network-reachable interface is a footgun, so the
77/// caller must explicitly construct an `AdminConfig` and pass it to the
78/// server — there is no implicit default path.
79#[derive(Clone)]
80pub struct AdminConfig {
81    pub snapshot_path: PathBuf,
82    pub admin: Arc<dyn SnapshotAdmin>,
83}
84
85/// Same as [`build_app`] but additionally mounts the admin routes when
86/// `admin_config` is `Some`.
87pub fn build_app_with_admin<R>(db: Arc<R>, admin_config: Option<AdminConfig>) -> Router
88where
89    R: QueryRunner,
90{
91    let router = build_app(db);
92    match admin_config {
93        Some(cfg) => router.merge(build_admin_router(cfg)),
94        None => router,
95    }
96}
97
98pub async fn serve_with_admin<R>(
99    listener: tokio::net::TcpListener,
100    db: Arc<R>,
101    admin_config: Option<AdminConfig>,
102) -> Result<()>
103where
104    R: QueryRunner,
105{
106    let app = build_app_with_admin(db, admin_config);
107    axum::serve(listener, app).await?;
108    Ok(())
109}
110
111fn build_admin_router(cfg: AdminConfig) -> Router {
112    Router::new()
113        .route("/admin/snapshot/save", post(admin_snapshot_save))
114        .route("/admin/snapshot/load", post(admin_snapshot_load))
115        .with_state(cfg)
116}
117
118/// Request body for `POST /admin/snapshot/{save,load}`. The body is
119/// optional; when it is absent (or an empty JSON object) the server uses
120/// the path configured in `AdminConfig`.
121///
122/// Supplying a `path` override lets an operator snapshot to / restore from
123/// an arbitrary filesystem location in a single request. **Any client that
124/// can reach the admin surface can write to any path the server process
125/// can write to — deploy the admin surface behind authenticated transport
126/// only.** We deliberately do not sandbox the path here; a well-meaning
127/// whitelist would give a false sense of safety without auth.
128#[derive(Debug, Default, Deserialize)]
129#[serde(default)]
130pub struct SnapshotRequest {
131    /// Override the configured snapshot path for this request only.
132    pub path: Option<String>,
133}
134
135#[derive(Debug, Serialize)]
136pub struct SnapshotResponse {
137    #[serde(rename = "formatVersion")]
138    pub format_version: u32,
139    #[serde(rename = "nodeCount")]
140    pub node_count: u64,
141    #[serde(rename = "relationshipCount")]
142    pub relationship_count: u64,
143    #[serde(rename = "walLsn")]
144    pub wal_lsn: Option<u64>,
145    pub path: String,
146}
147
148/// Extract the target path for a snapshot operation: the request-body
149/// override if present, else the configured default.
150fn resolve_snapshot_path(cfg: &AdminConfig, req: Option<&SnapshotRequest>) -> PathBuf {
151    match req.and_then(|r| r.path.as_deref()) {
152        Some(p) if !p.trim().is_empty() => PathBuf::from(p),
153        _ => cfg.snapshot_path.clone(),
154    }
155}
156
157async fn admin_snapshot_save(
158    State(cfg): State<AdminConfig>,
159    body: Option<Json<SnapshotRequest>>,
160) -> impl IntoResponse {
161    let req = body.map(|Json(r)| r);
162    let path = resolve_snapshot_path(&cfg, req.as_ref());
163
164    match cfg.admin.save_snapshot(&path) {
165        Ok(meta) => (
166            StatusCode::OK,
167            Json(SnapshotResponse {
168                format_version: meta.format_version,
169                node_count: meta.node_count as u64,
170                relationship_count: meta.relationship_count as u64,
171                wal_lsn: meta.wal_lsn,
172                path: path.display().to_string(),
173            }),
174        )
175            .into_response(),
176        Err(err) => (
177            StatusCode::INTERNAL_SERVER_ERROR,
178            Json(ErrorResponse {
179                error: err.to_string(),
180            }),
181        )
182            .into_response(),
183    }
184}
185
186async fn admin_snapshot_load(
187    State(cfg): State<AdminConfig>,
188    body: Option<Json<SnapshotRequest>>,
189) -> impl IntoResponse {
190    let req = body.map(|Json(r)| r);
191    let path = resolve_snapshot_path(&cfg, req.as_ref());
192
193    match cfg.admin.load_snapshot(&path) {
194        Ok(meta) => (
195            StatusCode::OK,
196            Json(SnapshotResponse {
197                format_version: meta.format_version,
198                node_count: meta.node_count as u64,
199                relationship_count: meta.relationship_count as u64,
200                wal_lsn: meta.wal_lsn,
201                path: path.display().to_string(),
202            }),
203        )
204            .into_response(),
205        Err(err) => (
206            StatusCode::INTERNAL_SERVER_ERROR,
207            Json(ErrorResponse {
208                error: err.to_string(),
209            }),
210        )
211            .into_response(),
212    }
213}
214
215async fn health() -> Json<HealthResponse> {
216    Json(HealthResponse { status: "ok" })
217}
218
219async fn query<R>(State(db): State<Arc<R>>, Json(req): Json<QueryRequest>) -> impl IntoResponse
220where
221    R: QueryRunner,
222{
223    let options = req.format.map(|format| ExecuteOptions {
224        format: format.into(),
225    });
226
227    match db.execute(&req.query, options) {
228        Ok(result) => (StatusCode::OK, Json(result)).into_response(),
229        Err(err) => (
230            StatusCode::BAD_REQUEST,
231            Json(ErrorResponse {
232                error: err.to_string(),
233            }),
234        )
235            .into_response(),
236    }
237}