1use std::sync::Arc;
2
3use alopex_core::columnar::encoding::Column;
4use alopex_core::columnar::kvs_bridge::ColumnarKvsBridge;
5use alopex_core::columnar::segment_v2::{ColumnSegmentV2, RecordBatch};
6use alopex_core::columnar::ColumnarError;
7use alopex_core::kv::KVTransaction;
8use alopex_core::storage::format::bincode_config;
9use alopex_core::types::TxnMode;
10use alopex_core::KVStore;
11use alopex_sql::SqlValue;
12use axum::extract::Extension;
13use axum::response::Response;
14use axum::Json;
15use bincode::config::Options;
16use serde::{Deserialize, Serialize};
17use std::hash::{Hash, Hasher};
18use std::time::Instant;
19
20use crate::error::{Result, ServerError};
21use crate::http::{error_response, json_response, RequestContext};
22use crate::server::ServerState;
23
24#[derive(Debug, Deserialize)]
25pub struct ColumnarScanRequest {
26 pub segment_id: String,
27}
28
29#[derive(Debug, Deserialize)]
30pub struct ColumnarStatsRequest {
31 pub segment_id: String,
32}
33
34#[derive(Debug, Deserialize)]
35pub struct ColumnarIndexCreateRequest {
36 pub segment_id: String,
37 pub column: String,
38 pub index_type: String,
39}
40
41#[derive(Debug, Deserialize)]
42pub struct ColumnarIndexListRequest {
43 pub segment_id: String,
44}
45
46#[derive(Debug, Deserialize)]
47pub struct ColumnarIndexDropRequest {
48 pub segment_id: String,
49 pub column: String,
50}
51
52#[derive(Debug, Deserialize)]
53pub struct ColumnarIngestRequest {
54 pub table: String,
55 pub compression: String,
56 pub segment: Vec<u8>,
57}
58
59#[derive(Debug, Serialize)]
60pub struct ColumnarScanResponse {
61 pub rows: Vec<Vec<SqlValue>>,
62}
63
64#[derive(Debug, Serialize)]
65pub struct ColumnarStatsResponse {
66 pub row_count: usize,
67 pub column_count: usize,
68 pub size_bytes: u64,
69}
70
71#[derive(Debug, Serialize)]
72pub struct ColumnarListResponse {
73 pub segments: Vec<String>,
74}
75
76#[derive(Debug, Serialize)]
77pub struct ColumnarIndexInfo {
78 pub column: String,
79 pub index_type: String,
80}
81
82#[derive(Debug, Serialize)]
83pub struct ColumnarIndexListResponse {
84 pub indexes: Vec<ColumnarIndexInfo>,
85}
86
87#[derive(Debug, Serialize)]
88pub struct ColumnarIngestResponse {
89 pub row_count: u64,
90 pub segment_id: String,
91 pub size_bytes: u64,
92 pub compression: String,
93 pub elapsed_ms: u64,
94}
95
96#[derive(Debug, Serialize)]
97pub struct ColumnarStatusResponse {
98 pub success: bool,
99}
100
101pub async fn scan(
102 Extension(state): Extension<Arc<ServerState>>,
103 Extension(ctx): Extension<RequestContext>,
104 Json(request): Json<ColumnarScanRequest>,
105) -> Response {
106 match scan_impl(state.clone(), request) {
107 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
108 Err(err) => error_response(err, &ctx),
109 }
110}
111
112pub async fn stats(
113 Extension(state): Extension<Arc<ServerState>>,
114 Extension(ctx): Extension<RequestContext>,
115 Json(request): Json<ColumnarStatsRequest>,
116) -> Response {
117 match stats_impl(state.clone(), request) {
118 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
119 Err(err) => error_response(err, &ctx),
120 }
121}
122
123pub async fn list(
124 Extension(state): Extension<Arc<ServerState>>,
125 Extension(ctx): Extension<RequestContext>,
126) -> Response {
127 match list_impl(state.clone()) {
128 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
129 Err(err) => error_response(err, &ctx),
130 }
131}
132
133pub async fn index_create(
134 Extension(state): Extension<Arc<ServerState>>,
135 Extension(ctx): Extension<RequestContext>,
136 Json(request): Json<ColumnarIndexCreateRequest>,
137) -> Response {
138 match index_create_impl(state.clone(), request) {
139 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
140 Err(err) => error_response(err, &ctx),
141 }
142}
143
144pub async fn index_list(
145 Extension(state): Extension<Arc<ServerState>>,
146 Extension(ctx): Extension<RequestContext>,
147 Json(request): Json<ColumnarIndexListRequest>,
148) -> Response {
149 match index_list_impl(state.clone(), request) {
150 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
151 Err(err) => error_response(err, &ctx),
152 }
153}
154
155pub async fn index_drop(
156 Extension(state): Extension<Arc<ServerState>>,
157 Extension(ctx): Extension<RequestContext>,
158 Json(request): Json<ColumnarIndexDropRequest>,
159) -> Response {
160 match index_drop_impl(state.clone(), request) {
161 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
162 Err(err) => error_response(err, &ctx),
163 }
164}
165
166pub async fn ingest(
167 Extension(state): Extension<Arc<ServerState>>,
168 Extension(ctx): Extension<RequestContext>,
169 Json(request): Json<ColumnarIngestRequest>,
170) -> Response {
171 match ingest_impl(state.clone(), request) {
172 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
173 Err(err) => error_response(err, &ctx),
174 }
175}
176
177fn scan_impl(
178 state: Arc<ServerState>,
179 request: ColumnarScanRequest,
180) -> Result<ColumnarScanResponse> {
181 let bridge = ColumnarKvsBridge::new(state.store.clone());
182 let (table_id, seg_id) = parse_segment_id(&request.segment_id)?;
183 let column_count = bridge
184 .column_count(table_id, seg_id)
185 .map_err(map_columnar_error)?;
186 let batches = bridge
187 .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
188 .map_err(map_columnar_error)?;
189 Ok(ColumnarScanResponse {
190 rows: batches_to_rows(batches),
191 })
192}
193
194fn stats_impl(
195 state: Arc<ServerState>,
196 request: ColumnarStatsRequest,
197) -> Result<ColumnarStatsResponse> {
198 let bridge = ColumnarKvsBridge::new(state.store.clone());
199 let (table_id, seg_id) = parse_segment_id(&request.segment_id)?;
200 let column_count = bridge
201 .column_count(table_id, seg_id)
202 .map_err(map_columnar_error)?;
203 let batches = bridge
204 .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
205 .map_err(map_columnar_error)?;
206 let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
207 Ok(ColumnarStatsResponse {
208 row_count,
209 column_count,
210 size_bytes: 0,
211 })
212}
213
214fn list_impl(state: Arc<ServerState>) -> Result<ColumnarListResponse> {
215 let bridge = ColumnarKvsBridge::new(state.store.clone());
216 let segments = bridge.list_segments().map_err(map_columnar_error)?;
217 let segment_ids = segments
218 .into_iter()
219 .map(|(table_id, seg_id)| format!("{}:{}", table_id, seg_id))
220 .collect();
221 Ok(ColumnarListResponse {
222 segments: segment_ids,
223 })
224}
225
226fn index_create_impl(
227 state: Arc<ServerState>,
228 request: ColumnarIndexCreateRequest,
229) -> Result<ColumnarStatusResponse> {
230 validate_segment_exists(state.store.clone(), &request.segment_id)?;
231 let index_type = parse_index_type(&request.index_type)?;
232 let key = columnar_index_key(&request.segment_id, &request.column);
233 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
234 txn.put(key, index_type.as_bytes().to_vec())?;
235 txn.commit_self()?;
236 Ok(ColumnarStatusResponse { success: true })
237}
238
239fn index_list_impl(
240 state: Arc<ServerState>,
241 request: ColumnarIndexListRequest,
242) -> Result<ColumnarIndexListResponse> {
243 validate_segment_exists(state.store.clone(), &request.segment_id)?;
244 let prefix = columnar_index_prefix(&request.segment_id);
245 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
246 let mut indexes = Vec::new();
247 for (key, value) in txn.scan_prefix(&prefix)? {
248 let column = parse_index_column(&request.segment_id, &key)?;
249 let index_type = parse_index_type(std::str::from_utf8(&value).map_err(|_| {
250 ServerError::BadRequest("columnar index type is not valid UTF-8".into())
251 })?)?;
252 indexes.push(ColumnarIndexInfo { column, index_type });
253 }
254 txn.commit_self()?;
255 Ok(ColumnarIndexListResponse { indexes })
256}
257
258fn index_drop_impl(
259 state: Arc<ServerState>,
260 request: ColumnarIndexDropRequest,
261) -> Result<ColumnarStatusResponse> {
262 validate_segment_exists(state.store.clone(), &request.segment_id)?;
263 let key = columnar_index_key(&request.segment_id, &request.column);
264 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
265 let exists = txn.get(&key)?.is_some();
266 if !exists {
267 txn.rollback_self()?;
268 return Err(ServerError::NotFound(format!(
269 "columnar index {}:{} not found",
270 request.segment_id, request.column
271 )));
272 }
273 txn.delete(key)?;
274 txn.commit_self()?;
275 Ok(ColumnarStatusResponse { success: true })
276}
277
278fn ingest_impl(
279 state: Arc<ServerState>,
280 request: ColumnarIngestRequest,
281) -> Result<ColumnarIngestResponse> {
282 if request.table.is_empty() {
283 return Err(ServerError::BadRequest("table is required".into()));
284 }
285 let start = Instant::now();
286 let segment: ColumnSegmentV2 =
287 bincode_config()
288 .deserialize(&request.segment)
289 .map_err(|err| {
290 ServerError::BadRequest(format!("invalid columnar segment payload: {err}"))
291 })?;
292 let bridge = ColumnarKvsBridge::new(state.store.clone());
293 let table_id = table_id(&request.table)?;
294 let seg_id = bridge
295 .write_segment(table_id, &segment)
296 .map_err(map_columnar_error)?;
297 let segment_id = format!("{}:{}", table_id, seg_id);
298 Ok(ColumnarIngestResponse {
299 row_count: segment.meta.num_rows,
300 segment_id,
301 size_bytes: segment.meta.compressed_size,
302 compression: request.compression,
303 elapsed_ms: start.elapsed().as_millis() as u64,
304 })
305}
306
307fn validate_segment_exists(store: Arc<alopex_core::kv::AnyKV>, segment_id: &str) -> Result<()> {
308 let bridge = ColumnarKvsBridge::new(store);
309 let (table_id, seg_id) = parse_segment_id(segment_id)?;
310 bridge
311 .column_count(table_id, seg_id)
312 .map_err(map_columnar_error)?;
313 Ok(())
314}
315
316fn batches_to_rows(batches: Vec<RecordBatch>) -> Vec<Vec<SqlValue>> {
317 let mut rows = Vec::new();
318 for batch in batches {
319 let num_rows = batch.num_rows();
320 for row_idx in 0..num_rows {
321 let mut row = Vec::with_capacity(batch.columns.len());
322 for col in &batch.columns {
323 row.push(column_value_to_sql_value(col, row_idx));
324 }
325 rows.push(row);
326 }
327 }
328 rows
329}
330
331fn column_value_to_sql_value(col: &Column, row_idx: usize) -> SqlValue {
332 match col {
333 Column::Int64(vals) => vals
334 .get(row_idx)
335 .map(|&v| SqlValue::BigInt(v))
336 .unwrap_or(SqlValue::Null),
337 Column::Float32(vals) => vals
338 .get(row_idx)
339 .map(|&v| SqlValue::Float(v))
340 .unwrap_or(SqlValue::Null),
341 Column::Float64(vals) => vals
342 .get(row_idx)
343 .map(|&v| SqlValue::Double(v))
344 .unwrap_or(SqlValue::Null),
345 Column::Bool(vals) => vals
346 .get(row_idx)
347 .copied()
348 .map(SqlValue::Boolean)
349 .unwrap_or(SqlValue::Null),
350 Column::Binary(vals) => vals
351 .get(row_idx)
352 .cloned()
353 .map(SqlValue::Blob)
354 .unwrap_or(SqlValue::Null),
355 Column::Fixed { values, .. } => values
356 .get(row_idx)
357 .cloned()
358 .map(SqlValue::Blob)
359 .unwrap_or(SqlValue::Null),
360 }
361}
362
363fn parse_segment_id(segment_id: &str) -> Result<(u32, u64)> {
364 let parts: Vec<&str> = segment_id.split(':').collect();
365 if parts.len() != 2 {
366 return Err(ServerError::BadRequest(format!(
367 "invalid segment ID format: expected 'table_id:segment_id', got '{}'",
368 segment_id
369 )));
370 }
371
372 let table_id: u32 = parts[0].parse().map_err(|_| {
373 ServerError::BadRequest(format!("invalid table_id in segment ID: '{}'", parts[0]))
374 })?;
375
376 let seg_id: u64 = parts[1].parse().map_err(|_| {
377 ServerError::BadRequest(format!("invalid segment_id in segment ID: '{}'", parts[1]))
378 })?;
379
380 Ok((table_id, seg_id))
381}
382
383fn table_id(table: &str) -> Result<u32> {
384 let mut hasher = std::collections::hash_map::DefaultHasher::new();
385 table.hash(&mut hasher);
386 Ok((hasher.finish() & 0xffff_ffff) as u32)
387}
388
389const COLUMNAR_INDEX_PREFIX: &str = "__alopex_columnar_index__:";
390
391fn columnar_index_key(segment: &str, column: &str) -> Vec<u8> {
392 let mut key =
393 String::with_capacity(COLUMNAR_INDEX_PREFIX.len() + segment.len() + column.len() + 1);
394 key.push_str(COLUMNAR_INDEX_PREFIX);
395 key.push_str(segment);
396 key.push(':');
397 key.push_str(column);
398 key.into_bytes()
399}
400
401fn columnar_index_prefix(segment: &str) -> Vec<u8> {
402 let mut key = String::with_capacity(COLUMNAR_INDEX_PREFIX.len() + segment.len() + 1);
403 key.push_str(COLUMNAR_INDEX_PREFIX);
404 key.push_str(segment);
405 key.push(':');
406 key.into_bytes()
407}
408
409fn parse_index_column(segment: &str, key: &[u8]) -> Result<String> {
410 let prefix = columnar_index_prefix(segment);
411 if !key.starts_with(&prefix) {
412 return Err(ServerError::BadRequest(
413 "columnar index key is invalid".into(),
414 ));
415 }
416 let suffix = &key[prefix.len()..];
417 String::from_utf8(suffix.to_vec())
418 .map_err(|_| ServerError::BadRequest("columnar index column is not valid UTF-8".into()))
419}
420
421fn parse_index_type(raw: &str) -> Result<String> {
422 match raw {
423 "minmax" | "bloom" => Ok(raw.to_string()),
424 other => Err(ServerError::BadRequest(format!(
425 "unknown columnar index type: {other}"
426 ))),
427 }
428}
429
430fn map_columnar_error(err: ColumnarError) -> ServerError {
431 match err {
432 ColumnarError::NotFound => ServerError::NotFound("columnar segment not found".into()),
433 ColumnarError::InvalidFormat(message) => {
434 ServerError::BadRequest(format!("columnar segment invalid: {message}"))
435 }
436 ColumnarError::MemoryLimitExceeded { limit, requested } => ServerError::PayloadTooLarge(
437 format!("memory limit {limit} exceeded by {requested} bytes"),
438 ),
439 other => ServerError::Core(other.into()),
440 }
441}