1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6 extract::State,
7 http::StatusCode,
8 response::IntoResponse,
9 routing::{get, post},
10 Json, Router,
11};
12use lora_database::{ExecuteOptions, QueryRunner, ResultFormat, SnapshotAdmin, WalAdmin};
13use serde::{Deserialize, Serialize};
14
15#[derive(Debug, Deserialize)]
16pub struct QueryRequest {
17 pub query: String,
18 #[serde(default)]
19 pub format: Option<QueryFormat>,
20}
21
22#[derive(Debug, Clone, Copy, Deserialize)]
23#[serde(rename_all = "camelCase")]
24pub enum QueryFormat {
25 Rows,
26 RowArrays,
27 Graph,
28 Combined,
29}
30
31impl From<QueryFormat> for ResultFormat {
32 fn from(value: QueryFormat) -> Self {
33 match value {
34 QueryFormat::Rows => ResultFormat::Rows,
35 QueryFormat::RowArrays => ResultFormat::RowArrays,
36 QueryFormat::Graph => ResultFormat::Graph,
37 QueryFormat::Combined => ResultFormat::Combined,
38 }
39 }
40}
41
42#[derive(Debug, Serialize)]
43pub struct ErrorResponse {
44 pub error: String,
45}
46
47#[derive(Debug, Serialize)]
48pub struct HealthResponse {
49 pub status: &'static str,
50}
51
52pub fn build_app<R>(db: Arc<R>) -> Router
53where
54 R: QueryRunner,
55{
56 Router::new()
57 .route("/health", get(health))
58 .route("/query", post(query::<R>))
59 .with_state(db)
60}
61
62pub async fn serve<R>(listener: tokio::net::TcpListener, db: Arc<R>) -> Result<()>
63where
64 R: QueryRunner,
65{
66 let app = build_app(db);
67 axum::serve(listener, app).await?;
68 Ok(())
69}
70
71#[derive(Clone)]
78pub struct SnapshotAdminConfig {
79 pub path: PathBuf,
80 pub admin: Arc<dyn SnapshotAdmin>,
81}
82
83#[derive(Clone, Default)]
100pub struct AdminConfig {
101 pub snapshot: Option<SnapshotAdminConfig>,
104 pub wal: Option<Arc<dyn WalAdmin>>,
107}
108
109impl AdminConfig {
110 pub fn snapshot_only(snapshot_path: PathBuf, admin: Arc<dyn SnapshotAdmin>) -> Self {
112 Self {
113 snapshot: Some(SnapshotAdminConfig {
114 path: snapshot_path,
115 admin,
116 }),
117 wal: None,
118 }
119 }
120
121 pub fn wal_only(wal: Arc<dyn WalAdmin>) -> Self {
126 Self {
127 snapshot: None,
128 wal: Some(wal),
129 }
130 }
131
132 pub fn is_empty(&self) -> bool {
135 self.snapshot.is_none() && self.wal.is_none()
136 }
137}
138
139pub fn build_app_with_admin<R>(db: Arc<R>, admin_config: Option<AdminConfig>) -> Router
142where
143 R: QueryRunner,
144{
145 let router = build_app(db);
146 match admin_config {
147 Some(cfg) => router.merge(build_admin_router(cfg)),
148 None => router,
149 }
150}
151
152pub async fn serve_with_admin<R>(
153 listener: tokio::net::TcpListener,
154 db: Arc<R>,
155 admin_config: Option<AdminConfig>,
156) -> Result<()>
157where
158 R: QueryRunner,
159{
160 let app = build_app_with_admin(db, admin_config);
161 axum::serve(listener, app).await?;
162 Ok(())
163}
164
165fn build_admin_router(cfg: AdminConfig) -> Router {
166 let mut router = Router::new();
167
168 if let Some(snap) = cfg.snapshot.clone() {
169 let snapshot_router: Router = Router::new()
170 .route("/admin/snapshot/save", post(admin_snapshot_save))
171 .route("/admin/snapshot/load", post(admin_snapshot_load))
172 .with_state(snap);
173 router = router.merge(snapshot_router);
174 }
175
176 if let Some(wal) = cfg.wal.clone() {
177 let wal_state = WalAdminState {
178 default_checkpoint_path: cfg.snapshot.as_ref().map(|s| s.path.clone()),
184 wal,
185 };
186 let wal_router: Router = Router::new()
187 .route("/admin/checkpoint", post(admin_checkpoint))
188 .route("/admin/wal/status", post(admin_wal_status))
189 .route("/admin/wal/truncate", post(admin_wal_truncate))
190 .with_state(wal_state);
191 router = router.merge(wal_router);
192 }
193
194 router
195}
196
197#[derive(Clone)]
199struct WalAdminState {
200 default_checkpoint_path: Option<PathBuf>,
205 wal: Arc<dyn WalAdmin>,
206}
207
208#[derive(Debug, Default, Deserialize)]
219#[serde(default)]
220pub struct SnapshotRequest {
221 pub path: Option<String>,
223}
224
225#[derive(Debug, Serialize)]
226pub struct SnapshotResponse {
227 #[serde(rename = "formatVersion")]
228 pub format_version: u32,
229 #[serde(rename = "nodeCount")]
230 pub node_count: u64,
231 #[serde(rename = "relationshipCount")]
232 pub relationship_count: u64,
233 #[serde(rename = "walLsn")]
234 pub wal_lsn: Option<u64>,
235 pub path: String,
236}
237
238fn resolve_snapshot_path(cfg: &SnapshotAdminConfig, req: Option<&SnapshotRequest>) -> PathBuf {
241 match req.and_then(|r| r.path.as_deref()) {
242 Some(p) if !p.trim().is_empty() => PathBuf::from(p),
243 _ => cfg.path.clone(),
244 }
245}
246
247async fn admin_snapshot_save(
248 State(cfg): State<SnapshotAdminConfig>,
249 body: Option<Json<SnapshotRequest>>,
250) -> impl IntoResponse {
251 let req = body.map(|Json(r)| r);
252 let path = resolve_snapshot_path(&cfg, req.as_ref());
253
254 match cfg.admin.save_snapshot(&path) {
255 Ok(meta) => (
256 StatusCode::OK,
257 Json(SnapshotResponse {
258 format_version: meta.format_version,
259 node_count: meta.node_count as u64,
260 relationship_count: meta.relationship_count as u64,
261 wal_lsn: meta.wal_lsn,
262 path: path.display().to_string(),
263 }),
264 )
265 .into_response(),
266 Err(err) => (
267 StatusCode::INTERNAL_SERVER_ERROR,
268 Json(ErrorResponse {
269 error: err.to_string(),
270 }),
271 )
272 .into_response(),
273 }
274}
275
276async fn admin_snapshot_load(
277 State(cfg): State<SnapshotAdminConfig>,
278 body: Option<Json<SnapshotRequest>>,
279) -> impl IntoResponse {
280 let req = body.map(|Json(r)| r);
281 let path = resolve_snapshot_path(&cfg, req.as_ref());
282
283 match cfg.admin.load_snapshot(&path) {
284 Ok(meta) => (
285 StatusCode::OK,
286 Json(SnapshotResponse {
287 format_version: meta.format_version,
288 node_count: meta.node_count as u64,
289 relationship_count: meta.relationship_count as u64,
290 wal_lsn: meta.wal_lsn,
291 path: path.display().to_string(),
292 }),
293 )
294 .into_response(),
295 Err(err) => (
296 StatusCode::INTERNAL_SERVER_ERROR,
297 Json(ErrorResponse {
298 error: err.to_string(),
299 }),
300 )
301 .into_response(),
302 }
303}
304
305#[derive(Debug, Default, Deserialize)]
314#[serde(default)]
315pub struct WalTruncateRequest {
316 #[serde(rename = "fenceLsn")]
317 pub fence_lsn: Option<u64>,
318}
319
320#[derive(Debug, Serialize)]
321pub struct WalStatusResponse {
322 #[serde(rename = "durableLsn")]
323 pub durable_lsn: u64,
324 #[serde(rename = "nextLsn")]
325 pub next_lsn: u64,
326 #[serde(rename = "activeSegmentId")]
327 pub active_segment_id: u64,
328 #[serde(rename = "oldestSegmentId")]
329 pub oldest_segment_id: u64,
330 #[serde(rename = "bgFailure")]
333 pub bg_failure: Option<String>,
334}
335
336fn resolve_checkpoint_path(
337 state: &WalAdminState,
338 req: Option<&SnapshotRequest>,
339) -> Result<PathBuf, &'static str> {
340 match req.and_then(|r| r.path.as_deref()) {
341 Some(p) if !p.trim().is_empty() => Ok(PathBuf::from(p)),
342 _ => state
343 .default_checkpoint_path
344 .clone()
345 .ok_or("no checkpoint path: pass `path` in the request body or start the server with --snapshot-path"),
346 }
347}
348
349async fn admin_checkpoint(
350 State(state): State<WalAdminState>,
351 body: Option<Json<SnapshotRequest>>,
352) -> impl IntoResponse {
353 let req = body.map(|Json(r)| r);
354 let path = match resolve_checkpoint_path(&state, req.as_ref()) {
355 Ok(p) => p,
356 Err(msg) => {
357 return (
358 StatusCode::BAD_REQUEST,
359 Json(ErrorResponse {
360 error: msg.to_string(),
361 }),
362 )
363 .into_response()
364 }
365 };
366
367 match state.wal.checkpoint(&path) {
368 Ok(meta) => (
369 StatusCode::OK,
370 Json(SnapshotResponse {
371 format_version: meta.format_version,
372 node_count: meta.node_count as u64,
373 relationship_count: meta.relationship_count as u64,
374 wal_lsn: meta.wal_lsn,
375 path: path.display().to_string(),
376 }),
377 )
378 .into_response(),
379 Err(err) => (
380 StatusCode::INTERNAL_SERVER_ERROR,
381 Json(ErrorResponse {
382 error: err.to_string(),
383 }),
384 )
385 .into_response(),
386 }
387}
388
389async fn admin_wal_status(State(state): State<WalAdminState>) -> impl IntoResponse {
390 match state.wal.wal_status() {
391 Ok(s) => (
392 StatusCode::OK,
393 Json(WalStatusResponse {
394 durable_lsn: s.durable_lsn,
395 next_lsn: s.next_lsn,
396 active_segment_id: s.active_segment_id,
397 oldest_segment_id: s.oldest_segment_id,
398 bg_failure: s.bg_failure,
399 }),
400 )
401 .into_response(),
402 Err(err) => (
403 StatusCode::INTERNAL_SERVER_ERROR,
404 Json(ErrorResponse {
405 error: err.to_string(),
406 }),
407 )
408 .into_response(),
409 }
410}
411
412async fn admin_wal_truncate(
413 State(state): State<WalAdminState>,
414 body: Option<Json<WalTruncateRequest>>,
415) -> impl IntoResponse {
416 let fence = match body.and_then(|Json(r)| r.fence_lsn) {
419 Some(lsn) => lsn,
420 None => match state.wal.wal_status() {
421 Ok(s) => s.durable_lsn,
422 Err(err) => {
423 return (
424 StatusCode::INTERNAL_SERVER_ERROR,
425 Json(ErrorResponse {
426 error: err.to_string(),
427 }),
428 )
429 .into_response()
430 }
431 },
432 };
433
434 match state.wal.wal_truncate(fence) {
435 Ok(()) => StatusCode::NO_CONTENT.into_response(),
436 Err(err) => (
437 StatusCode::INTERNAL_SERVER_ERROR,
438 Json(ErrorResponse {
439 error: err.to_string(),
440 }),
441 )
442 .into_response(),
443 }
444}
445
446async fn health() -> Json<HealthResponse> {
447 Json(HealthResponse { status: "ok" })
448}
449
450async fn query<R>(State(db): State<Arc<R>>, Json(req): Json<QueryRequest>) -> impl IntoResponse
451where
452 R: QueryRunner,
453{
454 let options = req.format.map(|format| ExecuteOptions {
455 format: format.into(),
456 });
457
458 match db.execute(&req.query, options) {
459 Ok(result) => (StatusCode::OK, Json(result)).into_response(),
460 Err(err) => (
461 StatusCode::BAD_REQUEST,
462 Json(ErrorResponse {
463 error: err.to_string(),
464 }),
465 )
466 .into_response(),
467 }
468}