Skip to main content

alopex_server/http/
columnar.rs

1use std::sync::Arc;
2
3use alopex_core::columnar::encoding::Column;
4use alopex_core::columnar::kvs_bridge::ColumnarKvsBridge;
5use alopex_core::columnar::segment_v2::{ColumnSegmentV2, RecordBatch};
6use alopex_core::columnar::ColumnarError;
7use alopex_core::kv::KVTransaction;
8use alopex_core::storage::format::bincode_config;
9use alopex_core::types::TxnMode;
10use alopex_core::KVStore;
11use alopex_sql::SqlValue;
12use axum::extract::Extension;
13use axum::response::Response;
14use axum::Json;
15use bincode::config::Options;
16use serde::{Deserialize, Serialize};
17use std::hash::{Hash, Hasher};
18use std::time::Instant;
19
20use crate::error::{Result, ServerError};
21use crate::http::{error_response, json_response, RequestContext};
22use crate::server::ServerState;
23
/// Request body for the columnar `scan` endpoint.
#[derive(Debug, Deserialize)]
pub struct ColumnarScanRequest {
    /// Segment to read, in `table_id:segment_id` form (parsed by `parse_segment_id`).
    pub segment_id: String,
}

/// Request body for the columnar `stats` endpoint.
#[derive(Debug, Deserialize)]
pub struct ColumnarStatsRequest {
    /// Segment to summarize, in `table_id:segment_id` form.
    pub segment_id: String,
}

/// Request body for recording a columnar index on one column of a segment.
#[derive(Debug, Deserialize)]
pub struct ColumnarIndexCreateRequest {
    /// Target segment, in `table_id:segment_id` form; the segment must exist.
    pub segment_id: String,
    /// Column name the index entry is recorded under.
    pub column: String,
    /// Index kind; `parse_index_type` accepts only `"minmax"` or `"bloom"`.
    pub index_type: String,
}

/// Request body for listing the index entries recorded for a segment.
#[derive(Debug, Deserialize)]
pub struct ColumnarIndexListRequest {
    /// Segment whose index entries are listed, in `table_id:segment_id` form.
    pub segment_id: String,
}

/// Request body for removing one index entry from a segment.
#[derive(Debug, Deserialize)]
pub struct ColumnarIndexDropRequest {
    /// Segment owning the index, in `table_id:segment_id` form.
    pub segment_id: String,
    /// Column whose index entry should be removed.
    pub column: String,
}

/// Request body for ingesting a pre-built columnar segment into a table.
#[derive(Debug, Deserialize)]
pub struct ColumnarIngestRequest {
    /// Table name; must be non-empty and is hashed into a numeric table id.
    pub table: String,
    /// Compression label; this module echoes it back in the response without
    /// interpreting or validating it.
    pub compression: String,
    /// A `ColumnSegmentV2` serialized with the storage layer's `bincode_config()`.
    /// With serde's default `Vec<u8>` handling, a JSON body carries this as an
    /// array of byte values.
    pub segment: Vec<u8>,
}
58
/// Response body for `scan`: the segment's contents in row-major order.
#[derive(Debug, Serialize)]
pub struct ColumnarScanResponse {
    /// One entry per row (batches concatenated in read order); each row holds one
    /// value per column of its batch. A cell past the end of a column is `SqlValue::Null`.
    pub rows: Vec<Vec<SqlValue>>,
}

/// Response body for `stats`.
#[derive(Debug, Serialize)]
pub struct ColumnarStatsResponse {
    /// Total rows summed over every batch read from the segment.
    pub row_count: usize,
    /// Number of columns the storage bridge reports for the segment.
    pub column_count: usize,
    /// Segment size in bytes.
    /// NOTE(review): `stats_impl` currently always reports 0 here.
    pub size_bytes: u64,
}

/// Response body for `list`.
#[derive(Debug, Serialize)]
pub struct ColumnarListResponse {
    /// Every stored segment, formatted as `table_id:segment_id`.
    pub segments: Vec<String>,
}

/// One index entry reported by `index_list`.
#[derive(Debug, Serialize)]
pub struct ColumnarIndexInfo {
    /// Indexed column name.
    pub column: String,
    /// Index kind (`"minmax"` or `"bloom"`).
    pub index_type: String,
}

