Skip to main content

dkdc_db_server/
api.rs

1use std::sync::Arc;
2
3use axum::extract::{Path, State};
4use axum::http::StatusCode;
5use axum::response::IntoResponse;
6use axum::routing::{get, post};
7use axum::{Json, Router};
8use dkdc_db_core::DkdcDb;
9use serde::{Deserialize, Serialize};
10
11type AppState = Arc<DkdcDb>;
12
13pub fn router(state: AppState) -> Router {
14    Router::new()
15        .route("/execute", post(execute))
16        .route("/query", post(query))
17        .route("/query/libsql", post(query_libsql))
18        .route("/tables", get(list_tables))
19        .route("/schema/{table}", get(table_schema))
20        .route("/health", get(health))
21        .with_state(state)
22}
23
24#[derive(Deserialize)]
25struct SqlRequest {
26    sql: String,
27}
28
29#[derive(Serialize)]
30struct ExecuteResponse {
31    affected: u64,
32}
33
34#[derive(Serialize)]
35struct QueryResponse {
36    columns: Vec<ColumnInfo>,
37    rows: Vec<Vec<serde_json::Value>>,
38}
39
40#[derive(Serialize)]
41struct ColumnInfo {
42    name: String,
43    r#type: String,
44}
45
46#[derive(Serialize)]
47struct ErrorResponse {
48    error: String,
49}
50
51fn error_response(status: StatusCode, msg: impl ToString) -> impl IntoResponse {
52    (
53        status,
54        Json(ErrorResponse {
55            error: msg.to_string(),
56        }),
57    )
58}
59
60fn batches_to_response(batches: &[dkdc_db_core::RecordBatch]) -> QueryResponse {
61    let mut columns = Vec::new();
62    let mut rows = Vec::new();
63
64    if let Some(first) = batches.first() {
65        let schema = first.schema();
66        columns = schema
67            .fields()
68            .iter()
69            .map(|f| ColumnInfo {
70                name: f.name().clone(),
71                r#type: format!("{}", f.data_type()),
72            })
73            .collect();
74    }
75
76    for batch in batches {
77        for row_idx in 0..batch.num_rows() {
78            let mut row = Vec::new();
79            for col_idx in 0..batch.num_columns() {
80                let col = batch.column(col_idx);
81                let value = column_value_to_json(col, row_idx);
82                row.push(value);
83            }
84            rows.push(row);
85        }
86    }
87
88    QueryResponse { columns, rows }
89}
90
91fn column_value_to_json(col: &dyn arrow::array::Array, row: usize) -> serde_json::Value {
92    use arrow::array::*;
93
94    if col.is_null(row) {
95        return serde_json::Value::Null;
96    }
97
98    if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
99        serde_json::Value::Number(arr.value(row).into())
100    } else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
101        serde_json::json!(arr.value(row))
102    } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
103        serde_json::Value::String(arr.value(row).to_string())
104    } else if let Some(arr) = col.as_any().downcast_ref::<BinaryArray>() {
105        use base64::Engine;
106        let bytes = arr.value(row);
107        serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(bytes))
108    } else {
109        // Fallback: try to format as string
110        serde_json::Value::String(format!("{:?}", col))
111    }
112}
113
114async fn execute(State(db): State<AppState>, Json(req): Json<SqlRequest>) -> impl IntoResponse {
115    match db.execute(&req.sql).await {
116        Ok(affected) => (StatusCode::OK, Json(ExecuteResponse { affected })).into_response(),
117        Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
118    }
119}
120
121async fn query(State(db): State<AppState>, Json(req): Json<SqlRequest>) -> impl IntoResponse {
122    match db.query(&req.sql).await {
123        Ok(batches) => (StatusCode::OK, Json(batches_to_response(&batches))).into_response(),
124        Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
125    }
126}
127
128async fn query_libsql(
129    State(db): State<AppState>,
130    Json(req): Json<SqlRequest>,
131) -> impl IntoResponse {
132    match db.query_libsql(&req.sql).await {
133        Ok(batches) => (StatusCode::OK, Json(batches_to_response(&batches))).into_response(),
134        Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
135    }
136}
137
138async fn list_tables(State(db): State<AppState>) -> impl IntoResponse {
139    match db.list_tables().await {
140        Ok(tables) => (StatusCode::OK, Json(tables)).into_response(),
141        Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, e).into_response(),
142    }
143}
144
145async fn table_schema(State(db): State<AppState>, Path(table): Path<String>) -> impl IntoResponse {
146    let sql = format!(
147        "SELECT name, type FROM pragma_table_info('{}')",
148        table.replace('\'', "''")
149    );
150    match db.query_libsql(&sql).await {
151        Ok(batches) => (StatusCode::OK, Json(batches_to_response(&batches))).into_response(),
152        Err(e) => error_response(StatusCode::BAD_REQUEST, e).into_response(),
153    }
154}
155
156async fn health() -> impl IntoResponse {
157    Json(serde_json::json!({"status": "ok"}))
158}