1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6 extract::State,
7 http::StatusCode,
8 response::IntoResponse,
9 routing::{get, post},
10 Json, Router,
11};
12use lora_database::{ExecuteOptions, QueryRunner, ResultFormat, SnapshotAdmin};
13use serde::{Deserialize, Serialize};
14
/// JSON body accepted by the `POST /query` endpoint.
#[derive(Debug, Deserialize)]
pub struct QueryRequest {
    /// Query text; the `query` handler passes it unchanged to the runner's `execute`.
    pub query: String,
    /// Optional result format. When absent, the handler passes no
    /// `ExecuteOptions` to the runner.
    #[serde(default)]
    pub format: Option<QueryFormat>,
}
21
/// Result shapes a client may request for a query.
///
/// Because of `rename_all = "camelCase"`, the variants deserialize from
/// `rows`, `rowArrays`, `graph` and `combined`.
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum QueryFormat {
    /// Converted to `ResultFormat::Rows`.
    Rows,
    /// Converted to `ResultFormat::RowArrays`.
    RowArrays,
    /// Converted to `ResultFormat::Graph`.
    Graph,
    /// Converted to `ResultFormat::Combined`.
    Combined,
}
30
31impl From<QueryFormat> for ResultFormat {
32 fn from(value: QueryFormat) -> Self {
33 match value {
34 QueryFormat::Rows => ResultFormat::Rows,
35 QueryFormat::RowArrays => ResultFormat::RowArrays,
36 QueryFormat::Graph => ResultFormat::Graph,
37 QueryFormat::Combined => ResultFormat::Combined,
38 }
39 }
40}
41
/// JSON error body returned by the handlers in this file when an operation fails.
#[derive(Debug, Serialize)]
pub struct ErrorResponse {
    /// Error message; the handlers fill it with the error's `to_string()` output.
    pub error: String,
}
46
/// JSON body returned by the `GET /health` endpoint.
#[derive(Debug, Serialize)]
pub struct HealthResponse {
    /// Status string; the `health` handler always sets it to `"ok"`.
    pub status: &'static str,
}
51
52pub fn build_app<R>(db: Arc<R>) -> Router
53where
54 R: QueryRunner,
55{
56 Router::new()
57 .route("/health", get(health))
58 .route("/query", post(query::<R>))
59 .with_state(db)
60}
61
62pub async fn serve<R>(listener: tokio::net::TcpListener, db: Arc<R>) -> Result<()>
63where
64 R: QueryRunner,
65{
66 let app = build_app(db);
67 axum::serve(listener, app).await?;
68 Ok(())
69}
70
/// State shared by the `/admin/snapshot/*` handlers.
#[derive(Clone)]
pub struct AdminConfig {
    /// Path used when a snapshot request does not supply a non-blank `path`.
    pub snapshot_path: PathBuf,
    /// Backend on which the handlers call `save_snapshot` / `load_snapshot`.
    pub admin: Arc<dyn SnapshotAdmin>,
}
84
85pub fn build_app_with_admin<R>(db: Arc<R>, admin_config: Option<AdminConfig>) -> Router
88where
89 R: QueryRunner,
90{
91 let router = build_app(db);
92 match admin_config {
93 Some(cfg) => router.merge(build_admin_router(cfg)),
94 None => router,
95 }
96}
97
98pub async fn serve_with_admin<R>(
99 listener: tokio::net::TcpListener,
100 db: Arc<R>,
101 admin_config: Option<AdminConfig>,
102) -> Result<()>
103where
104 R: QueryRunner,
105{
106 let app = build_app_with_admin(db, admin_config);
107 axum::serve(listener, app).await?;
108 Ok(())
109}
110
111fn build_admin_router(cfg: AdminConfig) -> Router {
112 Router::new()
113 .route("/admin/snapshot/save", post(admin_snapshot_save))
114 .route("/admin/snapshot/load", post(admin_snapshot_load))
115 .with_state(cfg)
116}
117
/// Optional JSON body for the snapshot admin endpoints.
///
/// The struct-level `#[serde(default)]` means a body that omits `path`
/// deserializes with `path: None`.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
pub struct SnapshotRequest {
    /// Snapshot path override. `None`, empty, or whitespace-only values make
    /// `resolve_snapshot_path` fall back to `AdminConfig::snapshot_path`.
    pub path: Option<String>,
}
134
135#[derive(Debug, Serialize)]
136pub struct SnapshotResponse {
137 #[serde(rename = "formatVersion")]
138 pub format_version: u32,
139 #[serde(rename = "nodeCount")]
140 pub node_count: u64,
141 #[serde(rename = "relationshipCount")]
142 pub relationship_count: u64,
143 #[serde(rename = "walLsn")]
144 pub wal_lsn: Option<u64>,
145 pub path: String,
146}
147
148fn resolve_snapshot_path(cfg: &AdminConfig, req: Option<&SnapshotRequest>) -> PathBuf {
151 match req.and_then(|r| r.path.as_deref()) {
152 Some(p) if !p.trim().is_empty() => PathBuf::from(p),
153 _ => cfg.snapshot_path.clone(),
154 }
155}
156
157async fn admin_snapshot_save(
158 State(cfg): State<AdminConfig>,
159 body: Option<Json<SnapshotRequest>>,
160) -> impl IntoResponse {
161 let req = body.map(|Json(r)| r);
162 let path = resolve_snapshot_path(&cfg, req.as_ref());
163
164 match cfg.admin.save_snapshot(&path) {
165 Ok(meta) => (
166 StatusCode::OK,
167 Json(SnapshotResponse {
168 format_version: meta.format_version,
169 node_count: meta.node_count as u64,
170 relationship_count: meta.relationship_count as u64,
171 wal_lsn: meta.wal_lsn,
172 path: path.display().to_string(),
173 }),
174 )
175 .into_response(),
176 Err(err) => (
177 StatusCode::INTERNAL_SERVER_ERROR,
178 Json(ErrorResponse {
179 error: err.to_string(),
180 }),
181 )
182 .into_response(),
183 }
184}
185
186async fn admin_snapshot_load(
187 State(cfg): State<AdminConfig>,
188 body: Option<Json<SnapshotRequest>>,
189) -> impl IntoResponse {
190 let req = body.map(|Json(r)| r);
191 let path = resolve_snapshot_path(&cfg, req.as_ref());
192
193 match cfg.admin.load_snapshot(&path) {
194 Ok(meta) => (
195 StatusCode::OK,
196 Json(SnapshotResponse {
197 format_version: meta.format_version,
198 node_count: meta.node_count as u64,
199 relationship_count: meta.relationship_count as u64,
200 wal_lsn: meta.wal_lsn,
201 path: path.display().to_string(),
202 }),
203 )
204 .into_response(),
205 Err(err) => (
206 StatusCode::INTERNAL_SERVER_ERROR,
207 Json(ErrorResponse {
208 error: err.to_string(),
209 }),
210 )
211 .into_response(),
212 }
213}
214
215async fn health() -> Json<HealthResponse> {
216 Json(HealthResponse { status: "ok" })
217}
218
219async fn query<R>(State(db): State<Arc<R>>, Json(req): Json<QueryRequest>) -> impl IntoResponse
220where
221 R: QueryRunner,
222{
223 let options = req.format.map(|format| ExecuteOptions {
224 format: format.into(),
225 });
226
227 match db.execute(&req.query, options) {
228 Ok(result) => (StatusCode::OK, Json(result)).into_response(),
229 Err(err) => (
230 StatusCode::BAD_REQUEST,
231 Json(ErrorResponse {
232 error: err.to_string(),
233 }),
234 )
235 .into_response(),
236 }
237}