/// Response body for `index_list`.
#[derive(Debug, Serialize)]
pub struct ColumnarIndexListResponse {
    /// All index entries recorded for the requested segment.
    pub indexes: Vec<ColumnarIndexInfo>,
}

/// Response body for `ingest`.
#[derive(Debug, Serialize)]
pub struct ColumnarIngestResponse {
    /// Row count taken from the ingested segment's metadata.
    pub row_count: u64,
    /// Identifier of the stored segment, formatted as `table_id:segment_id`.
    pub segment_id: String,
    /// Compressed size taken from the ingested segment's metadata.
    pub size_bytes: u64,
    /// The request's `compression` label, echoed back unchanged.
    pub compression: String,
    /// Wall-clock milliseconds spent decoding and writing the segment.
    pub elapsed_ms: u64,
}

/// Acknowledgement returned by the index create/drop endpoints.
#[derive(Debug, Serialize)]
pub struct ColumnarStatusResponse {
    /// Always `true` when built in this module; failures become error responses instead.
    pub success: bool,
}
100
101pub async fn scan(
102    Extension(state): Extension<Arc<ServerState>>,
103    Extension(ctx): Extension<RequestContext>,
104    Json(request): Json<ColumnarScanRequest>,
105) -> Response {
106    match scan_impl(state.clone(), request) {
107        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
108        Err(err) => error_response(err, &ctx),
109    }
110}
111
112pub async fn stats(
113    Extension(state): Extension<Arc<ServerState>>,
114    Extension(ctx): Extension<RequestContext>,
115    Json(request): Json<ColumnarStatsRequest>,
116) -> Response {
117    match stats_impl(state.clone(), request) {
118        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
119        Err(err) => error_response(err, &ctx),
120    }
121}
122
123pub async fn list(
124    Extension(state): Extension<Arc<ServerState>>,
125    Extension(ctx): Extension<RequestContext>,
126) -> Response {
127    match list_impl(state.clone()) {
128        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
129        Err(err) => error_response(err, &ctx),
130    }
131}
132
133pub async fn index_create(
134    Extension(state): Extension<Arc<ServerState>>,
135    Extension(ctx): Extension<RequestContext>,
136    Json(request): Json<ColumnarIndexCreateRequest>,
137) -> Response {
138    match index_create_impl(state.clone(), request) {
139        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
140        Err(err) => error_response(err, &ctx),
141    }
142}
143
144pub async fn index_list(
145    Extension(state): Extension<Arc<ServerState>>,
146    Extension(ctx): Extension<RequestContext>,
147    Json(request): Json<ColumnarIndexListRequest>,
148) -> Response {
149    match index_list_impl(state.clone(), request) {
150        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
151        Err(err) => error_response(err, &ctx),
152    }
153}
154
155pub async fn index_drop(
156    Extension(state): Extension<Arc<ServerState>>,
157    Extension(ctx): Extension<RequestContext>,
158    Json(request): Json<ColumnarIndexDropRequest>,
159) -> Response {
160    match index_drop_impl(state.clone(), request) {
161        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
162        Err(err) => error_response(err, &ctx),
163    }
164}
165
166pub async fn ingest(
167    Extension(state): Extension<Arc<ServerState>>,
168    Extension(ctx): Extension<RequestContext>,
169    Json(request): Json<ColumnarIngestRequest>,
170) -> Response {
171    match ingest_impl(state.clone(), request) {
172        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
173        Err(err) => error_response(err, &ctx),
174    }
175}
176
177fn scan_impl(
178    state: Arc<ServerState>,
179    request: ColumnarScanRequest,
180) -> Result<ColumnarScanResponse> {
181    let bridge = ColumnarKvsBridge::new(state.store.clone());
182    let (table_id, seg_id) = parse_segment_id(&request.segment_id)?;
183    let column_count = bridge
184        .column_count(table_id, seg_id)
185        .map_err(map_columnar_error)?;
186    let batches = bridge
187        .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
188        .map_err(map_columnar_error)?;
189    Ok(ColumnarScanResponse {
190        rows: batches_to_rows(batches),
191    })
192}
193
194fn stats_impl(
195    state: Arc<ServerState>,
196    request: ColumnarStatsRequest,
197) -> Result<ColumnarStatsResponse> {
198    let bridge = ColumnarKvsBridge::new(state.store.clone());
199    let (table_id, seg_id) = parse_segment_id(&request.segment_id)?;
200    let column_count = bridge
201        .column_count(table_id, seg_id)
202        .map_err(map_columnar_error)?;
203    let batches = bridge
204        .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
205        .map_err(map_columnar_error)?;
206    let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
207    Ok(ColumnarStatsResponse {
208        row_count,
209        column_count,
210        size_bytes: 0,
211    })
212}
213
214fn list_impl(state: Arc<ServerState>) -> Result<ColumnarListResponse> {
215    let bridge = ColumnarKvsBridge::new(state.store.clone());
216    let segments = bridge.list_segments().map_err(map_columnar_error)?;
217    let segment_ids = segments
218        .into_iter()
219        .map(|(table_id, seg_id)| format!("{}:{}", table_id, seg_id))
220        .collect();
221    Ok(ColumnarListResponse {
222        segments: segment_ids,
223    })
224}
225
226fn index_create_impl(
227    state: Arc<ServerState>,
228    request: ColumnarIndexCreateRequest,
229) -> Result<ColumnarStatusResponse> {
230    validate_segment_exists(state.store.clone(), &request.segment_id)?;
231    let index_type = parse_index_type(&request.index_type)?;
232    let key = columnar_index_key(&request.segment_id, &request.column);
233    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
234    txn.put(key, index_type.as_bytes().to_vec())?;
235    txn.commit_self()?;
236    Ok(ColumnarStatusResponse { success: true })
237}
238
239fn index_list_impl(
240    state: Arc<ServerState>,
241    request: ColumnarIndexListRequest,
242) -> Result<ColumnarIndexListResponse> {
243    validate_segment_exists(state.store.clone(), &request.segment_id)?;
244    let prefix = columnar_index_prefix(&request.segment_id);
245    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
246    let mut indexes = Vec::new();
247    for (key, value) in txn.scan_prefix(&prefix)? {
248        let column = parse_index_column(&request.segment_id, &key)?;
249        let index_type = parse_index_type(std::str::from_utf8(&value).map_err(|_| {
250            ServerError::BadRequest("columnar index type is not valid UTF-8".into())
251        })?)?;
252        indexes.push(ColumnarIndexInfo { column, index_type });
253    }
254    txn.commit_self()?;
255    Ok(ColumnarIndexListResponse { indexes })
256}
257
258fn index_drop_impl(
259    state: Arc<ServerState>,
260    request: ColumnarIndexDropRequest,
261) -> Result<ColumnarStatusResponse> {
262    validate_segment_exists(state.store.clone(), &request.segment_id)?;
263    let key = columnar_index_key(&request.segment_id, &request.column);
264    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
265    let exists = txn.get(&key)?.is_some();
266    if !exists {
267        txn.rollback_self()?;
268        return Err(ServerError::NotFound(format!(
269            "columnar index {}:{} not found",
270            request.segment_id, request.column
271        )));
272    }
273    txn.delete(key)?;
274    txn.commit_self()?;
275    Ok(ColumnarStatusResponse { success: true })
276}
277
278fn ingest_impl(
279    state: Arc<ServerState>,
280    request: ColumnarIngestRequest,
281) -> Result<ColumnarIngestResponse> {
282    if request.table.is_empty() {
283        return Err(ServerError::BadRequest("table is required".into()));
284    }
285    let start = Instant::now();
286    let segment: ColumnSegmentV2 =
287        bincode_config()
288            .deserialize(&request.segment)
289            .map_err(|err| {
290                ServerError::BadRequest(format!("invalid columnar segment payload: {err}"))
291            })?;
292    let bridge = ColumnarKvsBridge::new(state.store.clone());
293    let table_id = table_id(&request.table)?;
294    let seg_id = bridge
295        .write_segment(table_id, &segment)
296        .map_err(map_columnar_error)?;
297    let segment_id = format!("{}:{}", table_id, seg_id);
298    Ok(ColumnarIngestResponse {
299        row_count: segment.meta.num_rows,
300        segment_id,
301        size_bytes: segment.meta.compressed_size,
302        compression: request.compression,
303        elapsed_ms: start.elapsed().as_millis() as u64,
304    })
305}
306
307fn validate_segment_exists(store: Arc<alopex_core::kv::AnyKV>, segment_id: &str) -> Result<()> {
308    let bridge = ColumnarKvsBridge::new(store);
309    let (table_id, seg_id) = parse_segment_id(segment_id)?;
310    bridge
311        .column_count(table_id, seg_id)
312        .map_err(map_columnar_error)?;
313    Ok(())
314}
315
316fn batches_to_rows(batches: Vec<RecordBatch>) -> Vec<Vec<SqlValue>> {
317    let mut rows = Vec::new();
318    for batch in batches {
319        let num_rows = batch.num_rows();
320        for row_idx in 0..num_rows {
321            let mut row = Vec::with_capacity(batch.columns.len());
322            for col in &batch.columns {
323                row.push(column_value_to_sql_value(col, row_idx));
324            }
325            rows.push(row);
326        }
327    }
328    rows
329}
330
331fn column_value_to_sql_value(col: &Column, row_idx: usize) -> SqlValue {
332    match col {
333        Column::Int64(vals) => vals
334            .get(row_idx)
335            .map(|&v| SqlValue::BigInt(v))
336            .unwrap_or(SqlValue::Null),
337        Column::Float32(vals) => vals
338            .get(row_idx)
339            .map(|&v| SqlValue::Float(v))
340            .unwrap_or(SqlValue::Null),
341        Column::Float64(vals) => vals
342            .get(row_idx)
343            .map(|&v| SqlValue::Double(v))
344            .unwrap_or(SqlValue::Null),
345        Column::Bool(vals) => vals
346            .get(row_idx)
347            .copied()
348            .map(SqlValue::Boolean)
349            .unwrap_or(SqlValue::Null),
350        Column::Binary(vals) => vals
351            .get(row_idx)
352            .cloned()
353            .map(SqlValue::Blob)
354            .unwrap_or(SqlValue::Null),
355        Column::Fixed { values, .. } => values
356            .get(row_idx)
357            .cloned()
358            .map(SqlValue::Blob)
359            .unwrap_or(SqlValue::Null),
360    }
361}
362
363fn parse_segment_id(segment_id: &str) -> Result<(u32, u64)> {
364    let parts: Vec<&str> = segment_id.split(':').collect();
365    if parts.len() != 2 {
366        return Err(ServerError::BadRequest(format!(
367            "invalid segment ID format: expected 'table_id:segment_id', got '{}'",
368            segment_id
369        )));
370    }
371
372    let table_id: u32 = parts[0].parse().map_err(|_| {
373        ServerError::BadRequest(format!("invalid table_id in segment ID: '{}'", parts[0]))
374    })?;
375
376    let seg_id: u64 = parts[1].parse().map_err(|_| {
377        ServerError::BadRequest(format!("invalid segment_id in segment ID: '{}'", parts[1]))
378    })?;
379
380    Ok((table_id, seg_id))
381}
382
/// Derives the numeric table id under which a table's columnar segments are written.
///
/// The id is the low 32 bits of SipHash-1-3, keyed with `(0, 0)`, over the name's
/// UTF-8 bytes followed by a `0xFF` terminator. That is exactly what
/// `DefaultHasher::new()` plus `str::hash` produce on current toolchains, so ids of
/// already-ingested tables are unchanged.
///
/// `DefaultHasher`'s algorithm is explicitly unspecified across Rust releases. This
/// derivation is pinned, so a compiler upgrade cannot silently remap stored tables.
///
/// Distinct names can still collide because the hash is truncated to 32 bits.
///
/// # Errors
/// Never fails; the `Result` is kept for interface compatibility.
fn table_id(table: &str) -> Result<u32> {
    let mut message = Vec::with_capacity(table.len() + 1);
    message.extend_from_slice(table.as_bytes());
    // `Hash for str` appends 0xFF (prefix-free encoding); kept for id compatibility.
    message.push(0xFF);
    Ok((sip13_zero_key(&message) & 0xffff_ffff) as u32)
}

