1use std::sync::Arc;
2
3use axum::extract::{Path, State};
4use axum::http::StatusCode;
5use axum::response::IntoResponse;
6use axum::routing::{get, post};
7use axum::{Json, Router};
8use dkdc_db_core::DkdcDb;
9use serde::{Deserialize, Serialize};
10
11type AppState = Arc<DkdcDb>;
12
13pub fn router(state: AppState) -> Router {
14 Router::new()
15 .route("/execute", post(execute))
16 .route("/query", post(query))
17 .route("/query/libsql", post(query_libsql))
18 .route("/tables", get(list_tables))
19 .route("/schema/{table}", get(table_schema))
20 .route("/health", get(health))
21 .with_state(state)
22}
23
24#[derive(Deserialize)]
25struct SqlRequest {
26 sql: String,
27}
28
29#[derive(Serialize)]
30struct ExecuteResponse {
31 affected: u64,
32}
33
34#[derive(Serialize)]
35struct QueryResponse {
36 columns: Vec<ColumnInfo>,
37 rows: Vec<Vec<serde_json::Value>>,
38}
39
40#[derive(Serialize)]
41struct ColumnInfo {
42 name: String,
43 r#type: String,
44}
45
46#[derive(Serialize)]
47struct ErrorResponse {
48 error: String,
49}
50
51fn error_response(status: StatusCode, msg: impl ToString) -> impl IntoResponse {
52 (
53 status,
54 Json(ErrorResponse {
55 error: msg.to_string(),
56 }),
57 )
58}
59
60fn batches_to_response(batches: &[dkdc_db_core::RecordBatch]) -> QueryResponse {
61 let mut columns = Vec::new();
62 let mut rows = Vec::new();
63
64 if let Some(first) = batches.first() {
65 let schema = first.schema();
66 columns = schema
67 .fields()
68 .iter()
69 .map(|f| ColumnInfo {
70 name: f.name().clone(),
71 r#type: format!("{}", f.data_type()),
72 })
73 .collect();
74 }
75
76 for batch in batches {
77 for row_idx in 0..batch.num_rows() {
78 let mut row = Vec::new();
79 for col_idx in 0..batch.num_columns() {
80 let col = batch.column(col_idx);
81 let value = column_value_to_json(col, row_idx);
82 row.push(value);
83 }
84 rows.push(row);
85 }
86 }
87
88 QueryResponse { columns, rows }
89}
90
91fn column_value_to_json(col: &dyn arrow::array::Array, row: usize) -> serde_json::Value {
92 use arrow::array::*;
93
94 if col.is_null(row) {
95 return serde_json::Value::Null;
96 }
97
98 if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
99 serde_json::Value::Number(arr.value(row).into())
100 } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
101 serde_json::json!(arr.value(row))
102 } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
103 serde_json::Value::String(arr.value(row).to_string())
104 } else if let Some(arr) = col.as_any().downcast_ref::<BinaryArray>() {
105 use base64::Engine;
106 let bytes = arr.value(row);
107 serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(bytes))
108 } else {
109 serde_json::Value::String(format!("{:?}", col))
111 }
112}
113
114async fn execute(State(db): State<AppState>, Json(req): Json<SqlRequest>) -> impl IntoResponse {
115 match db.execute(&req.sql).await {
116 Ok(affected) => (StatusCode::OK, Json(ExecuteResponse { affected })).into_response(),
117 Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
118 }
119}
120
121async fn query(State(db): State<AppState>, Json(req): Json<SqlRequest>) -> impl IntoResponse {
122 match db.query(&req.sql).await {
123 Ok(batches) => (StatusCode::OK, Json(batches_to_response(&batches))).into_response(),
124 Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
125 }
126}
127
128async fn query_libsql(
129 State(db): State<AppState>,
130 Json(req): Json<SqlRequest>,
131) -> impl IntoResponse {
132 match db.query_libsql(&req.sql).await {
133 Ok(batches) => (StatusCode::OK, Json(batches_to_response(&batches))).into_response(),
134 Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
135 }
136}
137
138async fn list_tables(State(db): State<AppState>) -> impl IntoResponse {
139 match db.list_tables().await {
140 Ok(tables) => (StatusCode::OK, Json(tables)).into_response(),
141 Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, e).into_response(),
142 }
143}
144
145async fn table_schema(State(db): State<AppState>, Path(table): Path<String>) -> impl IntoResponse {
146 let sql = format!(
147 "SELECT name, type FROM pragma_table_info('{}')",
148 table.replace('\'', "''")
149 );
150 match db.query_libsql(&sql).await {
151 Ok(batches) => (StatusCode::OK, Json(batches_to_response(&batches))).into_response(),
152 Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
153 }
154}
155
156async fn health() -> impl IntoResponse {
157 Json(serde_json::json!({"status": "ok"}))
158}