1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4
5use arc_swap::ArcSwap;
6use arrow::array::{
7 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
8 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
9 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
10};
11use arrow::compute;
12use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
13use arrow::datatypes::{DataType, Field, Schema};
14use async_trait::async_trait;
15use parquet::arrow::ProjectionMask;
16use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
17use serde_json::Value as JsonValue;
18
19use datafusion::datasource::file_format::parquet::ParquetFormat;
20use datafusion::datasource::listing::{
21 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
22};
23use datafusion::datasource::{MemTable, TableProvider};
24use datafusion::prelude::SessionContext;
25use datafusion::scalar::ScalarValue;
26
27use object_store::aws::AmazonS3Builder;
28use url::Url;
29
30use datapress_core::backend::{
31 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
32};
33use datapress_core::config::{
34 AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
35 S3Config, SourceKind,
36};
37use datapress_core::errors::AppError;
38use datapress_core::models::{CountRequest, Predicate, QueryRequest};
39use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
40
41type EqIndex = HashMap<String, HashMap<String, Vec<u32>>>;
47
48pub struct DatasetState {
63 pub schema: DatasetSchema,
64 pub data: Vec<RecordBatch>,
65 pub arrow_schema: Arc<Schema>,
66 pub index: EqIndex,
67 pub lazy: bool,
68}
69
70impl DatasetState {
71 pub fn num_rows(&self) -> usize {
73 self.data.iter().map(|b| b.num_rows()).sum()
74 }
75}
76
77pub struct Store {
82 ctx: SessionContext,
83 max_page_size: u64,
84 configs: HashMap<String, DatasetConfig>,
87 datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
89 reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
92}
93
94impl Store {
95 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
97 if cfg
100 .datasets
101 .iter()
102 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
103 {
104 deltalake::aws::register_handlers(None);
105 }
106
107 let ctx = SessionContext::new();
108 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
109 let mut configs = HashMap::with_capacity(cfg.datasets.len());
110
111 for d in &cfg.datasets {
112 let (state, provider) = build_dataset(d, &ctx).await?;
113 ctx.register_table(d.name.as_str(), provider)?;
114 datasets.insert(d.name.clone(), Arc::new(state));
115 configs.insert(d.name.clone(), d.clone());
116 }
117 Ok(Self {
118 ctx,
119 max_page_size: cfg.server.max_page_size.max(1),
120 configs,
121 datasets: ArcSwap::from_pointee(datasets),
122 reload_locks: Mutex::new(HashMap::new()),
123 })
124 }
125
126 pub fn names(&self) -> Vec<String> {
128 let snap = self.datasets.load();
129 let mut v: Vec<String> = snap.keys().cloned().collect();
130 v.sort();
131 v
132 }
133
134 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
135 self.datasets
136 .load()
137 .get(name)
138 .cloned()
139 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
140 }
141
142 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
145 let st = self.dataset(name)?;
146
147 if st.lazy {
149 let table = DatasetSchema::quote_ident(&st.schema.name);
150 let sql = format!("SELECT * FROM {table} LIMIT 1");
151 let df = self.ctx.sql(&sql).await?;
152 let batches = df.collect().await?;
153 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
154 return Ok("null".into());
155 }
156 let arr = serialize(&batches[0].slice(0, 1))?;
157 let trimmed = arr.trim();
158 let inner = trimmed
159 .strip_prefix('[')
160 .and_then(|s| s.strip_suffix(']'))
161 .unwrap_or(trimmed);
162 return Ok(inner.to_string());
163 }
164
165 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
166 Some(b) => b,
167 None => return Ok("null".into()),
168 };
169 let arr = serialize(&first.slice(0, 1))?;
170 let trimmed = arr.trim();
172 let inner = trimmed
173 .strip_prefix('[')
174 .and_then(|s| s.strip_suffix(']'))
175 .unwrap_or(trimmed);
176 Ok(inner.to_string())
177 }
178
179 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
184 let cfg = self
186 .configs
187 .get(name)
188 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
189 .clone();
190
191 let lock = {
193 let mut locks = self.reload_locks.lock().unwrap();
194 locks
195 .entry(name.to_string())
196 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
197 .clone()
198 };
199 let _guard = lock.lock().await;
200
201 let started = std::time::Instant::now();
202
203 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
206 let rows = state.num_rows();
207
208 let _ = self.ctx.deregister_table(name)?;
214 self.ctx.register_table(name, provider)?;
215
216 let mut new_map = (**self.datasets.load()).clone();
217 new_map.insert(name.to_string(), Arc::new(state));
218 self.datasets.store(Arc::new(new_map));
219
220 let elapsed_ms = started.elapsed().as_millis();
221 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
222 Ok(ReloadStats { rows, elapsed_ms })
223 }
224
225 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
229 let batch = self.query_batch(name, req).await?;
230 if batch.num_rows() == 0 {
231 return Ok("[]".to_string());
232 }
233 serialize(&batch)
234 }
235
236 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
240 let batch = self.query_batch(name, req).await?;
241 let schema = batch.schema();
242 let mut buf = Vec::with_capacity(8 * 1024);
243 {
244 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
245 if batch.num_rows() > 0 {
246 w.write(&batch)?;
247 }
248 w.finish()?;
249 }
250 Ok(buf)
251 }
252
253 pub async fn query_arrow_stream(
254 &self,
255 name: &str,
256 req: &QueryRequest,
257 ) -> Result<ArrowIpcStream, AppError> {
258 let batches = self.query_batches(name, req).await?;
259 Ok(stream_arrow_batches(batches))
260 } pub async fn query_arrow_stream_all(
261 &self,
262 name: &str,
263 req: &QueryRequest,
264 ) -> Result<ArrowIpcStream, AppError> {
265 let batches = self.query_batches_all(name, req).await?;
266 Ok(stream_arrow_batches(batches))
267 }
268
269 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
277 let req = QueryRequest {
279 columns: Vec::new(),
280 predicates: Vec::new(),
281 group_by: Vec::new(),
282 aggregations: Vec::new(),
283 distinct: false,
284 order_by: Vec::new(),
285 limit: None,
286 page: 1,
287 page_size: 1,
288 };
289 let st = self.dataset(name)?;
290 let batches = self.query_batches_all(name, &req).await?;
291 let schema = batches
295 .first()
296 .map(|b| b.schema())
297 .unwrap_or_else(|| st.arrow_schema.clone());
298
299 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
300 {
301 let props = parquet::file::properties::WriterProperties::builder()
302 .set_compression(parquet::basic::Compression::SNAPPY)
303 .build();
304 let mut writer =
305 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
306 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
307 for batch in &batches {
308 if batch.num_rows() > 0 {
309 writer
310 .write(batch)
311 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
312 }
313 }
314 writer
315 .close()
316 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
317 }
318 Ok(bytes::Bytes::from(buf))
319 }
320
321 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
324 let batches = self.query_batches(name, req).await?;
325 if batches.is_empty() {
326 return Ok(RecordBatch::new_empty(Arc::new(
327 arrow::datatypes::Schema::empty(),
328 )));
329 }
330 if batches.len() == 1 {
331 return Ok(batches.into_iter().next().expect("checked len"));
332 }
333 if batches.iter().all(|b| b.num_rows() == 0) {
334 return Ok(RecordBatch::new_empty(batches[0].schema()));
335 }
336 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
337 Ok(batch)
338 }
339
340 async fn query_batches(
344 &self,
345 name: &str,
346 req: &QueryRequest,
347 ) -> Result<Vec<RecordBatch>, AppError> {
348 let st = self.dataset(name)?;
349
350 let page = req.page.max(1);
351 let page_size = req.page_size.clamp(1, self.max_page_size);
352 let offset = ((page - 1) * page_size) as usize;
353 let limit = page_size as usize;
354
355 self.query_batches_inner(st, req, Some((offset, limit)))
356 .await
357 }
358
359 async fn query_batches_all(
363 &self,
364 name: &str,
365 req: &QueryRequest,
366 ) -> Result<Vec<RecordBatch>, AppError> {
367 let st = self.dataset(name)?;
368 self.query_batches_inner(st, req, None).await
369 }
370
371 async fn query_batches_inner(
372 &self,
373 st: Arc<DatasetState>,
374 req: &QueryRequest,
375 page_window: Option<(usize, usize)>,
376 ) -> Result<Vec<RecordBatch>, AppError> {
377 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
378
379 let can_fast_path = !st.lazy
386 && req.order_by.is_empty()
387 && (page_window.is_none() || req.limit.is_none())
388 && req.group_by.is_empty()
389 && !req.distinct;
390
391 if can_fast_path {
392 let total = st.num_rows();
393
394 if req.predicates.is_empty() {
397 if page_window.is_none() && req.limit.is_none() {
398 return st
399 .data
400 .iter()
401 .cloned()
402 .map(|batch| project(&st.schema, batch, &req.columns))
403 .collect();
404 }
405 let start = offset.min(total);
406 let len = limit.min(total - start);
407 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
408 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
409 }
410
411 if let Some(rows) = try_index(&st.index, &req.predicates) {
414 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
415 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
416 }
417 }
418
419 let (sql, params) = match page_window {
421 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
422 None => build_query_stream_sql(&st.schema, req)?,
423 };
424 let mut df = self.ctx.sql(&sql).await?;
425 if !params.is_empty() {
426 df = df.with_param_values(params)?;
427 }
428 let batches = df.collect().await?;
429 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
430 let schema = batches
431 .first()
432 .map(|b| b.schema())
433 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
434 return Ok(vec![RecordBatch::new_empty(schema)]);
435 }
436 Ok(batches)
437 }
438}
439
440fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
441 let schema = batches
442 .first()
443 .map(|batch| batch.schema())
444 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
445 let (mut writer, stream) = arrow_ipc_stream_channel(8);
446
447 tokio::task::spawn_blocking(move || {
448 let result = (|| -> Result<(), AppError> {
449 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
450 for batch in batches {
451 if batch.num_rows() > 0 {
452 w.write(&batch)?;
453 }
454 }
455 w.finish()?;
456 Ok(())
457 })();
458 if let Err(err) = result {
459 log::error!("datafusion arrow stream failed: {err}");
460 writer.send_error(err);
461 }
462 });
463
464 stream
465}
466
467impl Store {
468 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
472 let st = self.dataset(name)?;
473
474 if !st.lazy {
475 if req.predicates.is_empty() {
477 return Ok(st.num_rows() as i64);
478 }
479 if let Some(rows) = try_index(&st.index, &req.predicates) {
481 return Ok(rows.len() as i64);
482 }
483 }
484
485 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
488 let mut df = self.ctx.sql(&sql).await?;
489 if !params.is_empty() {
490 df = df.with_param_values(params)?;
491 }
492 let batches = df.collect().await?;
493 let n = batches
494 .first()
495 .and_then(|b| {
496 b.column(0)
497 .as_any()
498 .downcast_ref::<arrow::array::Int64Array>()
499 })
500 .filter(|a| !a.is_empty())
501 .map(|a| a.value(0))
502 .unwrap_or(0);
503 Ok(n)
504 }
505}
506
507async fn build_dataset(
512 d: &DatasetConfig,
513 ctx: &SessionContext,
514) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
515 if d.lazy {
521 match (d.source.kind, d.source.is_s3()) {
522 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
523 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
524 (SourceKind::Delta, _) => {
525 return Err(AppError::Internal(format!(
526 "dataset '{}': lazy mode is not supported for delta sources",
527 d.name
528 )));
529 }
530 }
531 }
532
533 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
538 (SourceKind::Parquet, false) => read_local_parquet(d)?,
539 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
540 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
541 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
542 };
543 if raw_batches.is_empty() {
544 return Err(AppError::Internal(format!(
545 "dataset '{}': source produced no batches",
546 d.name
547 )));
548 }
549
550 let chunks = raw_batches;
551 let arrow_sch = chunks[0].schema();
552
553 let columns: Vec<ColumnInfo> = arrow_sch
555 .fields()
556 .iter()
557 .map(|f| {
558 let dt = f.data_type();
559 ColumnInfo {
560 name: f.name().clone(),
561 logical: arrow_to_logical(dt),
562 sql_type: format!("{dt:?}"),
563 nullable: f.is_nullable(),
564 }
565 })
566 .collect();
567 let schema = DatasetSchema::new(&d.name, columns);
568
569 let index = build_eq_index_with_policy(&chunks, &d.index);
574
575 let n_parts = std::thread::available_parallelism()
580 .map(|n| n.get())
581 .unwrap_or(4);
582 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
583 for (i, b) in chunks.iter().enumerate() {
584 if b.num_rows() == 0 {
585 continue;
586 }
587 parts[i % n_parts].push(b.clone());
588 }
589 parts.retain(|p| !p.is_empty());
590 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
591
592 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
593 let mem_mb: usize = chunks
594 .iter()
595 .flat_map(|b| b.columns().iter())
596 .map(|c| c.get_buffer_memory_size())
597 .sum::<usize>()
598 / 1_048_576;
599 log::info!(
600 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
601 d.name,
602 d.source.kind.as_str(),
603 total_rows,
604 schema.columns.len(),
605 mem_mb,
606 chunks.len(),
607 index.len()
608 );
609
610 Ok((
611 DatasetState {
612 schema,
613 data: chunks,
614 arrow_schema: arrow_sch,
615 index,
616 lazy: false,
617 },
618 provider,
619 ))
620}
621
622async fn build_lazy_local_parquet(
627 d: &DatasetConfig,
628 ctx: &SessionContext,
629) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
630 let (url, part_keys) = lazy_local_listing(d)?;
631
632 let mut opts =
633 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
634 if !part_keys.is_empty() {
635 opts = opts.with_table_partition_cols(
636 part_keys
637 .iter()
638 .map(|k| (k.clone(), DataType::Utf8))
639 .collect(),
640 );
641 }
642
643 let session_state = ctx.state();
644 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
647 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
648 })?;
649
650 let cfg = ListingTableConfig::new(url)
651 .with_listing_options(opts)
652 .with_schema(file_schema.clone());
653 let table = ListingTable::try_new(cfg).map_err(|e| {
654 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
655 })?;
656 let provider: Arc<dyn TableProvider> = Arc::new(table);
657
658 let mut fields: Vec<Field> = file_schema
660 .fields()
661 .iter()
662 .map(|f| f.as_ref().clone())
663 .collect();
664 for k in &part_keys {
665 if !fields.iter().any(|f| f.name() == k) {
666 fields.push(Field::new(k, DataType::Utf8, false));
667 }
668 }
669 let arrow_sch = Arc::new(Schema::new(fields));
670
671 let columns: Vec<ColumnInfo> = arrow_sch
672 .fields()
673 .iter()
674 .map(|f| {
675 let dt = f.data_type();
676 ColumnInfo {
677 name: f.name().clone(),
678 logical: arrow_to_logical(dt),
679 sql_type: format!("{dt:?}"),
680 nullable: f.is_nullable(),
681 }
682 })
683 .collect();
684 let schema = DatasetSchema::new(&d.name, columns);
685
686 log::info!(
687 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
688 d.name,
689 d.source.kind.as_str(),
690 schema.columns.len(),
691 part_keys.len()
692 );
693
694 Ok((
695 DatasetState {
696 schema,
697 data: Vec::new(),
698 arrow_schema: arrow_sch,
699 index: EqIndex::new(),
700 lazy: true,
701 },
702 provider,
703 ))
704}
705
706fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
711 let loc = &d.source.location;
712
713 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
714 let parts: Vec<&str> = loc.split('/').collect();
715 let first_wild = parts
716 .iter()
717 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
718 .unwrap_or(parts.len());
719 let base = parts[..first_wild].join("/");
720 let base = if base.is_empty() {
721 "/".to_string()
722 } else {
723 base
724 };
725 let upper = parts.len().saturating_sub(1);
728 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
729 .iter()
730 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
731 .filter(|k| !k.is_empty())
732 .collect();
733 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
734 }
735
736 let path = std::path::Path::new(loc);
737 if path.is_dir() {
738 let keys = discover_hive_keys(path);
739 return Ok((dir_url(path, d)?, keys));
740 }
741
742 let url = ListingTableUrl::parse(loc)
743 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
744 Ok((url, Vec::new()))
745}
746
747fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
750 let s = path.to_str().ok_or_else(|| {
751 AppError::Internal(format!(
752 "dataset '{}': non-utf8 path {}",
753 d.name,
754 path.display()
755 ))
756 })?;
757 let s = if s.ends_with('/') {
758 s.to_string()
759 } else {
760 format!("{s}/")
761 };
762 ListingTableUrl::parse(&s)
763 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
764}
765
/// Walks down from `base` following `key=value` sub-directories and collects the keys.
///
/// At each level, the first directory (in `read_dir` order) whose name is
/// `key=value` with a non-empty key and value is descended into. The walk stops
/// at the first level with no such directory, or when a directory can't be read.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut dir = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&dir) {
        let partition = entries
            .flatten()
            .map(|entry| entry.path())
            .filter(|p| p.is_dir())
            .find_map(|p| {
                let name = p.file_name()?.to_str()?;
                let (key, value) = name.split_once('=')?;
                (!key.is_empty() && !value.is_empty()).then(|| (key.to_string(), p.clone()))
            });

        let Some((key, child)) = partition else {
            break;
        };
        keys.push(key);
        dir = child;
    }

    keys
}
803
804async fn build_lazy_s3_parquet(
810 d: &DatasetConfig,
811 ctx: &SessionContext,
812) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
813 register_s3_object_store(d, ctx)?;
814
815 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
816
817 let mut fields: Vec<Field> = file_schema
819 .fields()
820 .iter()
821 .map(|f| f.as_ref().clone())
822 .collect();
823 for k in &part_keys {
824 if !fields.iter().any(|f| f.name() == k) {
825 fields.push(Field::new(k, DataType::Utf8, false));
826 }
827 }
828 let arrow_sch = Arc::new(Schema::new(fields));
829
830 let columns: Vec<ColumnInfo> = arrow_sch
831 .fields()
832 .iter()
833 .map(|f| {
834 let dt = f.data_type();
835 ColumnInfo {
836 name: f.name().clone(),
837 logical: arrow_to_logical(dt),
838 sql_type: format!("{dt:?}"),
839 nullable: f.is_nullable(),
840 }
841 })
842 .collect();
843 let schema = DatasetSchema::new(&d.name, columns);
844
845 log::info!(
846 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
847 d.name,
848 d.source.kind.as_str(),
849 schema.columns.len(),
850 part_keys.len()
851 );
852
853 Ok((
854 DatasetState {
855 schema,
856 data: Vec::new(),
857 arrow_schema: arrow_sch,
858 index: EqIndex::new(),
859 lazy: true,
860 },
861 provider,
862 ))
863}
864
865async fn build_s3_listing_table(
871 d: &DatasetConfig,
872 ctx: &SessionContext,
873) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
874 let (url, part_keys) = s3_listing(d, ctx).await?;
875
876 let mut opts =
877 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
878 if !part_keys.is_empty() {
879 opts = opts.with_table_partition_cols(
880 part_keys
881 .iter()
882 .map(|k| (k.clone(), DataType::Utf8))
883 .collect(),
884 );
885 }
886
887 let session_state = ctx.state();
888 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
889 AppError::Internal(format!(
890 "dataset '{}': infer parquet schema on s3: {e}",
891 d.name
892 ))
893 })?;
894
895 let cfg = ListingTableConfig::new(url)
896 .with_listing_options(opts)
897 .with_schema(file_schema.clone());
898 let table = ListingTable::try_new(cfg).map_err(|e| {
899 AppError::Internal(format!(
900 "dataset '{}': ListingTable::try_new (s3): {e}",
901 d.name
902 ))
903 })?;
904 Ok((Arc::new(table), file_schema, part_keys))
905}
906
907async fn s3_listing(
913 d: &DatasetConfig,
914 ctx: &SessionContext,
915) -> Result<(ListingTableUrl, Vec<String>), AppError> {
916 let s3 = d.s3.clone().unwrap_or_default();
917 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
918 let loc = &d.source.location;
919
920 if d.source.has_glob() {
921 let (base, keys) = split_glob_base_keys(loc);
922 let base = format!("{}/", base.trim_end_matches('/'));
923 let url = ListingTableUrl::parse(&base).map_err(|e| {
924 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
925 })?;
926 let keys = if want_partitions { keys } else { Vec::new() };
927 return Ok((url, keys));
928 }
929
930 let base = if loc.ends_with('/') {
931 loc.clone()
932 } else {
933 format!("{loc}/")
934 };
935 let url = ListingTableUrl::parse(&base).map_err(|e| {
936 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
937 })?;
938 let keys = if want_partitions {
939 discover_s3_hive_keys(ctx, &url).await
940 } else {
941 Vec::new()
942 };
943 Ok((url, keys))
944}
945
/// Splits a `/`-separated glob location into its non-wildcard base and hive keys.
///
/// The base is every segment before the first one containing `*`, `?` or `[`.
/// An empty base becomes `"/"`. Keys are the non-empty `key` parts of `key=...`
/// segments from that first wildcard segment up to, but excluding, the last
/// segment (the file name).
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    const GLOB_CHARS: [char; 3] = ['*', '?', '['];

    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|seg| seg.contains(GLOB_CHARS))
        .unwrap_or(segments.len());

    let joined = segments[..wild_at].join("/");
    let base = if joined.is_empty() {
        String::from("/")
    } else {
        joined
    };

    let last = segments.len().saturating_sub(1);
    let keys = segments[wild_at.min(last)..last]
        .iter()
        .filter_map(|seg| match seg.split_once('=') {
            Some((key, _)) if !key.is_empty() => Some(key.to_owned()),
            _ => None,
        })
        .collect();

    (base, keys)
}
969
970async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
975 let store = match ctx.runtime_env().object_store(url.object_store()) {
976 Ok(s) => s,
977 Err(_) => return Vec::new(),
978 };
979 let mut keys = Vec::new();
980 let mut prefix = url.prefix().clone();
981 loop {
982 let listing = match store.list_with_delimiter(Some(&prefix)).await {
983 Ok(l) => l,
984 Err(_) => break,
985 };
986 let mut next: Option<object_store::path::Path> = None;
987 for cp in &listing.common_prefixes {
988 if let Some(seg) = cp.parts().next_back() {
989 let seg = seg.as_ref().to_string();
990 if let Some((k, v)) = seg.split_once('=')
991 && !k.is_empty()
992 && !v.is_empty()
993 {
994 keys.push(k.to_string());
995 next = Some(cp.clone());
996 break;
997 }
998 }
999 }
1000 match next {
1001 Some(p) => prefix = p,
1002 None => break,
1003 }
1004 }
1005 keys
1006}
1007
1008fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1025 let files = d.resolve_local_parquet_files()?;
1026 let mut all = Vec::new();
1027 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1028 None
1029 } else {
1030 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1031 };
1032
1033 for f in &files {
1034 let file = std::fs::File::open(f)
1035 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1036
1037 let probe = ParquetRecordBatchReaderBuilder::try_new(
1042 file.try_clone()
1043 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1044 )?;
1045 let parquet_schema = probe.parquet_schema().clone();
1046 let arrow_schema = probe.schema().clone();
1047 let metadata = probe.metadata().clone();
1048 drop(probe);
1049
1050 let projection = if let Some(w) = &wanted {
1052 let indices: Vec<usize> = arrow_schema
1053 .fields()
1054 .iter()
1055 .enumerate()
1056 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1057 .map(|(i, _)| i)
1058 .collect();
1059 if indices.is_empty() {
1060 return Err(AppError::Internal(format!(
1061 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1062 d.name,
1063 d.columns,
1064 f.display()
1065 )));
1066 }
1067 ProjectionMask::roots(&parquet_schema, indices)
1068 } else {
1069 ProjectionMask::all()
1070 };
1071
1072 let mut new_fields: Vec<Field> = arrow_schema
1080 .fields()
1081 .iter()
1082 .map(|f| f.as_ref().clone())
1083 .collect();
1084 if d.dict_encode
1085 && let Some(rg0) = metadata.row_groups().first()
1086 {
1087 for (i, fld) in arrow_schema.fields().iter().enumerate() {
1088 if !matches!(
1089 fld.data_type(),
1090 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1091 ) {
1092 continue;
1093 }
1094 if let Some(col) = rg0.columns().get(i)
1095 && col.dictionary_page_offset().is_some()
1096 {
1097 new_fields[i] = Field::new(
1098 fld.name(),
1099 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1100 fld.is_nullable(),
1101 );
1102 }
1103 }
1104 }
1105 let forced_schema = Arc::new(Schema::new(new_fields));
1106
1107 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1108 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1109 .with_batch_size(65_536)
1110 .with_projection(projection)
1111 .build()?;
1112 let pairs = hive_pairs(f);
1116 for batch in reader {
1117 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1118 all.push(if pairs.is_empty() {
1119 batch
1120 } else {
1121 append_partition_cols(&batch, &pairs)?
1122 });
1123 }
1124 }
1125 if all.is_empty() {
1126 return Err(AppError::Internal(format!(
1127 "dataset '{}': parquet source is empty",
1128 d.name
1129 )));
1130 }
1131 Ok(all)
1132}
1133
/// Extracts `(key, value)` hive partition pairs from the components of `path`.
///
/// A component counts only if it is valid UTF-8 and splits at its first `=` into
/// a non-empty key and a non-empty value that contains no further `=`. Every
/// component is checked, including the file name.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(seg) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = seg.split_once('=') else {
            continue;
        };
        if key.is_empty() || value.is_empty() || value.contains('=') {
            continue;
        }
        pairs.push((key.to_owned(), value.to_owned()));
    }
    pairs
}
1148
1149fn append_partition_cols(
1152 batch: &RecordBatch,
1153 pairs: &[(String, String)],
1154) -> Result<RecordBatch, AppError> {
1155 let n = batch.num_rows();
1156 let mut fields: Vec<Field> = batch
1157 .schema()
1158 .fields()
1159 .iter()
1160 .map(|f| f.as_ref().clone())
1161 .collect();
1162 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1163 for (k, v) in pairs {
1164 if fields.iter().any(|f| f.name() == k) {
1165 continue;
1166 }
1167 fields.push(Field::new(k, DataType::Utf8, false));
1168 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1169 }
1170 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1171 .map_err(|e| AppError::Internal(e.to_string()))
1172}
1173
1174async fn read_s3_parquet(
1180 d: &DatasetConfig,
1181 ctx: &SessionContext,
1182) -> Result<Vec<RecordBatch>, AppError> {
1183 register_s3_object_store(d, ctx)?;
1184 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1185 let df = ctx
1186 .read_table(provider)
1187 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1188 Ok(df.collect().await?)
1189}
1190
1191async fn read_delta(
1195 d: &DatasetConfig,
1196 opts: HashMap<String, String>,
1197) -> Result<Vec<RecordBatch>, AppError> {
1198 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1199 AppError::Internal(format!(
1200 "dataset '{}': bad delta location '{}': {e}",
1201 d.name, d.source.location
1202 ))
1203 })?;
1204 let table = deltalake::open_table_with_storage_options(url, opts)
1205 .await
1206 .map_err(|e| {
1207 AppError::Internal(format!(
1208 "dataset '{}': delta open '{}': {e}",
1209 d.name, d.source.location
1210 ))
1211 })?;
1212 let provider = table.table_provider().await.map_err(|e| {
1213 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1214 })?;
1215 let scan_ctx = SessionContext::new();
1218 let df = scan_ctx
1219 .read_table(provider)
1220 .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1221 Ok(df.collect().await?)
1222}
1223
1224fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1228 let creds = d.resolved_creds();
1229 let region = d.resolved_region();
1230 let s3 = d.s3.clone().unwrap_or_default();
1231 let (bucket, _) = d.source.s3_bucket()?;
1232
1233 let mut opts = HashMap::new();
1234 opts.insert("AWS_REGION".into(), region);
1235 if let Some(ep) = s3.effective_endpoint(bucket) {
1236 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1237 }
1238 if s3.allow_http {
1239 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1240 }
1241 opts.insert(
1242 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1243 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1244 );
1245 if let Some(k) = creds.access_key_id {
1246 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1247 }
1248 if let Some(s) = creds.secret_access_key {
1249 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1250 }
1251 if let Some(t) = creds.session_token {
1252 opts.insert("AWS_SESSION_TOKEN".into(), t);
1253 }
1254 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1256 Ok(opts)
1257}
1258
1259fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1263 let (bucket, _key) = d.source.s3_bucket()?;
1264 let creds = d.resolved_creds();
1265 let region = d.resolved_region();
1266 let s3 = d.s3.clone().unwrap_or_default();
1267
1268 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1269 AppError::Internal(format!(
1270 "dataset '{}': build S3 store for '{bucket}': {e}",
1271 d.name
1272 ))
1273 })?;
1274
1275 let url = Url::parse(&format!("s3://{bucket}"))
1276 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1277 ctx.register_object_store(&url, Arc::new(store));
1278 Ok(())
1279}
1280
1281fn build_s3(
1282 bucket: &str,
1283 region: &str,
1284 s3: &S3Config,
1285 creds: &ResolvedCreds,
1286) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1287 let mut b = AmazonS3Builder::new()
1288 .with_bucket_name(bucket)
1289 .with_region(region)
1290 .with_allow_http(s3.allow_http)
1291 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1292 if let Some(ep) = s3.effective_endpoint(bucket) {
1293 b = b.with_endpoint(ep);
1294 }
1295 if let Some(k) = creds.access_key_id.as_deref() {
1296 b = b.with_access_key_id(k);
1297 }
1298 if let Some(s) = creds.secret_access_key.as_deref() {
1299 b = b.with_secret_access_key(s);
1300 }
1301 if let Some(t) = creds.session_token.as_deref() {
1302 b = b.with_token(t);
1303 }
1304 b.build()
1305}
1306
1307fn arrow_to_logical(dt: &DataType) -> LogicalType {
1308 match dt {
1309 DataType::Boolean => LogicalType::Bool,
1310 DataType::Int8
1311 | DataType::Int16
1312 | DataType::Int32
1313 | DataType::Int64
1314 | DataType::UInt8
1315 | DataType::UInt16
1316 | DataType::UInt32
1317 | DataType::UInt64 => LogicalType::Int,
1318 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1319 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1320 DataType::Dictionary(_, v)
1324 if matches!(
1325 v.as_ref(),
1326 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1327 ) =>
1328 {
1329 LogicalType::Utf8
1330 }
1331 DataType::Date32
1332 | DataType::Date64
1333 | DataType::Time32(_)
1334 | DataType::Time64(_)
1335 | DataType::Timestamp(_, _)
1336 | DataType::Duration(_)
1337 | DataType::Interval(_) => LogicalType::Temporal,
1338 _ => LogicalType::Other,
1339 }
1340}
1341
1342fn project(
1347 schema: &DatasetSchema,
1348 batch: RecordBatch,
1349 columns: &[String],
1350) -> Result<RecordBatch, AppError> {
1351 if columns.is_empty() {
1352 return Ok(batch);
1353 }
1354 let indices: Vec<usize> = columns
1355 .iter()
1356 .map(|c| {
1357 schema
1358 .find(c)
1359 .map(|info| schema.by_name[&info.name.to_lowercase()])
1360 })
1361 .collect::<Result<_, _>>()?;
1362 let fields: Vec<Field> = indices
1363 .iter()
1364 .map(|&i| batch.schema().field(i).clone())
1365 .collect();
1366 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1367 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1368}
1369
1370#[derive(Default)]
1383struct Params {
1384 values: Vec<ScalarValue>,
1385}
1386
1387impl Params {
1388 fn new() -> Self {
1389 Self::default()
1390 }
1391
1392 fn bind(&mut self, v: ScalarValue) -> String {
1394 self.values.push(v);
1395 format!("${}", self.values.len())
1396 }
1397
1398 fn into_values(self) -> Vec<ScalarValue> {
1399 self.values
1400 }
1401}
1402
1403fn build_query_sql(
1404 schema: &DatasetSchema,
1405 req: &QueryRequest,
1406 max_page_size: u64,
1407) -> Result<(String, Vec<ScalarValue>), AppError> {
1408 let (limit, offset) = req.effective_limit_offset(max_page_size);
1409 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1410}
1411
1412fn build_query_stream_sql(
1413 schema: &DatasetSchema,
1414 req: &QueryRequest,
1415) -> Result<(String, Vec<ScalarValue>), AppError> {
1416 let suffix = req
1417 .limit
1418 .map(|limit| format!(" LIMIT {limit}"))
1419 .unwrap_or_default();
1420 build_query_sql_with_suffix(schema, req, &suffix)
1421}
1422
1423fn build_query_sql_with_suffix(
1424 schema: &DatasetSchema,
1425 req: &QueryRequest,
1426 suffix: &str,
1427) -> Result<(String, Vec<ScalarValue>), AppError> {
1428 let agg_plan = req.agg_plan(schema)?;
1429
1430 let cols = if let Some(plan) = &agg_plan {
1431 let mut parts: Vec<String> = plan
1433 .group_cols
1434 .iter()
1435 .map(|c| DatasetSchema::quote_ident(c))
1436 .collect();
1437 for a in &plan.aggs {
1438 let expr = a.sql_expr()?;
1439 parts.push(format!(
1440 "{expr} AS {}",
1441 DatasetSchema::quote_ident(&a.alias)
1442 ));
1443 }
1444 parts.join(", ")
1445 } else if req.columns.is_empty() {
1446 if req.distinct {
1447 "DISTINCT *".to_string()
1448 } else {
1449 "*".to_string()
1450 }
1451 } else {
1452 let list = req
1453 .columns
1454 .iter()
1455 .map(|c| {
1456 schema
1457 .find(c)
1458 .map(|info| DatasetSchema::quote_ident(&info.name))
1459 })
1460 .collect::<Result<Vec<_>, _>>()?
1461 .join(", ");
1462 if req.distinct {
1463 format!("DISTINCT {list}")
1464 } else {
1465 list
1466 }
1467 };
1468
1469 let mut params = Params::new();
1470 let clauses: Vec<String> = req
1471 .predicates
1472 .iter()
1473 .map(|p| pred_to_sql(schema, p, &mut params))
1474 .collect::<Result<_, _>>()?;
1475
1476 let table = DatasetSchema::quote_ident(&schema.name);
1477 let where_clause = if clauses.is_empty() {
1478 String::new()
1479 } else {
1480 format!(" WHERE {}", clauses.join(" AND "))
1481 };
1482 let group_clause = match &agg_plan {
1483 Some(p) => format!(
1484 " GROUP BY {}",
1485 p.group_cols
1486 .iter()
1487 .map(|c| DatasetSchema::quote_ident(c))
1488 .collect::<Vec<_>>()
1489 .join(", "),
1490 ),
1491 None => String::new(),
1492 };
1493 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1494 Some(s) => format!(" ORDER BY {s}"),
1495 None => String::new(),
1496 };
1497 let sql =
1498 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1499 Ok((sql, params.into_values()))
1500}
1501
1502fn build_count_sql(
1503 schema: &DatasetSchema,
1504 predicates: &[Predicate],
1505) -> Result<(String, Vec<ScalarValue>), AppError> {
1506 let mut params = Params::new();
1507 let clauses: Vec<String> = predicates
1508 .iter()
1509 .map(|p| pred_to_sql(schema, p, &mut params))
1510 .collect::<Result<_, _>>()?;
1511 let table = DatasetSchema::quote_ident(&schema.name);
1512 let where_clause = if clauses.is_empty() {
1513 String::new()
1514 } else {
1515 format!(" WHERE {}", clauses.join(" AND "))
1516 };
1517 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1518 Ok((sql, params.into_values()))
1519}
1520
1521fn pred_to_sql(
1522 schema: &DatasetSchema,
1523 pred: &Predicate,
1524 params: &mut Params,
1525) -> Result<String, AppError> {
1526 let info = schema.find(&pred.col)?;
1527 let col = DatasetSchema::quote_ident(&info.name);
1528
1529 match pred.op.as_str() {
1530 "is_null" => return Ok(format!("{col} IS NULL")),
1531 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1532 _ => {}
1533 }
1534
1535 let val = pred
1536 .val
1537 .as_ref()
1538 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1539
1540 if pred.op == "in" {
1541 let items = val
1542 .as_array()
1543 .filter(|a| !a.is_empty())
1544 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1545 let placeholders: Vec<String> = items
1546 .iter()
1547 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1548 .collect::<Result<_, AppError>>()?;
1549 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1550 }
1551
1552 let sql_op = match pred.op.as_str() {
1553 "eq" => "=",
1554 "neq" => "!=",
1555 "gt" => ">",
1556 "gte" => ">=",
1557 "lt" => "<",
1558 "lte" => "<=",
1559 "like" => "LIKE",
1560 "ilike" => "ILIKE",
1561 other => return Err(AppError::UnknownOperator(other.into())),
1562 };
1563 let placeholder = params.bind(json_to_scalar(val)?);
1564 Ok(format!("{col} {sql_op} {placeholder}"))
1565}
1566
1567fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1571 match val {
1572 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1573 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1574 JsonValue::Null => Ok(ScalarValue::Null),
1575 JsonValue::Number(n) => {
1576 if let Some(i) = n.as_i64() {
1577 Ok(ScalarValue::Int64(Some(i)))
1578 } else if let Some(u) = n.as_u64() {
1579 Ok(ScalarValue::UInt64(Some(u)))
1580 } else if let Some(f) = n.as_f64() {
1581 Ok(ScalarValue::Float64(Some(f)))
1582 } else {
1583 Err(AppError::InvalidValue(
1584 "unsupported numeric literal in predicate".into(),
1585 ))
1586 }
1587 }
1588 _ => Err(AppError::InvalidValue(
1589 "unsupported literal type in predicate".into(),
1590 )),
1591 }
1592}
1593
1594fn json_index_key(val: &JsonValue) -> Option<String> {
1599 match val {
1600 JsonValue::String(s) => Some(s.clone()),
1601 JsonValue::Number(n) => Some(n.to_string()),
1602 JsonValue::Bool(b) => Some(b.to_string()),
1603 _ => None,
1604 }
1605}
1606
/// Intersects two ascending `u32` slices with a linear merge.
///
/// Each matching pair of elements contributes one output element. Duplicates
/// therefore survive as many times as they pair up. The output is ascending.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut ia, mut ib) = (0usize, 0usize);
    while let (Some(&x), Some(&y)) = (a.get(ia), b.get(ib)) {
        match x.cmp(&y) {
            Ordering::Less => ia += 1,
            Ordering::Greater => ib += 1,
            Ordering::Equal => {
                common.push(x);
                ia += 1;
                ib += 1;
            }
        }
    }
    common
}
1623
/// Merges two ascending `u32` slices into one ascending vector.
///
/// When the heads of both slices are equal, the value is emitted once and both
/// sides advance. Whatever remains of either slice is appended unchanged.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut ia, mut ib) = (0usize, 0usize);
    while let (Some(&x), Some(&y)) = (a.get(ia), b.get(ib)) {
        let next = match x.cmp(&y) {
            Ordering::Less => {
                ia += 1;
                x
            }
            Ordering::Greater => {
                ib += 1;
                y
            }
            Ordering::Equal => {
                ia += 1;
                ib += 1;
                x
            }
        };
        merged.push(next);
    }
    merged.extend(a[ia..].iter().chain(&b[ib..]).copied());
    merged
}
1648
1649fn try_index(index: &EqIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1650 if predicates.is_empty() || index.is_empty() {
1651 return None;
1652 }
1653
1654 let mut result: Option<Vec<u32>> = None;
1655 for pred in predicates {
1656 let col_lower = pred.col.to_lowercase();
1657 let col_map = index.get(&col_lower)?;
1658
1659 let rows: Vec<u32> = match pred.op.as_str() {
1660 "eq" => {
1661 let key = json_index_key(pred.val.as_ref()?)?;
1662 col_map.get(&key).cloned().unwrap_or_default()
1663 }
1664 "in" => {
1665 let items = pred.val.as_ref()?.as_array()?;
1666 let mut merged: Vec<u32> = Vec::new();
1667 for item in items {
1668 if let Some(r) = col_map.get(&json_index_key(item)?) {
1669 merged = union_sorted(&merged, r);
1670 }
1671 }
1672 merged
1673 }
1674 _ => return None,
1675 };
1676
1677 result = Some(match result {
1678 None => rows,
1679 Some(r) => intersect_sorted(&r, &rows),
1680 });
1681 }
1682 result
1683}
1684
1685fn slice_global(
1688 chunks: &[RecordBatch],
1689 schema: &Arc<Schema>,
1690 offset: usize,
1691 limit: usize,
1692) -> Result<RecordBatch, AppError> {
1693 if limit == 0 || chunks.is_empty() {
1694 return Ok(RecordBatch::new_empty(schema.clone()));
1695 }
1696 let mut out = Vec::new();
1697 let mut to_skip = offset;
1698 let mut remaining = limit;
1699 for b in chunks {
1700 if remaining == 0 {
1701 break;
1702 }
1703 let n = b.num_rows();
1704 if to_skip >= n {
1705 to_skip -= n;
1706 continue;
1707 }
1708 let take = remaining.min(n - to_skip);
1709 out.push(b.slice(to_skip, take));
1710 to_skip = 0;
1711 remaining -= take;
1712 }
1713 if out.is_empty() {
1714 return Ok(RecordBatch::new_empty(schema.clone()));
1715 }
1716 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1717}
1718
/// Materialises the rows whose global ids are `rows[offset..offset + limit]`
/// into a single batch. The range is clamped to `rows.len()`, and the output
/// keeps the order in which ids appear in `rows`.
///
/// Global ids number rows consecutively across `chunks` in order, the same
/// numbering `build_eq_index_with_policy` assigns. The algorithm:
/// 1. Group the ids per chunk, so each chunk needs a single `take`.
/// 2. Concatenate the per-chunk results.
/// 3. Apply one final `take` with the inverse permutation to restore the
///    requested order.
///
/// Returns an empty batch with `schema` when the page is empty or there are
/// no chunks. NOTE(review): the per-chunk batches are concatenated under
/// `schema`, so chunk schemas are assumed to equal it.
///
/// # Panics
/// - If the total row count overflows `u32` (the `expect`).
/// - If an id in the page is `>=` the total row count. The chunk lookup then
///   yields `chunks.len()`, and `buckets[bi]` is out of bounds. Callers
///   presumably guarantee that ids come from an index built over these same
///   chunks.
fn take_page(
    chunks: &[RecordBatch],
    schema: &Arc<Schema>,
    rows: &[u32],
    offset: usize,
    limit: usize,
) -> Result<RecordBatch, AppError> {
    let start = offset.min(rows.len());
    let len = limit.min(rows.len() - start);
    if len == 0 || chunks.is_empty() {
        return Ok(RecordBatch::new_empty(schema.clone()));
    }

    // offsets[i] = global id of chunk i's first row; the final entry is the
    // total row count.
    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
    let mut acc: u32 = 0;
    offsets.push(0);
    for b in chunks {
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
        offsets.push(acc);
    }

    // One bucket per chunk: (position in the output page, row index within
    // that chunk).
    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
        // Last chunk whose start offset is <= gid. Empty chunks repeat an
        // offset, and partition_point steps past them to the non-empty chunk.
        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
        let local = gid - offsets[bi];
        buckets[bi].push((out_pos as u32, local));
    }

    // Gather each chunk's rows. dest[i] records the output position of row i
    // of the concatenated `takens`.
    let mut takens: Vec<RecordBatch> = Vec::new();
    let mut dest: Vec<u32> = Vec::with_capacity(len);
    for (bi, bucket) in buckets.iter().enumerate() {
        if bucket.is_empty() {
            continue;
        }
        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
        let cols: Vec<ArrayRef> = chunks[bi]
            .columns()
            .iter()
            .map(|c| {
                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
                    .map_err(AppError::from)
            })
            .collect::<Result<_, _>>()?;
        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
    }

    let stitched = compute::concat_batches(schema, takens.iter())?;
    // Invert `dest`: output row d is read from stitched row inv[d]. Every
    // output position is assigned exactly once, so no slot keeps its 0.
    let mut inv = vec![0u32; len];
    for (i, &d) in dest.iter().enumerate() {
        inv[d as usize] = i as u32;
    }
    let perm = UInt32Array::from(inv);
    let cols: Vec<ArrayRef> = stitched
        .columns()
        .iter()
        .map(|c| {
            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
                .map_err(AppError::from)
        })
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
}
1794
/// Builds the in-memory equality index for a dataset held as `chunks`.
///
/// The result maps a lower-cased column name to a map from each distinct
/// non-null value to the global row ids holding it. Values are rendered with
/// `to_string()`; dictionary columns use the dictionary string. Global ids
/// number rows consecutively across `chunks` in order. Each column is scanned
/// sequentially, so every id list is ascending; `try_index` relies on this
/// when it combines lists with `intersect_sorted` / `union_sorted`.
///
/// Policy, by `cfg.mode`:
/// - `IndexMode::None`, or no chunks: returns an empty index.
/// - `IndexMode::List`: only columns named in `cfg.columns` (case-insensitive).
/// - `IndexMode::Auto`: a column is dropped as soon as it would gain more than
///   `cfg.max_cardinality` distinct keys.
/// - Any other mode: every eligible column, with no cardinality cap.
///
/// Eligible column types are Utf8, Utf8View, Boolean, Int8/16/32/64 and
/// `Dictionary<Int32, Utf8>`; all other columns are skipped. A column whose
/// array fails to downcast is also skipped silently (via `?` inside the
/// closure). Columns are processed in parallel with rayon.
///
/// # Panics
/// Panics if the total row count overflows `u32`.
fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
    use rayon::prelude::*;

    if cfg.mode == IndexMode::None || chunks.is_empty() {
        return EqIndex::new();
    }

    // Allow-list of lower-cased column names; only present in List mode.
    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
    } else {
        None
    };

    // Distinct-key cap; only enforced in Auto mode.
    let max_card = if cfg.mode == IndexMode::Auto {
        Some(cfg.max_cardinality)
    } else {
        None
    };

    // batch_offsets[i] = global id of the first row of chunks[i].
    // NOTE(review): `b.num_rows() as u32` truncates a single chunk larger than
    // u32::MAX rows before `checked_add` can notice.
    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
    let mut acc: u32 = 0;
    for b in chunks {
        batch_offsets.push(acc);
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
    }

    // Columns are addressed by position `ci` in every chunk, so all chunks are
    // assumed to share chunks[0]'s schema.
    let schema = chunks[0].schema();

    schema
        .fields()
        .par_iter()
        .enumerate()
        .filter_map(|(ci, field)| {
            let col_lower = field.name().to_lowercase();
            if let Some(a) = &allow
                && !a.contains_key(&col_lower)
            {
                return None;
            }

            let dtype = field.data_type();
            // Only Int32-keyed dictionaries with Utf8 values are indexable.
            let dict_utf8 = matches!(dtype,
                DataType::Dictionary(k, v)
                if matches!(k.as_ref(), DataType::Int32)
                && matches!(v.as_ref(), DataType::Utf8));
            // Skip column types the index does not support.
            match dtype {
                DataType::Utf8
                | DataType::Utf8View
                | DataType::Boolean
                | DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64 => {}
                _ if dict_utf8 => {}
                _ => return None,
            }

            let mut map: HashMap<String, Vec<u32>> = HashMap::new();

            for (bi, batch) in chunks.iter().enumerate() {
                let base = batch_offsets[bi];
                let col = batch.column(ci);

                // Indexes a plain (non-dictionary) array: skips nulls, keys
                // each row by its value's string form, and abandons the whole
                // column (returns None from the closure) once Auto's cap would
                // be exceeded.
                macro_rules! index_col {
                    ($arr_ty:ty) => {{
                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
                        for row in 0..arr.len() {
                            if arr.is_null(row) {
                                continue;
                            }
                            let key = arr.value(row).to_string();
                            let gid = base + row as u32;
                            if let Some(v) = map.get_mut(&key) {
                                v.push(gid);
                            } else {
                                if let Some(mc) = max_card {
                                    if map.len() >= mc {
                                        return None;
                                    }
                                }
                                map.insert(key, vec![gid]);
                            }
                        }
                    }};
                }

                if dict_utf8 {
                    // Same logic for dictionary columns, keyed by the decoded
                    // string. NOTE(review): `arr.is_null` consults the
                    // dictionary's key nulls. A null entry inside `values`
                    // would be indexed under `values.value(k)`; confirm that
                    // values never contain nulls.
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
                        )?;
                    let keys = arr.keys();
                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
                    for row in 0..arr.len() {
                        if arr.is_null(row) {
                            continue;
                        }
                        let k = keys.value(row) as usize;
                        let s = values.value(k);
                        let gid = base + row as u32;
                        if let Some(v) = map.get_mut(s) {
                            v.push(gid);
                        } else {
                            if let Some(mc) = max_card
                                && map.len() >= mc
                            {
                                return None;
                            }
                            map.insert(s.to_string(), vec![gid]);
                        }
                    }
                } else {
                    match dtype {
                        DataType::Utf8 => index_col!(StringArray),
                        DataType::Utf8View => index_col!(StringViewArray),
                        DataType::Boolean => index_col!(BooleanArray),
                        DataType::Int8 => index_col!(Int8Array),
                        DataType::Int16 => index_col!(Int16Array),
                        DataType::Int32 => index_col!(Int32Array),
                        DataType::Int64 => index_col!(Int64Array),
                        // dtype was already filtered by the eligibility match.
                        _ => unreachable!(),
                    }
                }
            }

            Some((col_lower, map))
        })
        // If two columns share a lower-cased name, only one entry survives
        // in the collected map.
        .collect()
}
1938
1939fn writable_inline(dt: &DataType) -> bool {
1952 match dt {
1953 DataType::Utf8
1954 | DataType::LargeUtf8
1955 | DataType::Utf8View
1956 | DataType::Boolean
1957 | DataType::Int8
1958 | DataType::Int16
1959 | DataType::Int32
1960 | DataType::Int64
1961 | DataType::UInt8
1962 | DataType::UInt16
1963 | DataType::UInt32
1964 | DataType::UInt64
1965 | DataType::Float32
1966 | DataType::Float64
1967 | DataType::Decimal128(_, _)
1968 | DataType::Decimal256(_, _) => true,
1969 DataType::Dictionary(k, v)
1970 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
1971 {
1972 true
1973 }
1974 _ => false,
1975 }
1976}
1977
1978fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
1984 let schema = batch.schema();
1985 let to_cast: Vec<usize> = schema
1986 .fields()
1987 .iter()
1988 .enumerate()
1989 .filter_map(|(i, f)| {
1990 if writable_inline(f.data_type()) {
1991 None
1992 } else {
1993 Some(i)
1994 }
1995 })
1996 .collect();
1997 if to_cast.is_empty() {
1998 return Ok(batch.clone());
1999 }
2000 let new_fields: Vec<Field> = schema
2001 .fields()
2002 .iter()
2003 .enumerate()
2004 .map(|(i, f)| {
2005 if to_cast.contains(&i) {
2006 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2007 } else {
2008 f.as_ref().clone()
2009 }
2010 })
2011 .collect();
2012 let new_schema = Arc::new(Schema::new(new_fields));
2013 let cols: Vec<ArrayRef> = batch
2014 .columns()
2015 .iter()
2016 .enumerate()
2017 .map(|(i, c)| {
2018 if to_cast.contains(&i) {
2019 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2020 } else {
2021 Ok(c.clone())
2022 }
2023 })
2024 .collect::<Result<_, _>>()?;
2025 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2026}
2027
/// Comparison operators understood by `cmp_scalar`.
///
/// NOTE(review): `dead_code` is allowed because no caller is visible in this
/// chunk. Confirm whether this Arrow-kernel path is still needed.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// `=`
    Eq,
    /// `!=`
    Neq,
    /// `>`
    Gt,
    /// `>=`
    Gte,
    /// `<`
    Lt,
    /// `<=`
    Lte,
    /// SQL `LIKE` (string columns only)
    Like,
    /// Case-insensitive `LIKE` (string columns only)
    ILike,
}
2045
2046#[allow(dead_code)]
2047fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2048 let arr = col
2049 .as_any()
2050 .downcast_ref::<StringArray>()
2051 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2052 let s = Scalar::new(StringArray::from(vec![val]));
2053 Ok(eq(arr, &s)?)
2054}
2055
2056#[allow(dead_code)]
2057fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2058 macro_rules! num_cmp {
2059 ($arr_type:ty, $cast:ty) => {{
2060 let n = val
2061 .as_f64()
2062 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2063 as $cast;
2064 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2065 let s = Scalar::new(<$arr_type>::from(vec![n]));
2066 Ok(match op {
2067 CmpOp::Eq => eq(arr, &s)?,
2068 CmpOp::Neq => neq(arr, &s)?,
2069 CmpOp::Gt => gt(arr, &s)?,
2070 CmpOp::Gte => gt_eq(arr, &s)?,
2071 CmpOp::Lt => lt(arr, &s)?,
2072 CmpOp::Lte => lt_eq(arr, &s)?,
2073 CmpOp::Like | CmpOp::ILike => {
2074 return Err(AppError::InvalidValue(
2075 "LIKE requires a string column".into(),
2076 ));
2077 }
2078 })
2079 }};
2080 }
2081 match col.data_type() {
2082 DataType::Utf8 => {
2083 let s = val
2084 .as_str()
2085 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2086 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2087 let sc = Scalar::new(StringArray::from(vec![s]));
2088 Ok(match op {
2089 CmpOp::Eq => eq(arr, &sc)?,
2090 CmpOp::Neq => neq(arr, &sc)?,
2091 CmpOp::Gt => gt(arr, &sc)?,
2092 CmpOp::Gte => gt_eq(arr, &sc)?,
2093 CmpOp::Lt => lt(arr, &sc)?,
2094 CmpOp::Lte => lt_eq(arr, &sc)?,
2095 CmpOp::Like => compute::like(arr, &sc)?,
2096 CmpOp::ILike => compute::ilike(arr, &sc)?,
2097 })
2098 }
2099 DataType::Int8 => num_cmp!(Int8Array, i8),
2100 DataType::Int16 => num_cmp!(Int16Array, i16),
2101 DataType::Int32 => num_cmp!(Int32Array, i32),
2102 DataType::Int64 => num_cmp!(Int64Array, i64),
2103 DataType::Float32 => num_cmp!(Float32Array, f32),
2104 DataType::Float64 => num_cmp!(Float64Array, f64),
2105 dt => Err(AppError::InvalidValue(format!(
2106 "unsupported type for comparison: {dt:?}"
2107 ))),
2108 }
2109}
2110
2111pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2116 let batch = cast_for_serialize(batch)?;
2121 let schema = batch.schema();
2122 let n_rows = batch.num_rows();
2123
2124 let keys: Vec<Vec<u8>> = schema
2125 .fields()
2126 .iter()
2127 .map(|f| {
2128 let mut k = Vec::with_capacity(f.name().len() + 3);
2129 k.push(b'"');
2130 k.extend_from_slice(f.name().as_bytes());
2131 k.extend_from_slice(b"\":");
2132 k
2133 })
2134 .collect();
2135
2136 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2137 buf.push(b'[');
2138
2139 for row in 0..n_rows {
2140 if row > 0 {
2141 buf.push(b',');
2142 }
2143 buf.push(b'{');
2144 for (i, key) in keys.iter().enumerate() {
2145 if i > 0 {
2146 buf.push(b',');
2147 }
2148 buf.extend_from_slice(key);
2149 let col = batch.column(i);
2150 if col.is_null(row) {
2151 buf.extend_from_slice(b"null");
2152 } else {
2153 write_value(&mut buf, col.as_ref(), row);
2154 }
2155 }
2156 buf.push(b'}');
2157 }
2158
2159 buf.push(b']');
2160 Ok(unsafe { String::from_utf8_unchecked(buf) })
2161}
2162
2163#[inline]
2164fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2165 match col.data_type() {
2166 DataType::Utf8 => write_str(
2167 buf,
2168 col.as_any()
2169 .downcast_ref::<StringArray>()
2170 .unwrap()
2171 .value(row),
2172 ),
2173 DataType::LargeUtf8 => write_str(
2174 buf,
2175 col.as_any()
2176 .downcast_ref::<LargeStringArray>()
2177 .unwrap()
2178 .value(row),
2179 ),
2180 DataType::Utf8View => write_str(
2181 buf,
2182 col.as_any()
2183 .downcast_ref::<StringViewArray>()
2184 .unwrap()
2185 .value(row),
2186 ),
2187 DataType::Dictionary(key, value)
2188 if matches!(key.as_ref(), DataType::Int32)
2189 && matches!(value.as_ref(), DataType::Utf8) =>
2190 {
2191 let dict = col
2192 .as_any()
2193 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2194 .unwrap();
2195 let keys = dict.keys();
2196 let values = dict
2197 .values()
2198 .as_any()
2199 .downcast_ref::<StringArray>()
2200 .unwrap();
2201 let k = keys.value(row) as usize;
2202 write_str(buf, values.value(k));
2203 }
2204 DataType::Boolean => {
2205 let v = col
2206 .as_any()
2207 .downcast_ref::<BooleanArray>()
2208 .unwrap()
2209 .value(row);
2210 buf.extend_from_slice(if v { b"true" } else { b"false" });
2211 }
2212 DataType::Int8 => {
2213 let mut b = itoa::Buffer::new();
2214 buf.extend_from_slice(
2215 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2216 .as_bytes(),
2217 );
2218 }
2219 DataType::Int16 => {
2220 let mut b = itoa::Buffer::new();
2221 buf.extend_from_slice(
2222 b.format(
2223 col.as_any()
2224 .downcast_ref::<Int16Array>()
2225 .unwrap()
2226 .value(row),
2227 )
2228 .as_bytes(),
2229 );
2230 }
2231 DataType::Int32 => {
2232 let mut b = itoa::Buffer::new();
2233 buf.extend_from_slice(
2234 b.format(
2235 col.as_any()
2236 .downcast_ref::<Int32Array>()
2237 .unwrap()
2238 .value(row),
2239 )
2240 .as_bytes(),
2241 );
2242 }
2243 DataType::Int64 => {
2244 let mut b = itoa::Buffer::new();
2245 buf.extend_from_slice(
2246 b.format(
2247 col.as_any()
2248 .downcast_ref::<Int64Array>()
2249 .unwrap()
2250 .value(row),
2251 )
2252 .as_bytes(),
2253 );
2254 }
2255 DataType::UInt8 => {
2256 let mut b = itoa::Buffer::new();
2257 buf.extend_from_slice(
2258 b.format(
2259 col.as_any()
2260 .downcast_ref::<UInt8Array>()
2261 .unwrap()
2262 .value(row),
2263 )
2264 .as_bytes(),
2265 );
2266 }
2267 DataType::UInt16 => {
2268 let mut b = itoa::Buffer::new();
2269 buf.extend_from_slice(
2270 b.format(
2271 col.as_any()
2272 .downcast_ref::<UInt16Array>()
2273 .unwrap()
2274 .value(row),
2275 )
2276 .as_bytes(),
2277 );
2278 }
2279 DataType::UInt32 => {
2280 let mut b = itoa::Buffer::new();
2281 buf.extend_from_slice(
2282 b.format(
2283 col.as_any()
2284 .downcast_ref::<UInt32Array>()
2285 .unwrap()
2286 .value(row),
2287 )
2288 .as_bytes(),
2289 );
2290 }
2291 DataType::UInt64 => {
2292 let mut b = itoa::Buffer::new();
2293 buf.extend_from_slice(
2294 b.format(
2295 col.as_any()
2296 .downcast_ref::<UInt64Array>()
2297 .unwrap()
2298 .value(row),
2299 )
2300 .as_bytes(),
2301 );
2302 }
2303 DataType::Decimal128(_, _) => {
2304 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2305 write_str(buf, &arr.value_as_string(row));
2306 }
2307 DataType::Decimal256(_, _) => {
2308 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2309 write_str(buf, &arr.value_as_string(row));
2310 }
2311 DataType::Float32 => {
2312 let v = col
2313 .as_any()
2314 .downcast_ref::<Float32Array>()
2315 .unwrap()
2316 .value(row);
2317 if v.is_finite() {
2318 let mut b = ryu::Buffer::new();
2319 buf.extend_from_slice(b.format_finite(v).as_bytes());
2320 } else {
2321 buf.extend_from_slice(b"null");
2322 }
2323 }
2324 DataType::Float64 => {
2325 let v = col
2326 .as_any()
2327 .downcast_ref::<Float64Array>()
2328 .unwrap()
2329 .value(row);
2330 if v.is_finite() {
2331 let mut b = ryu::Buffer::new();
2332 buf.extend_from_slice(b.format_finite(v).as_bytes());
2333 } else {
2334 buf.extend_from_slice(b"null");
2335 }
2336 }
2337 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2342 }
2343}
2344
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
/// - `"` and `\` are backslash-escaped.
/// - Newline, carriage return and tab use their short escapes.
/// - Every other byte below 0x20 becomes `\u00XX` (lower-case hex).
/// - All remaining bytes, including multi-byte UTF-8 sequences, are copied
///   unchanged.
///
/// Unescaped runs are copied in bulk rather than byte by byte.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    let bytes = s.as_bytes();
    buf.push(b'"');
    let mut run_start = 0;
    for (i, &c) in bytes.iter().enumerate() {
        let short: &[u8] = match c {
            b'"' => b"\\\"",
            b'\\' => b"\\\\",
            b'\n' => b"\\n",
            b'\r' => b"\\r",
            b'\t' => b"\\t",
            0x00..=0x1f => {
                buf.extend_from_slice(&bytes[run_start..i]);
                buf.extend_from_slice(&[
                    b'\\',
                    b'u',
                    b'0',
                    b'0',
                    HEX[(c >> 4) as usize],
                    HEX[(c & 0xf) as usize],
                ]);
                run_start = i + 1;
                continue;
            }
            _ => continue,
        };
        buf.extend_from_slice(&bytes[run_start..i]);
        buf.extend_from_slice(short);
        run_start = i + 1;
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
2366
2367#[async_trait]
2372impl Backend for Store {
2373 fn names(&self) -> Vec<String> {
2374 Store::names(self)
2375 }
2376
2377 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2378 let st = self.dataset(name)?;
2379 Ok(DatasetSummary {
2380 name: st.schema.name.clone(),
2381 columns: st.schema.columns.len(),
2382 rows: st.num_rows(),
2383 })
2384 }
2385
2386 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2387 let st = self.dataset(name)?;
2388 Ok(Arc::new(st.schema.clone()))
2389 }
2390
2391 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2392 let st = self.dataset(name)?;
2393 let mut cols: Vec<String> = st
2396 .schema
2397 .columns
2398 .iter()
2399 .map(|c| c.name.clone())
2400 .filter(|n| st.index.contains_key(n))
2401 .collect();
2402 let mut extras: Vec<String> = st
2405 .index
2406 .keys()
2407 .filter(|n| !cols.iter().any(|c| c == *n))
2408 .cloned()
2409 .collect();
2410 extras.sort();
2411 cols.extend(extras);
2412 Ok(cols)
2413 }
2414
2415 async fn sample(&self, name: &str) -> Result<String, AppError> {
2416 Store::sample(self, name).await
2417 }
2418
2419 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2420 Store::query(self, name, req).await
2421 }
2422
2423 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2424 Store::query_arrow(self, name, req).await
2425 }
2426
2427 async fn query_arrow_stream(
2428 &self,
2429 name: &str,
2430 req: &QueryRequest,
2431 ) -> Result<ArrowIpcStream, AppError> {
2432 Store::query_arrow_stream(self, name, req).await
2433 }
2434
2435 async fn query_arrow_stream_all(
2436 &self,
2437 name: &str,
2438 req: &QueryRequest,
2439 ) -> Result<ArrowIpcStream, AppError> {
2440 Store::query_arrow_stream_all(self, name, req).await
2441 }
2442
2443 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2444 Store::count(self, name, req).await
2445 }
2446
2447 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2448 Store::parquet(self, name).await
2449 }
2450
2451 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2452 Store::reload(self, name).await
2453 }
2454}