/// SipHash-1-3 with an all-zero key: one compression round per 8-byte word and
/// three finalization rounds.
fn sip13_zero_key(data: &[u8]) -> u64 {
    // Initial state: the SipHash constants XORed with k0 = k1 = 0.
    let mut v: [u64; 4] = [
        0x736f_6d65_7073_6575,
        0x646f_7261_6e64_6f6d,
        0x6c79_6765_6e65_7261,
        0x7465_6462_7974_6573,
    ];
    let mut words = data.chunks_exact(8);
    for chunk in &mut words {
        let mut word = [0u8; 8];
        word.copy_from_slice(chunk);
        let m = u64::from_le_bytes(word);
        v[3] ^= m;
        sip_round(&mut v);
        v[0] ^= m;
    }
    // Last block: leftover bytes (little-endian), message length mod 256 in the top byte.
    let mut last = (data.len() as u64 & 0xff) << 56;
    for (i, &byte) in words.remainder().iter().enumerate() {
        last |= u64::from(byte) << (8 * i);
    }
    v[3] ^= last;
    sip_round(&mut v);
    v[0] ^= last;
    v[2] ^= 0xff;
    for _ in 0..3 {
        sip_round(&mut v);
    }
    v[0] ^ v[1] ^ v[2] ^ v[3]
}

