Skip to main content

lora_server/
app.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6    extract::State,
7    http::StatusCode,
8    response::IntoResponse,
9    routing::{get, post},
10    Json, Router,
11};
12use lora_database::{ExecuteOptions, QueryRunner, ResultFormat, SnapshotAdmin, WalAdmin};
13use serde::{Deserialize, Serialize};
14
15#[derive(Debug, Deserialize)]
16pub struct QueryRequest {
17    pub query: String,
18    #[serde(default)]
19    pub format: Option<QueryFormat>,
20}
21
22#[derive(Debug, Clone, Copy, Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub enum QueryFormat {
25    Rows,
26    RowArrays,
27    Graph,
28    Combined,
29}
30
31impl From<QueryFormat> for ResultFormat {
32    fn from(value: QueryFormat) -> Self {
33        match value {
34            QueryFormat::Rows => ResultFormat::Rows,
35            QueryFormat::RowArrays => ResultFormat::RowArrays,
36            QueryFormat::Graph => ResultFormat::Graph,
37            QueryFormat::Combined => ResultFormat::Combined,
38        }
39    }
40}
41
42#[derive(Debug, Serialize)]
43pub struct ErrorResponse {
44    pub error: String,
45}
46
47#[derive(Debug, Serialize)]
48pub struct HealthResponse {
49    pub status: &'static str,
50}
51
52pub fn build_app<R>(db: Arc<R>) -> Router
53where
54    R: QueryRunner,
55{
56    Router::new()
57        .route("/health", get(health))
58        .route("/query", post(query::<R>))
59        .with_state(db)
60}
61
62pub async fn serve<R>(listener: tokio::net::TcpListener, db: Arc<R>) -> Result<()>
63where
64    R: QueryRunner,
65{
66    let app = build_app(db);
67    axum::serve(listener, app).await?;
68    Ok(())
69}
70
71/// Snapshot admin surface. Mounted as a unit so that
72/// `/admin/snapshot/{save,load}` always have a configured default
73/// path: an operator who set `--snapshot-path` is the one paying the
74/// cost of the route's existence, and they reasonably expect the
75/// path to be resolved automatically when no `path` field is sent in
76/// the request body.
77#[derive(Clone)]
78pub struct SnapshotAdminConfig {
79    pub path: PathBuf,
80    pub admin: Arc<dyn SnapshotAdmin>,
81}
82
83/// Configuration for the admin surface. Snapshot and WAL admin are
84/// independent: each set of routes mounts only when its corresponding
85/// field is `Some`.
86///
87/// - `snapshot.is_some()` mounts `POST /admin/snapshot/save` and
88///   `POST /admin/snapshot/load` against the configured path
89///   (the body's optional `path` field overrides per request).
90/// - `wal.is_some()` mounts `POST /admin/wal/status` and
91///   `POST /admin/wal/truncate` unconditionally, plus
92///   `POST /admin/checkpoint` (which uses `snapshot.path` as a default
93///   when present, and otherwise requires `path` in the request body).
94///
95/// The endpoints are intentionally opt-in: exposing them without
96/// authentication on a network-reachable interface is a footgun, so
97/// the caller must explicitly construct an `AdminConfig` and pass it
98/// to the server — there is no implicit default path.
99#[derive(Clone, Default)]
100pub struct AdminConfig {
101    /// Snapshot save/load admin. `None` to disable
102    /// `/admin/snapshot/{save,load}`.
103    pub snapshot: Option<SnapshotAdminConfig>,
104    /// WAL admin. `None` to disable `/admin/wal/*` and
105    /// `/admin/checkpoint`.
106    pub wal: Option<Arc<dyn WalAdmin>>,
107}
108
109impl AdminConfig {
110    /// Construct a snapshot-only admin config (no WAL endpoints).
111    pub fn snapshot_only(snapshot_path: PathBuf, admin: Arc<dyn SnapshotAdmin>) -> Self {
112        Self {
113            snapshot: Some(SnapshotAdminConfig {
114                path: snapshot_path,
115                admin,
116            }),
117            wal: None,
118        }
119    }
120
121    /// Construct a WAL-only admin config (no snapshot endpoints). The
122    /// `/admin/checkpoint` route still mounts but every call needs a
123    /// `path` in the request body since there is no configured
124    /// default.
125    pub fn wal_only(wal: Arc<dyn WalAdmin>) -> Self {
126        Self {
127            snapshot: None,
128            wal: Some(wal),
129        }
130    }
131
132    /// True when neither admin surface is configured. The router
133    /// merge then becomes a no-op and the admin routes don't exist.
134    pub fn is_empty(&self) -> bool {
135        self.snapshot.is_none() && self.wal.is_none()
136    }
137}
138
139/// Same as [`build_app`] but additionally mounts the admin routes when
140/// `admin_config` is `Some`.
141pub fn build_app_with_admin<R>(db: Arc<R>, admin_config: Option<AdminConfig>) -> Router
142where
143    R: QueryRunner,
144{
145    let router = build_app(db);
146    match admin_config {
147        Some(cfg) => router.merge(build_admin_router(cfg)),
148        None => router,
149    }
150}
151
152pub async fn serve_with_admin<R>(
153    listener: tokio::net::TcpListener,
154    db: Arc<R>,
155    admin_config: Option<AdminConfig>,
156) -> Result<()>
157where
158    R: QueryRunner,
159{
160    let app = build_app_with_admin(db, admin_config);
161    axum::serve(listener, app).await?;
162    Ok(())
163}
164
165fn build_admin_router(cfg: AdminConfig) -> Router {
166    let mut router = Router::new();
167
168    if let Some(snap) = cfg.snapshot.clone() {
169        let snapshot_router: Router = Router::new()
170            .route("/admin/snapshot/save", post(admin_snapshot_save))
171            .route("/admin/snapshot/load", post(admin_snapshot_load))
172            .with_state(snap);
173        router = router.merge(snapshot_router);
174    }
175
176    if let Some(wal) = cfg.wal.clone() {
177        let wal_state = WalAdminState {
178            // Reuse the snapshot path as the default checkpoint
179            // target when present so a body-less
180            // `POST /admin/checkpoint` writes to the same file the
181            // snapshot endpoints use. When no snapshot path is
182            // configured, the handler requires `path` in the body.
183            default_checkpoint_path: cfg.snapshot.as_ref().map(|s| s.path.clone()),
184            wal,
185        };
186        let wal_router: Router = Router::new()
187            .route("/admin/checkpoint", post(admin_checkpoint))
188            .route("/admin/wal/status", post(admin_wal_status))
189            .route("/admin/wal/truncate", post(admin_wal_truncate))
190            .with_state(wal_state);
191        router = router.merge(wal_router);
192    }
193
194    router
195}
196
197/// State plumbed into the WAL admin handlers.
198#[derive(Clone)]
199struct WalAdminState {
200    /// Default target for `POST /admin/checkpoint` when the body
201    /// omits `path`. `None` when the operator did not pass
202    /// `--snapshot-path`; in that case the handler returns 400 with
203    /// a hint.
204    default_checkpoint_path: Option<PathBuf>,
205    wal: Arc<dyn WalAdmin>,
206}
207
208/// Request body for `POST /admin/snapshot/{save,load}`. The body is
209/// optional; when it is absent (or an empty JSON object) the server uses
210/// the path configured in `AdminConfig`.
211///
212/// Supplying a `path` override lets an operator snapshot to / restore from
213/// an arbitrary filesystem location in a single request. **Any client that
214/// can reach the admin surface can write to any path the server process
215/// can write to — deploy the admin surface behind authenticated transport
216/// only.** We deliberately do not sandbox the path here; a well-meaning
217/// whitelist would give a false sense of safety without auth.
218#[derive(Debug, Default, Deserialize)]
219#[serde(default)]
220pub struct SnapshotRequest {
221    /// Override the configured snapshot path for this request only.
222    pub path: Option<String>,
223}
224
225#[derive(Debug, Serialize)]
226pub struct SnapshotResponse {
227    #[serde(rename = "formatVersion")]
228    pub format_version: u32,
229    #[serde(rename = "nodeCount")]
230    pub node_count: u64,
231    #[serde(rename = "relationshipCount")]
232    pub relationship_count: u64,
233    #[serde(rename = "walLsn")]
234    pub wal_lsn: Option<u64>,
235    pub path: String,
236}
237
238/// Extract the target path for a snapshot operation: the request-body
239/// override if present, else the configured default.
240fn resolve_snapshot_path(cfg: &SnapshotAdminConfig, req: Option<&SnapshotRequest>) -> PathBuf {
241    match req.and_then(|r| r.path.as_deref()) {
242        Some(p) if !p.trim().is_empty() => PathBuf::from(p),
243        _ => cfg.path.clone(),
244    }
245}
246
247async fn admin_snapshot_save(
248    State(cfg): State<SnapshotAdminConfig>,
249    body: Option<Json<SnapshotRequest>>,
250) -> impl IntoResponse {
251    let req = body.map(|Json(r)| r);
252    let path = resolve_snapshot_path(&cfg, req.as_ref());
253
254    match cfg.admin.save_snapshot(&path) {
255        Ok(meta) => (
256            StatusCode::OK,
257            Json(SnapshotResponse {
258                format_version: meta.format_version,
259                node_count: meta.node_count as u64,
260                relationship_count: meta.relationship_count as u64,
261                wal_lsn: meta.wal_lsn,
262                path: path.display().to_string(),
263            }),
264        )
265            .into_response(),
266        Err(err) => (
267            StatusCode::INTERNAL_SERVER_ERROR,
268            Json(ErrorResponse {
269                error: err.to_string(),
270            }),
271        )
272            .into_response(),
273    }
274}
275
276async fn admin_snapshot_load(
277    State(cfg): State<SnapshotAdminConfig>,
278    body: Option<Json<SnapshotRequest>>,
279) -> impl IntoResponse {
280    let req = body.map(|Json(r)| r);
281    let path = resolve_snapshot_path(&cfg, req.as_ref());
282
283    match cfg.admin.load_snapshot(&path) {
284        Ok(meta) => (
285            StatusCode::OK,
286            Json(SnapshotResponse {
287                format_version: meta.format_version,
288                node_count: meta.node_count as u64,
289                relationship_count: meta.relationship_count as u64,
290                wal_lsn: meta.wal_lsn,
291                path: path.display().to_string(),
292            }),
293        )
294            .into_response(),
295        Err(err) => (
296            StatusCode::INTERNAL_SERVER_ERROR,
297            Json(ErrorResponse {
298                error: err.to_string(),
299            }),
300        )
301            .into_response(),
302    }
303}
304
305// ---------------------------------------------------------------------------
306// WAL admin endpoints (mounted only when `AdminConfig.wal` is `Some`).
307// ---------------------------------------------------------------------------
308
309/// Body for `POST /admin/wal/truncate`. Operators supply the LSN past
310/// which sealed segments may be deleted; the WAL truncates everything
311/// at or below that point. Active and tombstone segments are always
312/// retained.
313#[derive(Debug, Default, Deserialize)]
314#[serde(default)]
315pub struct WalTruncateRequest {
316    #[serde(rename = "fenceLsn")]
317    pub fence_lsn: Option<u64>,
318}
319
320#[derive(Debug, Serialize)]
321pub struct WalStatusResponse {
322    #[serde(rename = "durableLsn")]
323    pub durable_lsn: u64,
324    #[serde(rename = "nextLsn")]
325    pub next_lsn: u64,
326    #[serde(rename = "activeSegmentId")]
327    pub active_segment_id: u64,
328    #[serde(rename = "oldestSegmentId")]
329    pub oldest_segment_id: u64,
330    /// Latched fsync error from the bg flusher (only populated under
331    /// `SyncMode::Group`). `None` when healthy.
332    #[serde(rename = "bgFailure")]
333    pub bg_failure: Option<String>,
334}
335
336fn resolve_checkpoint_path(
337    state: &WalAdminState,
338    req: Option<&SnapshotRequest>,
339) -> Result<PathBuf, &'static str> {
340    match req.and_then(|r| r.path.as_deref()) {
341        Some(p) if !p.trim().is_empty() => Ok(PathBuf::from(p)),
342        _ => state
343            .default_checkpoint_path
344            .clone()
345            .ok_or("no checkpoint path: pass `path` in the request body or start the server with --snapshot-path"),
346    }
347}
348
349async fn admin_checkpoint(
350    State(state): State<WalAdminState>,
351    body: Option<Json<SnapshotRequest>>,
352) -> impl IntoResponse {
353    let req = body.map(|Json(r)| r);
354    let path = match resolve_checkpoint_path(&state, req.as_ref()) {
355        Ok(p) => p,
356        Err(msg) => {
357            return (
358                StatusCode::BAD_REQUEST,
359                Json(ErrorResponse {
360                    error: msg.to_string(),
361                }),
362            )
363                .into_response()
364        }
365    };
366
367    match state.wal.checkpoint(&path) {
368        Ok(meta) => (
369            StatusCode::OK,
370            Json(SnapshotResponse {
371                format_version: meta.format_version,
372                node_count: meta.node_count as u64,
373                relationship_count: meta.relationship_count as u64,
374                wal_lsn: meta.wal_lsn,
375                path: path.display().to_string(),
376            }),
377        )
378            .into_response(),
379        Err(err) => (
380            StatusCode::INTERNAL_SERVER_ERROR,
381            Json(ErrorResponse {
382                error: err.to_string(),
383            }),
384        )
385            .into_response(),
386    }
387}
388
389async fn admin_wal_status(State(state): State<WalAdminState>) -> impl IntoResponse {
390    match state.wal.wal_status() {
391        Ok(s) => (
392            StatusCode::OK,
393            Json(WalStatusResponse {
394                durable_lsn: s.durable_lsn,
395                next_lsn: s.next_lsn,
396                active_segment_id: s.active_segment_id,
397                oldest_segment_id: s.oldest_segment_id,
398                bg_failure: s.bg_failure,
399            }),
400        )
401            .into_response(),
402        Err(err) => (
403            StatusCode::INTERNAL_SERVER_ERROR,
404            Json(ErrorResponse {
405                error: err.to_string(),
406            }),
407        )
408            .into_response(),
409    }
410}
411
412async fn admin_wal_truncate(
413    State(state): State<WalAdminState>,
414    body: Option<Json<WalTruncateRequest>>,
415) -> impl IntoResponse {
416    // No body / no fence => truncate up to the WAL's current durable
417    // LSN. That's the natural "drop everything safe to drop" default.
418    let fence = match body.and_then(|Json(r)| r.fence_lsn) {
419        Some(lsn) => lsn,
420        None => match state.wal.wal_status() {
421            Ok(s) => s.durable_lsn,
422            Err(err) => {
423                return (
424                    StatusCode::INTERNAL_SERVER_ERROR,
425                    Json(ErrorResponse {
426                        error: err.to_string(),
427                    }),
428                )
429                    .into_response()
430            }
431        },
432    };
433
434    match state.wal.wal_truncate(fence) {
435        Ok(()) => StatusCode::NO_CONTENT.into_response(),
436        Err(err) => (
437            StatusCode::INTERNAL_SERVER_ERROR,
438            Json(ErrorResponse {
439                error: err.to_string(),
440            }),
441        )
442            .into_response(),
443    }
444}
445
446async fn health() -> Json<HealthResponse> {
447    Json(HealthResponse { status: "ok" })
448}
449
450async fn query<R>(State(db): State<Arc<R>>, Json(req): Json<QueryRequest>) -> impl IntoResponse
451where
452    R: QueryRunner,
453{
454    let options = req.format.map(|format| ExecuteOptions {
455        format: format.into(),
456    });
457
458    match db.execute(&req.query, options) {
459        Ok(result) => (StatusCode::OK, Json(result)).into_response(),
460        Err(err) => (
461            StatusCode::BAD_REQUEST,
462            Json(ErrorResponse {
463                error: err.to_string(),
464            }),
465        )
466            .into_response(),
467    }
468}