/// One SipRound over the state `[v0, v1, v2, v3]`.
fn sip_round(v: &mut [u64; 4]) {
    v[0] = v[0].wrapping_add(v[1]);
    v[1] = v[1].rotate_left(13);
    v[1] ^= v[0];
    v[0] = v[0].rotate_left(32);
    v[2] = v[2].wrapping_add(v[3]);
    v[3] = v[3].rotate_left(16);
    v[3] ^= v[2];
    v[0] = v[0].wrapping_add(v[3]);
    v[3] = v[3].rotate_left(21);
    v[3] ^= v[0];
    v[2] = v[2].wrapping_add(v[1]);
    v[1] = v[1].rotate_left(17);
    v[1] ^= v[2];
    v[2] = v[2].rotate_left(32);
}
388
/// KV key prefix shared by every columnar index entry.
const COLUMNAR_INDEX_PREFIX: &str = "__alopex_columnar_index__:";

/// Builds the KV key `<COLUMNAR_INDEX_PREFIX><segment>:<column>` for one index entry.
fn columnar_index_key(segment: &str, column: &str) -> Vec<u8> {
    [COLUMNAR_INDEX_PREFIX, segment, ":", column]
        .concat()
        .into_bytes()
}

/// Builds the scan prefix `<COLUMNAR_INDEX_PREFIX><segment>:` covering all of a
/// segment's index entries.
///
/// The trailing `:` keeps segment `1:2` from matching entries of segment `1:23`.
fn columnar_index_prefix(segment: &str) -> Vec<u8> {
    [COLUMNAR_INDEX_PREFIX, segment, ":"].concat().into_bytes()
}
408
409fn parse_index_column(segment: &str, key: &[u8]) -> Result<String> {
410    let prefix = columnar_index_prefix(segment);
411    if !key.starts_with(&prefix) {
412        return Err(ServerError::BadRequest(
413            "columnar index key is invalid".into(),
414        ));
415    }
416    let suffix = &key[prefix.len()..];
417    String::from_utf8(suffix.to_vec())
418        .map_err(|_| ServerError::BadRequest("columnar index column is not valid UTF-8".into()))
419}
420
421fn parse_index_type(raw: &str) -> Result<String> {
422    match raw {
423        "minmax" | "bloom" => Ok(raw.to_string()),
424        other => Err(ServerError::BadRequest(format!(
425            "unknown columnar index type: {other}"
426        ))),
427    }
428}
429
430fn map_columnar_error(err: ColumnarError) -> ServerError {
431    match err {
432        ColumnarError::NotFound => ServerError::NotFound("columnar segment not found".into()),
433        ColumnarError::InvalidFormat(message) => {
434            ServerError::BadRequest(format!("columnar segment invalid: {message}"))
435        }
436        ColumnarError::MemoryLimitExceeded { limit, requested } => ServerError::PayloadTooLarge(
437            format!("memory limit {limit} exceeded by {requested} bytes"),
438        ),
439        other => ServerError::Core(other.into()),
440    }
441}