1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4
5use arc_swap::ArcSwap;
6use arrow::array::{
7 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
8 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
9 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
10};
11use arrow::compute;
12use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
13use arrow::datatypes::{DataType, Field, Schema};
14use async_trait::async_trait;
15use parquet::arrow::ProjectionMask;
16use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
17use serde_json::Value as JsonValue;
18
19use datafusion::datasource::file_format::parquet::ParquetFormat;
20use datafusion::datasource::listing::{
21 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
22};
23use datafusion::datasource::{MemTable, TableProvider};
24use datafusion::prelude::{ParquetReadOptions, SessionContext};
25use datafusion::scalar::ScalarValue;
26
27use object_store::aws::AmazonS3Builder;
28use url::Url;
29
30use datapress_core::backend::{
31 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
32};
33use datapress_core::config::{
34 AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, ResolvedCreds, S3Config,
35 SourceKind,
36};
37use datapress_core::errors::AppError;
38use datapress_core::models::{CountRequest, Predicate, QueryRequest};
39use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
40
/// Two-level equality index: outer string key -> inner string key -> list of `u32` entries.
///
/// NOTE(review): presumably column name -> stringified value -> row ids; confirm against the
/// index builder, which is not part of this section.
type EqIndex = HashMap<String, HashMap<String, Vec<u32>>>;
47
48pub struct DatasetState {
63 pub schema: DatasetSchema,
64 pub data: Vec<RecordBatch>,
65 pub arrow_schema: Arc<Schema>,
66 pub index: EqIndex,
67 pub lazy: bool,
68}
69
70impl DatasetState {
71 pub fn num_rows(&self) -> usize {
73 self.data.iter().map(|b| b.num_rows()).sum()
74 }
75}
76
77pub struct Store {
82 ctx: SessionContext,
83 max_page_size: u64,
84 configs: HashMap<String, DatasetConfig>,
87 datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
89 reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
92}
93
94impl Store {
95 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
97 if cfg
100 .datasets
101 .iter()
102 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
103 {
104 deltalake::aws::register_handlers(None);
105 }
106
107 let ctx = SessionContext::new();
108 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
109 let mut configs = HashMap::with_capacity(cfg.datasets.len());
110
111 for d in &cfg.datasets {
112 let (state, provider) = build_dataset(d, &ctx).await?;
113 ctx.register_table(d.name.as_str(), provider)?;
114 datasets.insert(d.name.clone(), Arc::new(state));
115 configs.insert(d.name.clone(), d.clone());
116 }
117 Ok(Self {
118 ctx,
119 max_page_size: cfg.server.max_page_size.max(1),
120 configs,
121 datasets: ArcSwap::from_pointee(datasets),
122 reload_locks: Mutex::new(HashMap::new()),
123 })
124 }
125
126 pub fn names(&self) -> Vec<String> {
128 let snap = self.datasets.load();
129 let mut v: Vec<String> = snap.keys().cloned().collect();
130 v.sort();
131 v
132 }
133
134 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
135 self.datasets
136 .load()
137 .get(name)
138 .cloned()
139 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
140 }
141
142 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
145 let st = self.dataset(name)?;
146
147 if st.lazy {
149 let table = DatasetSchema::quote_ident(&st.schema.name);
150 let sql = format!("SELECT * FROM {table} LIMIT 1");
151 let df = self.ctx.sql(&sql).await?;
152 let batches = df.collect().await?;
153 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
154 return Ok("null".into());
155 }
156 let arr = serialize(&batches[0].slice(0, 1))?;
157 let trimmed = arr.trim();
158 let inner = trimmed
159 .strip_prefix('[')
160 .and_then(|s| s.strip_suffix(']'))
161 .unwrap_or(trimmed);
162 return Ok(inner.to_string());
163 }
164
165 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
166 Some(b) => b,
167 None => return Ok("null".into()),
168 };
169 let arr = serialize(&first.slice(0, 1))?;
170 let trimmed = arr.trim();
172 let inner = trimmed
173 .strip_prefix('[')
174 .and_then(|s| s.strip_suffix(']'))
175 .unwrap_or(trimmed);
176 Ok(inner.to_string())
177 }
178
179 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
184 let cfg = self
186 .configs
187 .get(name)
188 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
189 .clone();
190
191 let lock = {
193 let mut locks = self.reload_locks.lock().unwrap();
194 locks
195 .entry(name.to_string())
196 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
197 .clone()
198 };
199 let _guard = lock.lock().await;
200
201 let started = std::time::Instant::now();
202
203 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
206 let rows = state.num_rows();
207
208 let _ = self.ctx.deregister_table(name)?;
214 self.ctx.register_table(name, provider)?;
215
216 let mut new_map = (**self.datasets.load()).clone();
217 new_map.insert(name.to_string(), Arc::new(state));
218 self.datasets.store(Arc::new(new_map));
219
220 let elapsed_ms = started.elapsed().as_millis();
221 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
222 Ok(ReloadStats { rows, elapsed_ms })
223 }
224
225 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
229 let batch = self.query_batch(name, req).await?;
230 if batch.num_rows() == 0 {
231 return Ok("[]".to_string());
232 }
233 serialize(&batch)
234 }
235
236 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
240 let batch = self.query_batch(name, req).await?;
241 let schema = batch.schema();
242 let mut buf = Vec::with_capacity(8 * 1024);
243 {
244 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
245 if batch.num_rows() > 0 {
246 w.write(&batch)?;
247 }
248 w.finish()?;
249 }
250 Ok(buf)
251 }
252
253 pub async fn query_arrow_stream(
254 &self,
255 name: &str,
256 req: &QueryRequest,
257 ) -> Result<ArrowIpcStream, AppError> {
258 let batches = self.query_batches(name, req).await?;
259 Ok(stream_arrow_batches(batches))
260 }
261
262 pub async fn query_arrow_stream_all(
263 &self,
264 name: &str,
265 req: &QueryRequest,
266 ) -> Result<ArrowIpcStream, AppError> {
267 let batches = self.query_batches_all(name, req).await?;
268 Ok(stream_arrow_batches(batches))
269 }
270
271 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
274 let batches = self.query_batches(name, req).await?;
275 if batches.is_empty() {
276 return Ok(RecordBatch::new_empty(Arc::new(
277 arrow::datatypes::Schema::empty(),
278 )));
279 }
280 if batches.len() == 1 {
281 return Ok(batches.into_iter().next().expect("checked len"));
282 }
283 if batches.iter().all(|b| b.num_rows() == 0) {
284 return Ok(RecordBatch::new_empty(batches[0].schema()));
285 }
286 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
287 Ok(batch)
288 }
289
290 async fn query_batches(
294 &self,
295 name: &str,
296 req: &QueryRequest,
297 ) -> Result<Vec<RecordBatch>, AppError> {
298 let st = self.dataset(name)?;
299
300 let page = req.page.max(1);
301 let page_size = req.page_size.clamp(1, self.max_page_size);
302 let offset = ((page - 1) * page_size) as usize;
303 let limit = page_size as usize;
304
305 self.query_batches_inner(st, req, Some((offset, limit)))
306 .await
307 }
308
309 async fn query_batches_all(
313 &self,
314 name: &str,
315 req: &QueryRequest,
316 ) -> Result<Vec<RecordBatch>, AppError> {
317 let st = self.dataset(name)?;
318 self.query_batches_inner(st, req, None).await
319 }
320
321 async fn query_batches_inner(
322 &self,
323 st: Arc<DatasetState>,
324 req: &QueryRequest,
325 page_window: Option<(usize, usize)>,
326 ) -> Result<Vec<RecordBatch>, AppError> {
327 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
328
329 let can_fast_path = !st.lazy
336 && req.order_by.is_empty()
337 && (page_window.is_none() || req.limit.is_none())
338 && req.group_by.is_empty()
339 && !req.distinct;
340
341 if can_fast_path {
342 let total = st.num_rows();
343
344 if req.predicates.is_empty() {
347 if page_window.is_none() && req.limit.is_none() {
348 return st
349 .data
350 .iter()
351 .cloned()
352 .map(|batch| project(&st.schema, batch, &req.columns))
353 .collect();
354 }
355 let start = offset.min(total);
356 let len = limit.min(total - start);
357 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
358 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
359 }
360
361 if let Some(rows) = try_index(&st.index, &req.predicates) {
364 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
365 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
366 }
367 }
368
369 let (sql, params) = match page_window {
371 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
372 None => build_query_stream_sql(&st.schema, req)?,
373 };
374 let mut df = self.ctx.sql(&sql).await?;
375 if !params.is_empty() {
376 df = df.with_param_values(params)?;
377 }
378 let batches = df.collect().await?;
379 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
380 let schema = batches
381 .first()
382 .map(|b| b.schema())
383 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
384 return Ok(vec![RecordBatch::new_empty(schema)]);
385 }
386 Ok(batches)
387 }
388}
389
390fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
391 let schema = batches
392 .first()
393 .map(|batch| batch.schema())
394 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
395 let (mut writer, stream) = arrow_ipc_stream_channel(8);
396
397 tokio::task::spawn_blocking(move || {
398 let result = (|| -> Result<(), AppError> {
399 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
400 for batch in batches {
401 if batch.num_rows() > 0 {
402 w.write(&batch)?;
403 }
404 }
405 w.finish()?;
406 Ok(())
407 })();
408 if let Err(err) = result {
409 log::error!("datafusion arrow stream failed: {err}");
410 writer.send_error(err);
411 }
412 });
413
414 stream
415}
416
417impl Store {
418 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
422 let st = self.dataset(name)?;
423
424 if !st.lazy {
425 if req.predicates.is_empty() {
427 return Ok(st.num_rows() as i64);
428 }
429 if let Some(rows) = try_index(&st.index, &req.predicates) {
431 return Ok(rows.len() as i64);
432 }
433 }
434
435 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
438 let mut df = self.ctx.sql(&sql).await?;
439 if !params.is_empty() {
440 df = df.with_param_values(params)?;
441 }
442 let batches = df.collect().await?;
443 let n = batches
444 .first()
445 .and_then(|b| {
446 b.column(0)
447 .as_any()
448 .downcast_ref::<arrow::array::Int64Array>()
449 })
450 .filter(|a| !a.is_empty())
451 .map(|a| a.value(0))
452 .unwrap_or(0);
453 Ok(n)
454 }
455}
456
457async fn build_dataset(
462 d: &DatasetConfig,
463 ctx: &SessionContext,
464) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
465 if d.lazy {
471 match (d.source.kind, d.source.is_s3()) {
472 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
473 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
474 (SourceKind::Delta, _) => {
475 return Err(AppError::Internal(format!(
476 "dataset '{}': lazy mode is not supported for delta sources",
477 d.name
478 )));
479 }
480 }
481 }
482
483 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
488 (SourceKind::Parquet, false) => read_local_parquet(d)?,
489 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
490 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
491 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
492 };
493 if raw_batches.is_empty() {
494 return Err(AppError::Internal(format!(
495 "dataset '{}': source produced no batches",
496 d.name
497 )));
498 }
499
500 let chunks = raw_batches;
501 let arrow_sch = chunks[0].schema();
502
503 let columns: Vec<ColumnInfo> = arrow_sch
505 .fields()
506 .iter()
507 .map(|f| {
508 let dt = f.data_type();
509 ColumnInfo {
510 name: f.name().clone(),
511 logical: arrow_to_logical(dt),
512 sql_type: format!("{dt:?}"),
513 nullable: f.is_nullable(),
514 }
515 })
516 .collect();
517 let schema = DatasetSchema::new(&d.name, columns);
518
519 let index = build_eq_index_with_policy(&chunks, &d.index);
524
525 let n_parts = std::thread::available_parallelism()
530 .map(|n| n.get())
531 .unwrap_or(4);
532 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
533 for (i, b) in chunks.iter().enumerate() {
534 if b.num_rows() == 0 {
535 continue;
536 }
537 parts[i % n_parts].push(b.clone());
538 }
539 parts.retain(|p| !p.is_empty());
540 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
541
542 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
543 let mem_mb: usize = chunks
544 .iter()
545 .flat_map(|b| b.columns().iter())
546 .map(|c| c.get_buffer_memory_size())
547 .sum::<usize>()
548 / 1_048_576;
549 log::info!(
550 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
551 d.name,
552 d.source.kind.as_str(),
553 total_rows,
554 schema.columns.len(),
555 mem_mb,
556 chunks.len(),
557 index.len()
558 );
559
560 Ok((
561 DatasetState {
562 schema,
563 data: chunks,
564 arrow_schema: arrow_sch,
565 index,
566 lazy: false,
567 },
568 provider,
569 ))
570}
571
572async fn build_lazy_local_parquet(
577 d: &DatasetConfig,
578 ctx: &SessionContext,
579) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
580 let (url, part_keys) = lazy_local_listing(d)?;
581
582 let mut opts =
583 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
584 if !part_keys.is_empty() {
585 opts = opts.with_table_partition_cols(
586 part_keys
587 .iter()
588 .map(|k| (k.clone(), DataType::Utf8))
589 .collect(),
590 );
591 }
592
593 let session_state = ctx.state();
594 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
597 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
598 })?;
599
600 let cfg = ListingTableConfig::new(url)
601 .with_listing_options(opts)
602 .with_schema(file_schema.clone());
603 let table = ListingTable::try_new(cfg).map_err(|e| {
604 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
605 })?;
606 let provider: Arc<dyn TableProvider> = Arc::new(table);
607
608 let mut fields: Vec<Field> = file_schema
610 .fields()
611 .iter()
612 .map(|f| f.as_ref().clone())
613 .collect();
614 for k in &part_keys {
615 if !fields.iter().any(|f| f.name() == k) {
616 fields.push(Field::new(k, DataType::Utf8, false));
617 }
618 }
619 let arrow_sch = Arc::new(Schema::new(fields));
620
621 let columns: Vec<ColumnInfo> = arrow_sch
622 .fields()
623 .iter()
624 .map(|f| {
625 let dt = f.data_type();
626 ColumnInfo {
627 name: f.name().clone(),
628 logical: arrow_to_logical(dt),
629 sql_type: format!("{dt:?}"),
630 nullable: f.is_nullable(),
631 }
632 })
633 .collect();
634 let schema = DatasetSchema::new(&d.name, columns);
635
636 log::info!(
637 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
638 d.name,
639 d.source.kind.as_str(),
640 schema.columns.len(),
641 part_keys.len()
642 );
643
644 Ok((
645 DatasetState {
646 schema,
647 data: Vec::new(),
648 arrow_schema: arrow_sch,
649 index: EqIndex::new(),
650 lazy: true,
651 },
652 provider,
653 ))
654}
655
656fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
661 let loc = &d.source.location;
662
663 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
664 let parts: Vec<&str> = loc.split('/').collect();
665 let first_wild = parts
666 .iter()
667 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
668 .unwrap_or(parts.len());
669 let base = parts[..first_wild].join("/");
670 let base = if base.is_empty() {
671 "/".to_string()
672 } else {
673 base
674 };
675 let upper = parts.len().saturating_sub(1);
678 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
679 .iter()
680 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
681 .filter(|k| !k.is_empty())
682 .collect();
683 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
684 }
685
686 let path = std::path::Path::new(loc);
687 if path.is_dir() {
688 let keys = discover_hive_keys(path);
689 return Ok((dir_url(path, d)?, keys));
690 }
691
692 let url = ListingTableUrl::parse(loc)
693 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
694 Ok((url, Vec::new()))
695}
696
697fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
700 let s = path.to_str().ok_or_else(|| {
701 AppError::Internal(format!(
702 "dataset '{}': non-utf8 path {}",
703 d.name,
704 path.display()
705 ))
706 })?;
707 let s = if s.ends_with('/') {
708 s.to_string()
709 } else {
710 format!("{s}/")
711 };
712 ListingTableUrl::parse(&s)
713 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
714}
715
/// Discovers hive-style partition keys by walking down from `base`.
///
/// At each level the walk descends into the first sub-directory (in `read_dir` order) whose
/// name has the form `key=value` with both parts non-empty, and records `key`. It stops at
/// the first level that has no such sub-directory or cannot be read.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    /// Returns the key and path of the first `key=value` sub-directory of `dir`, if any.
    fn first_partition_dir(dir: &std::path::Path) -> Option<(String, std::path::PathBuf)> {
        let entries = std::fs::read_dir(dir).ok()?;
        for entry in entries.flatten() {
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }
            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            match name.split_once('=') {
                Some((key, value)) if !key.is_empty() && !value.is_empty() => {
                    return Some((key.to_string(), path));
                }
                _ => {}
            }
        }
        None
    }

    let mut keys = Vec::new();
    let mut cur = base.to_path_buf();
    while let Some((key, child)) = first_partition_dir(&cur) {
        keys.push(key);
        cur = child;
    }
    keys
}
753
754async fn build_lazy_s3_parquet(
759 d: &DatasetConfig,
760 ctx: &SessionContext,
761) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
762 register_s3_object_store(d, ctx)?;
763
764 let url = ListingTableUrl::parse(&d.source.location).map_err(|e| {
765 AppError::Internal(format!(
766 "dataset '{}': bad s3 url '{}': {e}",
767 d.name, d.source.location
768 ))
769 })?;
770
771 let opts =
772 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
773
774 let session_state = ctx.state();
775 let resolved_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
776 AppError::Internal(format!(
777 "dataset '{}': infer parquet schema on s3: {e}",
778 d.name
779 ))
780 })?;
781
782 let cfg = ListingTableConfig::new(url)
783 .with_listing_options(opts)
784 .with_schema(resolved_schema.clone());
785 let table = ListingTable::try_new(cfg).map_err(|e| {
786 AppError::Internal(format!(
787 "dataset '{}': ListingTable::try_new (s3): {e}",
788 d.name
789 ))
790 })?;
791 let provider: Arc<dyn TableProvider> = Arc::new(table);
792
793 let arrow_sch = resolved_schema;
794 let columns: Vec<ColumnInfo> = arrow_sch
795 .fields()
796 .iter()
797 .map(|f| {
798 let dt = f.data_type();
799 ColumnInfo {
800 name: f.name().clone(),
801 logical: arrow_to_logical(dt),
802 sql_type: format!("{dt:?}"),
803 nullable: f.is_nullable(),
804 }
805 })
806 .collect();
807 let schema = DatasetSchema::new(&d.name, columns);
808
809 log::info!(
810 "dataset '{}' [{}, lazy, s3]: {} cols (no materialise, no index)",
811 d.name,
812 d.source.kind.as_str(),
813 schema.columns.len()
814 );
815
816 Ok((
817 DatasetState {
818 schema,
819 data: Vec::new(),
820 arrow_schema: arrow_sch,
821 index: EqIndex::new(),
822 lazy: true,
823 },
824 provider,
825 ))
826}
827
828fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
845 let files = d.resolve_local_parquet_files()?;
846 let mut all = Vec::new();
847 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
848 None
849 } else {
850 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
851 };
852
853 for f in &files {
854 let file = std::fs::File::open(f)
855 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
856
857 let probe = ParquetRecordBatchReaderBuilder::try_new(
862 file.try_clone()
863 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
864 )?;
865 let parquet_schema = probe.parquet_schema().clone();
866 let arrow_schema = probe.schema().clone();
867 let metadata = probe.metadata().clone();
868 drop(probe);
869
870 let projection = if let Some(w) = &wanted {
872 let indices: Vec<usize> = arrow_schema
873 .fields()
874 .iter()
875 .enumerate()
876 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
877 .map(|(i, _)| i)
878 .collect();
879 if indices.is_empty() {
880 return Err(AppError::Internal(format!(
881 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
882 d.name,
883 d.columns,
884 f.display()
885 )));
886 }
887 ProjectionMask::roots(&parquet_schema, indices)
888 } else {
889 ProjectionMask::all()
890 };
891
892 let mut new_fields: Vec<Field> = arrow_schema
900 .fields()
901 .iter()
902 .map(|f| f.as_ref().clone())
903 .collect();
904 if d.dict_encode
905 && let Some(rg0) = metadata.row_groups().first()
906 {
907 for (i, fld) in arrow_schema.fields().iter().enumerate() {
908 if !matches!(
909 fld.data_type(),
910 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
911 ) {
912 continue;
913 }
914 if let Some(col) = rg0.columns().get(i)
915 && col.dictionary_page_offset().is_some()
916 {
917 new_fields[i] = Field::new(
918 fld.name(),
919 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
920 fld.is_nullable(),
921 );
922 }
923 }
924 }
925 let forced_schema = Arc::new(Schema::new(new_fields));
926
927 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
928 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
929 .with_batch_size(65_536)
930 .with_projection(projection)
931 .build()?;
932 let pairs = hive_pairs(f);
936 for batch in reader {
937 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
938 all.push(if pairs.is_empty() {
939 batch
940 } else {
941 append_partition_cols(&batch, &pairs)?
942 });
943 }
944 }
945 if all.is_empty() {
946 return Err(AppError::Internal(format!(
947 "dataset '{}': parquet source is empty",
948 d.name
949 )));
950 }
951 Ok(all)
952}
953
/// Extracts hive-style `key=value` pairs from the components of `path`, in path order.
///
/// A component contributes a pair only when it is valid UTF-8, splits at its first `=`
/// into a non-empty key and a non-empty value, and the value contains no further `=`.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        if let Some((key, value)) = segment.split_once('=') {
            let well_formed = !key.is_empty() && !value.is_empty() && !value.contains('=');
            if well_formed {
                pairs.push((key.to_string(), value.to_string()));
            }
        }
    }
    pairs
}
968
969fn append_partition_cols(
972 batch: &RecordBatch,
973 pairs: &[(String, String)],
974) -> Result<RecordBatch, AppError> {
975 let n = batch.num_rows();
976 let mut fields: Vec<Field> = batch
977 .schema()
978 .fields()
979 .iter()
980 .map(|f| f.as_ref().clone())
981 .collect();
982 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
983 for (k, v) in pairs {
984 if fields.iter().any(|f| f.name() == k) {
985 continue;
986 }
987 fields.push(Field::new(k, DataType::Utf8, false));
988 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
989 }
990 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
991 .map_err(|e| AppError::Internal(e.to_string()))
992}
993
994async fn read_s3_parquet(
998 d: &DatasetConfig,
999 ctx: &SessionContext,
1000) -> Result<Vec<RecordBatch>, AppError> {
1001 register_s3_object_store(d, ctx)?;
1002 let df = ctx
1003 .read_parquet(d.source.location.clone(), ParquetReadOptions::default())
1004 .await?;
1005 Ok(df.collect().await?)
1006}
1007
1008async fn read_delta(
1012 d: &DatasetConfig,
1013 opts: HashMap<String, String>,
1014) -> Result<Vec<RecordBatch>, AppError> {
1015 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1016 AppError::Internal(format!(
1017 "dataset '{}': bad delta location '{}': {e}",
1018 d.name, d.source.location
1019 ))
1020 })?;
1021 let table = deltalake::open_table_with_storage_options(url, opts)
1022 .await
1023 .map_err(|e| {
1024 AppError::Internal(format!(
1025 "dataset '{}': delta open '{}': {e}",
1026 d.name, d.source.location
1027 ))
1028 })?;
1029 let provider = table.table_provider().await.map_err(|e| {
1030 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1031 })?;
1032 let scan_ctx = SessionContext::new();
1035 let df = scan_ctx
1036 .read_table(provider)
1037 .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1038 Ok(df.collect().await?)
1039}
1040
1041fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1045 let creds = d.resolved_creds();
1046 let region = d.resolved_region();
1047 let s3 = d.s3.clone().unwrap_or_default();
1048
1049 let mut opts = HashMap::new();
1050 opts.insert("AWS_REGION".into(), region);
1051 if let Some(ep) = s3.endpoint.as_deref().filter(|s| !s.is_empty()) {
1052 opts.insert("AWS_ENDPOINT_URL".into(), ep.to_string());
1053 }
1054 if s3.allow_http {
1055 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1056 }
1057 opts.insert(
1058 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1059 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1060 );
1061 if let Some(k) = creds.access_key_id {
1062 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1063 }
1064 if let Some(s) = creds.secret_access_key {
1065 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1066 }
1067 if let Some(t) = creds.session_token {
1068 opts.insert("AWS_SESSION_TOKEN".into(), t);
1069 }
1070 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1072 Ok(opts)
1073}
1074
1075fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1079 let (bucket, _key) = d.source.s3_bucket()?;
1080 let creds = d.resolved_creds();
1081 let region = d.resolved_region();
1082 let s3 = d.s3.clone().unwrap_or_default();
1083
1084 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1085 AppError::Internal(format!(
1086 "dataset '{}': build S3 store for '{bucket}': {e}",
1087 d.name
1088 ))
1089 })?;
1090
1091 let url = Url::parse(&format!("s3://{bucket}"))
1092 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1093 ctx.register_object_store(&url, Arc::new(store));
1094 Ok(())
1095}
1096
1097fn build_s3(
1098 bucket: &str,
1099 region: &str,
1100 s3: &S3Config,
1101 creds: &ResolvedCreds,
1102) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1103 let mut b = AmazonS3Builder::new()
1104 .with_bucket_name(bucket)
1105 .with_region(region)
1106 .with_allow_http(s3.allow_http)
1107 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1108 if let Some(ep) = s3.endpoint.as_deref().filter(|s| !s.is_empty()) {
1109 b = b.with_endpoint(ep);
1110 }
1111 if let Some(k) = creds.access_key_id.as_deref() {
1112 b = b.with_access_key_id(k);
1113 }
1114 if let Some(s) = creds.secret_access_key.as_deref() {
1115 b = b.with_secret_access_key(s);
1116 }
1117 if let Some(t) = creds.session_token.as_deref() {
1118 b = b.with_token(t);
1119 }
1120 b.build()
1121}
1122
1123fn arrow_to_logical(dt: &DataType) -> LogicalType {
1124 match dt {
1125 DataType::Boolean => LogicalType::Bool,
1126 DataType::Int8
1127 | DataType::Int16
1128 | DataType::Int32
1129 | DataType::Int64
1130 | DataType::UInt8
1131 | DataType::UInt16
1132 | DataType::UInt32
1133 | DataType::UInt64 => LogicalType::Int,
1134 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1135 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1136 DataType::Dictionary(_, v)
1140 if matches!(
1141 v.as_ref(),
1142 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1143 ) =>
1144 {
1145 LogicalType::Utf8
1146 }
1147 DataType::Date32
1148 | DataType::Date64
1149 | DataType::Time32(_)
1150 | DataType::Time64(_)
1151 | DataType::Timestamp(_, _)
1152 | DataType::Duration(_)
1153 | DataType::Interval(_) => LogicalType::Temporal,
1154 _ => LogicalType::Other,
1155 }
1156}
1157
1158fn project(
1163 schema: &DatasetSchema,
1164 batch: RecordBatch,
1165 columns: &[String],
1166) -> Result<RecordBatch, AppError> {
1167 if columns.is_empty() {
1168 return Ok(batch);
1169 }
1170 let indices: Vec<usize> = columns
1171 .iter()
1172 .map(|c| {
1173 schema
1174 .find(c)
1175 .map(|info| schema.by_name[&info.name.to_lowercase()])
1176 })
1177 .collect::<Result<_, _>>()?;
1178 let fields: Vec<Field> = indices
1179 .iter()
1180 .map(|&i| batch.schema().field(i).clone())
1181 .collect();
1182 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1183 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1184}
1185
1186#[derive(Default)]
1199struct Params {
1200 values: Vec<ScalarValue>,
1201}
1202
1203impl Params {
1204 fn new() -> Self {
1205 Self::default()
1206 }
1207
1208 fn bind(&mut self, v: ScalarValue) -> String {
1210 self.values.push(v);
1211 format!("${}", self.values.len())
1212 }
1213
1214 fn into_values(self) -> Vec<ScalarValue> {
1215 self.values
1216 }
1217}
1218
1219fn build_query_sql(
1220 schema: &DatasetSchema,
1221 req: &QueryRequest,
1222 max_page_size: u64,
1223) -> Result<(String, Vec<ScalarValue>), AppError> {
1224 let (limit, offset) = req.effective_limit_offset(max_page_size);
1225 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1226}
1227
1228fn build_query_stream_sql(
1229 schema: &DatasetSchema,
1230 req: &QueryRequest,
1231) -> Result<(String, Vec<ScalarValue>), AppError> {
1232 let suffix = req
1233 .limit
1234 .map(|limit| format!(" LIMIT {limit}"))
1235 .unwrap_or_default();
1236 build_query_sql_with_suffix(schema, req, &suffix)
1237}
1238
1239fn build_query_sql_with_suffix(
1240 schema: &DatasetSchema,
1241 req: &QueryRequest,
1242 suffix: &str,
1243) -> Result<(String, Vec<ScalarValue>), AppError> {
1244 let agg_plan = req.agg_plan(schema)?;
1245
1246 let cols = if let Some(plan) = &agg_plan {
1247 let mut parts: Vec<String> = plan
1249 .group_cols
1250 .iter()
1251 .map(|c| DatasetSchema::quote_ident(c))
1252 .collect();
1253 for a in &plan.aggs {
1254 let expr = a.sql_expr()?;
1255 parts.push(format!(
1256 "{expr} AS {}",
1257 DatasetSchema::quote_ident(&a.alias)
1258 ));
1259 }
1260 parts.join(", ")
1261 } else if req.columns.is_empty() {
1262 if req.distinct {
1263 "DISTINCT *".to_string()
1264 } else {
1265 "*".to_string()
1266 }
1267 } else {
1268 let list = req
1269 .columns
1270 .iter()
1271 .map(|c| {
1272 schema
1273 .find(c)
1274 .map(|info| DatasetSchema::quote_ident(&info.name))
1275 })
1276 .collect::<Result<Vec<_>, _>>()?
1277 .join(", ");
1278 if req.distinct {
1279 format!("DISTINCT {list}")
1280 } else {
1281 list
1282 }
1283 };
1284
1285 let mut params = Params::new();
1286 let clauses: Vec<String> = req
1287 .predicates
1288 .iter()
1289 .map(|p| pred_to_sql(schema, p, &mut params))
1290 .collect::<Result<_, _>>()?;
1291
1292 let table = DatasetSchema::quote_ident(&schema.name);
1293 let where_clause = if clauses.is_empty() {
1294 String::new()
1295 } else {
1296 format!(" WHERE {}", clauses.join(" AND "))
1297 };
1298 let group_clause = match &agg_plan {
1299 Some(p) => format!(
1300 " GROUP BY {}",
1301 p.group_cols
1302 .iter()
1303 .map(|c| DatasetSchema::quote_ident(c))
1304 .collect::<Vec<_>>()
1305 .join(", "),
1306 ),
1307 None => String::new(),
1308 };
1309 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1310 Some(s) => format!(" ORDER BY {s}"),
1311 None => String::new(),
1312 };
1313 let sql =
1314 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1315 Ok((sql, params.into_values()))
1316}
1317
1318fn build_count_sql(
1319 schema: &DatasetSchema,
1320 predicates: &[Predicate],
1321) -> Result<(String, Vec<ScalarValue>), AppError> {
1322 let mut params = Params::new();
1323 let clauses: Vec<String> = predicates
1324 .iter()
1325 .map(|p| pred_to_sql(schema, p, &mut params))
1326 .collect::<Result<_, _>>()?;
1327 let table = DatasetSchema::quote_ident(&schema.name);
1328 let where_clause = if clauses.is_empty() {
1329 String::new()
1330 } else {
1331 format!(" WHERE {}", clauses.join(" AND "))
1332 };
1333 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1334 Ok((sql, params.into_values()))
1335}
1336
1337fn pred_to_sql(
1338 schema: &DatasetSchema,
1339 pred: &Predicate,
1340 params: &mut Params,
1341) -> Result<String, AppError> {
1342 let info = schema.find(&pred.col)?;
1343 let col = DatasetSchema::quote_ident(&info.name);
1344
1345 match pred.op.as_str() {
1346 "is_null" => return Ok(format!("{col} IS NULL")),
1347 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1348 _ => {}
1349 }
1350
1351 let val = pred
1352 .val
1353 .as_ref()
1354 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1355
1356 if pred.op == "in" {
1357 let items = val
1358 .as_array()
1359 .filter(|a| !a.is_empty())
1360 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1361 let placeholders: Vec<String> = items
1362 .iter()
1363 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1364 .collect::<Result<_, AppError>>()?;
1365 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1366 }
1367
1368 let sql_op = match pred.op.as_str() {
1369 "eq" => "=",
1370 "neq" => "!=",
1371 "gt" => ">",
1372 "gte" => ">=",
1373 "lt" => "<",
1374 "lte" => "<=",
1375 "like" => "LIKE",
1376 "ilike" => "ILIKE",
1377 other => return Err(AppError::UnknownOperator(other.into())),
1378 };
1379 let placeholder = params.bind(json_to_scalar(val)?);
1380 Ok(format!("{col} {sql_op} {placeholder}"))
1381}
1382
1383fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1387 match val {
1388 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1389 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1390 JsonValue::Null => Ok(ScalarValue::Null),
1391 JsonValue::Number(n) => {
1392 if let Some(i) = n.as_i64() {
1393 Ok(ScalarValue::Int64(Some(i)))
1394 } else if let Some(u) = n.as_u64() {
1395 Ok(ScalarValue::UInt64(Some(u)))
1396 } else if let Some(f) = n.as_f64() {
1397 Ok(ScalarValue::Float64(Some(f)))
1398 } else {
1399 Err(AppError::InvalidValue(
1400 "unsupported numeric literal in predicate".into(),
1401 ))
1402 }
1403 }
1404 _ => Err(AppError::InvalidValue(
1405 "unsupported literal type in predicate".into(),
1406 )),
1407 }
1408}
1409
1410fn json_index_key(val: &JsonValue) -> Option<String> {
1415 match val {
1416 JsonValue::String(s) => Some(s.clone()),
1417 JsonValue::Number(n) => Some(n.to_string()),
1418 JsonValue::Bool(b) => Some(b.to_string()),
1419 _ => None,
1420 }
1421}
1422
/// Returns the values present in both `a` and `b`.
///
/// Both inputs are walked once from the front in lock-step, which is only
/// meaningful when each slice is sorted in ascending order; the output is then
/// ascending as well.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            // The smaller head cannot appear on the other side: drop it.
            Ordering::Less => left = left_rest,
            Ordering::Greater => right = right_rest,
            Ordering::Equal => {
                common.push(x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    common
}
1439
/// Merges `a` and `b` into one vector, emitting a value that heads both
/// slices at the same time only once.
///
/// The merge compares the current head of each slice, so it expects both
/// inputs to be sorted in ascending order; whatever remains of either slice
/// once the other is exhausted is appended unchanged.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some((&x, left_rest)), Some((&y, right_rest))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = left_rest;
            }
            Ordering::Greater => {
                merged.push(y);
                right = right_rest;
            }
            Ordering::Equal => {
                merged.push(x);
                left = left_rest;
                right = right_rest;
            }
        }
    }
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
1464
1465fn try_index(index: &EqIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1466 if predicates.is_empty() || index.is_empty() {
1467 return None;
1468 }
1469
1470 let mut result: Option<Vec<u32>> = None;
1471 for pred in predicates {
1472 let col_lower = pred.col.to_lowercase();
1473 let col_map = index.get(&col_lower)?;
1474
1475 let rows: Vec<u32> = match pred.op.as_str() {
1476 "eq" => {
1477 let key = json_index_key(pred.val.as_ref()?)?;
1478 col_map.get(&key).cloned().unwrap_or_default()
1479 }
1480 "in" => {
1481 let items = pred.val.as_ref()?.as_array()?;
1482 let mut merged: Vec<u32> = Vec::new();
1483 for item in items {
1484 if let Some(r) = col_map.get(&json_index_key(item)?) {
1485 merged = union_sorted(&merged, r);
1486 }
1487 }
1488 merged
1489 }
1490 _ => return None,
1491 };
1492
1493 result = Some(match result {
1494 None => rows,
1495 Some(r) => intersect_sorted(&r, &rows),
1496 });
1497 }
1498 result
1499}
1500
1501fn slice_global(
1504 chunks: &[RecordBatch],
1505 schema: &Arc<Schema>,
1506 offset: usize,
1507 limit: usize,
1508) -> Result<RecordBatch, AppError> {
1509 if limit == 0 || chunks.is_empty() {
1510 return Ok(RecordBatch::new_empty(schema.clone()));
1511 }
1512 let mut out = Vec::new();
1513 let mut to_skip = offset;
1514 let mut remaining = limit;
1515 for b in chunks {
1516 if remaining == 0 {
1517 break;
1518 }
1519 let n = b.num_rows();
1520 if to_skip >= n {
1521 to_skip -= n;
1522 continue;
1523 }
1524 let take = remaining.min(n - to_skip);
1525 out.push(b.slice(to_skip, take));
1526 to_skip = 0;
1527 remaining -= take;
1528 }
1529 if out.is_empty() {
1530 return Ok(RecordBatch::new_empty(schema.clone()));
1531 }
1532 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1533}
1534
1535fn take_page(
1540 chunks: &[RecordBatch],
1541 schema: &Arc<Schema>,
1542 rows: &[u32],
1543 offset: usize,
1544 limit: usize,
1545) -> Result<RecordBatch, AppError> {
1546 let start = offset.min(rows.len());
1547 let len = limit.min(rows.len() - start);
1548 if len == 0 || chunks.is_empty() {
1549 return Ok(RecordBatch::new_empty(schema.clone()));
1550 }
1551
1552 let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1555 let mut acc: u32 = 0;
1556 offsets.push(0);
1557 for b in chunks {
1558 acc = acc
1559 .checked_add(b.num_rows() as u32)
1560 .expect("row count exceeds u32::MAX");
1561 offsets.push(acc);
1562 }
1563
1564 let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1567 for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1568 let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1569 let local = gid - offsets[bi];
1570 buckets[bi].push((out_pos as u32, local));
1571 }
1572
1573 let mut takens: Vec<RecordBatch> = Vec::new();
1575 let mut dest: Vec<u32> = Vec::with_capacity(len);
1576 for (bi, bucket) in buckets.iter().enumerate() {
1577 if bucket.is_empty() {
1578 continue;
1579 }
1580 let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1581 let cols: Vec<ArrayRef> = chunks[bi]
1582 .columns()
1583 .iter()
1584 .map(|c| {
1585 arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1586 .map_err(AppError::from)
1587 })
1588 .collect::<Result<_, _>>()?;
1589 takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1590 dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1591 }
1592
1593 let stitched = compute::concat_batches(schema, takens.iter())?;
1595 let mut inv = vec![0u32; len];
1596 for (i, &d) in dest.iter().enumerate() {
1597 inv[d as usize] = i as u32;
1598 }
1599 let perm = UInt32Array::from(inv);
1600 let cols: Vec<ArrayRef> = stitched
1601 .columns()
1602 .iter()
1603 .map(|c| {
1604 arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1605 .map_err(AppError::from)
1606 })
1607 .collect::<Result<_, _>>()?;
1608 RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1609}
1610
1611fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1615 use rayon::prelude::*;
1616
1617 if cfg.mode == IndexMode::None || chunks.is_empty() {
1618 return EqIndex::new();
1619 }
1620
1621 let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1622 Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1623 } else {
1624 None
1625 };
1626
1627 let max_card = if cfg.mode == IndexMode::Auto {
1628 Some(cfg.max_cardinality)
1629 } else {
1630 None
1631 };
1632
1633 let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1635 let mut acc: u32 = 0;
1636 for b in chunks {
1637 batch_offsets.push(acc);
1638 acc = acc
1639 .checked_add(b.num_rows() as u32)
1640 .expect("row count exceeds u32::MAX");
1641 }
1642
1643 let schema = chunks[0].schema();
1644
1645 schema
1646 .fields()
1647 .par_iter()
1648 .enumerate()
1649 .filter_map(|(ci, field)| {
1650 let col_lower = field.name().to_lowercase();
1651 if let Some(a) = &allow
1652 && !a.contains_key(&col_lower)
1653 {
1654 return None;
1655 }
1656
1657 let dtype = field.data_type();
1660 let dict_utf8 = matches!(dtype,
1661 DataType::Dictionary(k, v)
1662 if matches!(k.as_ref(), DataType::Int32)
1663 && matches!(v.as_ref(), DataType::Utf8));
1664 match dtype {
1665 DataType::Utf8
1666 | DataType::Utf8View
1667 | DataType::Boolean
1668 | DataType::Int8
1669 | DataType::Int16
1670 | DataType::Int32
1671 | DataType::Int64 => {}
1672 _ if dict_utf8 => {}
1673 _ => return None,
1674 }
1675
1676 let mut map: HashMap<String, Vec<u32>> = HashMap::new();
1677
1678 for (bi, batch) in chunks.iter().enumerate() {
1679 let base = batch_offsets[bi];
1680 let col = batch.column(ci);
1681
1682 macro_rules! index_col {
1683 ($arr_ty:ty) => {{
1684 let arr = col.as_any().downcast_ref::<$arr_ty>()?;
1685 for row in 0..arr.len() {
1686 if arr.is_null(row) {
1687 continue;
1688 }
1689 let key = arr.value(row).to_string();
1690 let gid = base + row as u32;
1691 if let Some(v) = map.get_mut(&key) {
1692 v.push(gid);
1693 } else {
1694 if let Some(mc) = max_card {
1695 if map.len() >= mc {
1696 return None;
1697 }
1698 }
1699 map.insert(key, vec![gid]);
1700 }
1701 }
1702 }};
1703 }
1704
1705 if dict_utf8 {
1706 let arr = col
1713 .as_any()
1714 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
1715 )?;
1716 let keys = arr.keys();
1717 let values = arr.values().as_any().downcast_ref::<StringArray>()?;
1718 for row in 0..arr.len() {
1719 if arr.is_null(row) {
1720 continue;
1721 }
1722 let k = keys.value(row) as usize;
1723 let s = values.value(k);
1724 let gid = base + row as u32;
1725 if let Some(v) = map.get_mut(s) {
1726 v.push(gid);
1727 } else {
1728 if let Some(mc) = max_card
1729 && map.len() >= mc
1730 {
1731 return None;
1732 }
1733 map.insert(s.to_string(), vec![gid]);
1734 }
1735 }
1736 } else {
1737 match dtype {
1738 DataType::Utf8 => index_col!(StringArray),
1739 DataType::Utf8View => index_col!(StringViewArray),
1740 DataType::Boolean => index_col!(BooleanArray),
1741 DataType::Int8 => index_col!(Int8Array),
1742 DataType::Int16 => index_col!(Int16Array),
1743 DataType::Int32 => index_col!(Int32Array),
1744 DataType::Int64 => index_col!(Int64Array),
1745 _ => unreachable!(),
1746 }
1747 }
1748 }
1749
1750 Some((col_lower, map))
1751 })
1752 .collect()
1753}
1754
1755fn writable_inline(dt: &DataType) -> bool {
1768 match dt {
1769 DataType::Utf8
1770 | DataType::LargeUtf8
1771 | DataType::Utf8View
1772 | DataType::Boolean
1773 | DataType::Int8
1774 | DataType::Int16
1775 | DataType::Int32
1776 | DataType::Int64
1777 | DataType::UInt8
1778 | DataType::UInt16
1779 | DataType::UInt32
1780 | DataType::UInt64
1781 | DataType::Float32
1782 | DataType::Float64
1783 | DataType::Decimal128(_, _)
1784 | DataType::Decimal256(_, _) => true,
1785 DataType::Dictionary(k, v)
1786 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
1787 {
1788 true
1789 }
1790 _ => false,
1791 }
1792}
1793
1794fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
1800 let schema = batch.schema();
1801 let to_cast: Vec<usize> = schema
1802 .fields()
1803 .iter()
1804 .enumerate()
1805 .filter_map(|(i, f)| {
1806 if writable_inline(f.data_type()) {
1807 None
1808 } else {
1809 Some(i)
1810 }
1811 })
1812 .collect();
1813 if to_cast.is_empty() {
1814 return Ok(batch.clone());
1815 }
1816 let new_fields: Vec<Field> = schema
1817 .fields()
1818 .iter()
1819 .enumerate()
1820 .map(|(i, f)| {
1821 if to_cast.contains(&i) {
1822 Field::new(f.name(), DataType::Utf8, f.is_nullable())
1823 } else {
1824 f.as_ref().clone()
1825 }
1826 })
1827 .collect();
1828 let new_schema = Arc::new(Schema::new(new_fields));
1829 let cols: Vec<ArrayRef> = batch
1830 .columns()
1831 .iter()
1832 .enumerate()
1833 .map(|(i, c)| {
1834 if to_cast.contains(&i) {
1835 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
1836 } else {
1837 Ok(c.clone())
1838 }
1839 })
1840 .collect::<Result<_, _>>()?;
1841 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
1842}
1843
/// Comparison operator applied between a column and a scalar literal.
// Only referenced by helpers that are themselves marked `dead_code`.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equal to.
    Eq,
    /// Not equal to.
    Neq,
    /// Strictly greater than.
    Gt,
    /// Greater than or equal to.
    Gte,
    /// Strictly less than.
    Lt,
    /// Less than or equal to.
    Lte,
    /// Case-sensitive pattern match.
    Like,
    /// Case-insensitive pattern match.
    ILike,
}
1861
1862#[allow(dead_code)]
1863fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
1864 let arr = col
1865 .as_any()
1866 .downcast_ref::<StringArray>()
1867 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
1868 let s = Scalar::new(StringArray::from(vec![val]));
1869 Ok(eq(arr, &s)?)
1870}
1871
1872#[allow(dead_code)]
1873fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
1874 macro_rules! num_cmp {
1875 ($arr_type:ty, $cast:ty) => {{
1876 let n = val
1877 .as_f64()
1878 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
1879 as $cast;
1880 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
1881 let s = Scalar::new(<$arr_type>::from(vec![n]));
1882 Ok(match op {
1883 CmpOp::Eq => eq(arr, &s)?,
1884 CmpOp::Neq => neq(arr, &s)?,
1885 CmpOp::Gt => gt(arr, &s)?,
1886 CmpOp::Gte => gt_eq(arr, &s)?,
1887 CmpOp::Lt => lt(arr, &s)?,
1888 CmpOp::Lte => lt_eq(arr, &s)?,
1889 CmpOp::Like | CmpOp::ILike => {
1890 return Err(AppError::InvalidValue(
1891 "LIKE requires a string column".into(),
1892 ));
1893 }
1894 })
1895 }};
1896 }
1897 match col.data_type() {
1898 DataType::Utf8 => {
1899 let s = val
1900 .as_str()
1901 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
1902 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1903 let sc = Scalar::new(StringArray::from(vec![s]));
1904 Ok(match op {
1905 CmpOp::Eq => eq(arr, &sc)?,
1906 CmpOp::Neq => neq(arr, &sc)?,
1907 CmpOp::Gt => gt(arr, &sc)?,
1908 CmpOp::Gte => gt_eq(arr, &sc)?,
1909 CmpOp::Lt => lt(arr, &sc)?,
1910 CmpOp::Lte => lt_eq(arr, &sc)?,
1911 CmpOp::Like => compute::like(arr, &sc)?,
1912 CmpOp::ILike => compute::ilike(arr, &sc)?,
1913 })
1914 }
1915 DataType::Int8 => num_cmp!(Int8Array, i8),
1916 DataType::Int16 => num_cmp!(Int16Array, i16),
1917 DataType::Int32 => num_cmp!(Int32Array, i32),
1918 DataType::Int64 => num_cmp!(Int64Array, i64),
1919 DataType::Float32 => num_cmp!(Float32Array, f32),
1920 DataType::Float64 => num_cmp!(Float64Array, f64),
1921 dt => Err(AppError::InvalidValue(format!(
1922 "unsupported type for comparison: {dt:?}"
1923 ))),
1924 }
1925}
1926
1927pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
1932 let batch = cast_for_serialize(batch)?;
1937 let schema = batch.schema();
1938 let n_rows = batch.num_rows();
1939
1940 let keys: Vec<Vec<u8>> = schema
1941 .fields()
1942 .iter()
1943 .map(|f| {
1944 let mut k = Vec::with_capacity(f.name().len() + 3);
1945 k.push(b'"');
1946 k.extend_from_slice(f.name().as_bytes());
1947 k.extend_from_slice(b"\":");
1948 k
1949 })
1950 .collect();
1951
1952 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
1953 buf.push(b'[');
1954
1955 for row in 0..n_rows {
1956 if row > 0 {
1957 buf.push(b',');
1958 }
1959 buf.push(b'{');
1960 for (i, key) in keys.iter().enumerate() {
1961 if i > 0 {
1962 buf.push(b',');
1963 }
1964 buf.extend_from_slice(key);
1965 let col = batch.column(i);
1966 if col.is_null(row) {
1967 buf.extend_from_slice(b"null");
1968 } else {
1969 write_value(&mut buf, col.as_ref(), row);
1970 }
1971 }
1972 buf.push(b'}');
1973 }
1974
1975 buf.push(b']');
1976 Ok(unsafe { String::from_utf8_unchecked(buf) })
1977}
1978
1979#[inline]
1980fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
1981 match col.data_type() {
1982 DataType::Utf8 => write_str(
1983 buf,
1984 col.as_any()
1985 .downcast_ref::<StringArray>()
1986 .unwrap()
1987 .value(row),
1988 ),
1989 DataType::LargeUtf8 => write_str(
1990 buf,
1991 col.as_any()
1992 .downcast_ref::<LargeStringArray>()
1993 .unwrap()
1994 .value(row),
1995 ),
1996 DataType::Utf8View => write_str(
1997 buf,
1998 col.as_any()
1999 .downcast_ref::<StringViewArray>()
2000 .unwrap()
2001 .value(row),
2002 ),
2003 DataType::Dictionary(key, value)
2004 if matches!(key.as_ref(), DataType::Int32)
2005 && matches!(value.as_ref(), DataType::Utf8) =>
2006 {
2007 let dict = col
2008 .as_any()
2009 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2010 .unwrap();
2011 let keys = dict.keys();
2012 let values = dict
2013 .values()
2014 .as_any()
2015 .downcast_ref::<StringArray>()
2016 .unwrap();
2017 let k = keys.value(row) as usize;
2018 write_str(buf, values.value(k));
2019 }
2020 DataType::Boolean => {
2021 let v = col
2022 .as_any()
2023 .downcast_ref::<BooleanArray>()
2024 .unwrap()
2025 .value(row);
2026 buf.extend_from_slice(if v { b"true" } else { b"false" });
2027 }
2028 DataType::Int8 => {
2029 let mut b = itoa::Buffer::new();
2030 buf.extend_from_slice(
2031 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2032 .as_bytes(),
2033 );
2034 }
2035 DataType::Int16 => {
2036 let mut b = itoa::Buffer::new();
2037 buf.extend_from_slice(
2038 b.format(
2039 col.as_any()
2040 .downcast_ref::<Int16Array>()
2041 .unwrap()
2042 .value(row),
2043 )
2044 .as_bytes(),
2045 );
2046 }
2047 DataType::Int32 => {
2048 let mut b = itoa::Buffer::new();
2049 buf.extend_from_slice(
2050 b.format(
2051 col.as_any()
2052 .downcast_ref::<Int32Array>()
2053 .unwrap()
2054 .value(row),
2055 )
2056 .as_bytes(),
2057 );
2058 }
2059 DataType::Int64 => {
2060 let mut b = itoa::Buffer::new();
2061 buf.extend_from_slice(
2062 b.format(
2063 col.as_any()
2064 .downcast_ref::<Int64Array>()
2065 .unwrap()
2066 .value(row),
2067 )
2068 .as_bytes(),
2069 );
2070 }
2071 DataType::UInt8 => {
2072 let mut b = itoa::Buffer::new();
2073 buf.extend_from_slice(
2074 b.format(
2075 col.as_any()
2076 .downcast_ref::<UInt8Array>()
2077 .unwrap()
2078 .value(row),
2079 )
2080 .as_bytes(),
2081 );
2082 }
2083 DataType::UInt16 => {
2084 let mut b = itoa::Buffer::new();
2085 buf.extend_from_slice(
2086 b.format(
2087 col.as_any()
2088 .downcast_ref::<UInt16Array>()
2089 .unwrap()
2090 .value(row),
2091 )
2092 .as_bytes(),
2093 );
2094 }
2095 DataType::UInt32 => {
2096 let mut b = itoa::Buffer::new();
2097 buf.extend_from_slice(
2098 b.format(
2099 col.as_any()
2100 .downcast_ref::<UInt32Array>()
2101 .unwrap()
2102 .value(row),
2103 )
2104 .as_bytes(),
2105 );
2106 }
2107 DataType::UInt64 => {
2108 let mut b = itoa::Buffer::new();
2109 buf.extend_from_slice(
2110 b.format(
2111 col.as_any()
2112 .downcast_ref::<UInt64Array>()
2113 .unwrap()
2114 .value(row),
2115 )
2116 .as_bytes(),
2117 );
2118 }
2119 DataType::Decimal128(_, _) => {
2120 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2121 write_str(buf, &arr.value_as_string(row));
2122 }
2123 DataType::Decimal256(_, _) => {
2124 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2125 write_str(buf, &arr.value_as_string(row));
2126 }
2127 DataType::Float32 => {
2128 let v = col
2129 .as_any()
2130 .downcast_ref::<Float32Array>()
2131 .unwrap()
2132 .value(row);
2133 if v.is_finite() {
2134 let mut b = ryu::Buffer::new();
2135 buf.extend_from_slice(b.format_finite(v).as_bytes());
2136 } else {
2137 buf.extend_from_slice(b"null");
2138 }
2139 }
2140 DataType::Float64 => {
2141 let v = col
2142 .as_any()
2143 .downcast_ref::<Float64Array>()
2144 .unwrap()
2145 .value(row);
2146 if v.is_finite() {
2147 let mut b = ryu::Buffer::new();
2148 buf.extend_from_slice(b.format_finite(v).as_bytes());
2149 } else {
2150 buf.extend_from_slice(b"null");
2151 }
2152 }
2153 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2158 }
2159}
2160
/// Appends `s` to `buf` as a double-quoted JSON string.
///
/// `"` and `\` are backslash-escaped, newline, carriage return and tab use
/// their short escapes, and every other byte below `0x20` is written as
/// `\u00XX` with lower-case hex digits. All remaining bytes are copied through
/// unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    buf.push(b'"');

    // Copy unescaped stretches in one go; `pending` marks where the current
    // stretch begins.
    let mut pending = 0;
    for (pos, &byte) in bytes.iter().enumerate() {
        let needs_escape = byte < 0x20 || byte == b'"' || byte == b'\\';
        if !needs_escape {
            continue;
        }
        buf.extend_from_slice(&bytes[pending..pos]);
        pending = pos + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            _ => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX[(byte >> 4) as usize]);
                buf.push(HEX[(byte & 0xf) as usize]);
            }
        }
    }
    buf.extend_from_slice(&bytes[pending..]);

    buf.push(b'"');
}
2182
2183#[async_trait]
2188impl Backend for Store {
2189 fn names(&self) -> Vec<String> {
2190 Store::names(self)
2191 }
2192
2193 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2194 let st = self.dataset(name)?;
2195 Ok(DatasetSummary {
2196 name: st.schema.name.clone(),
2197 columns: st.schema.columns.len(),
2198 rows: st.num_rows(),
2199 })
2200 }
2201
2202 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2203 let st = self.dataset(name)?;
2204 Ok(Arc::new(st.schema.clone()))
2205 }
2206
2207 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2208 let st = self.dataset(name)?;
2209 let mut cols: Vec<String> = st
2212 .schema
2213 .columns
2214 .iter()
2215 .map(|c| c.name.clone())
2216 .filter(|n| st.index.contains_key(n))
2217 .collect();
2218 let mut extras: Vec<String> = st
2221 .index
2222 .keys()
2223 .filter(|n| !cols.iter().any(|c| c == *n))
2224 .cloned()
2225 .collect();
2226 extras.sort();
2227 cols.extend(extras);
2228 Ok(cols)
2229 }
2230
2231 async fn sample(&self, name: &str) -> Result<String, AppError> {
2232 Store::sample(self, name).await
2233 }
2234
2235 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2236 Store::query(self, name, req).await
2237 }
2238
2239 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2240 Store::query_arrow(self, name, req).await
2241 }
2242
2243 async fn query_arrow_stream(
2244 &self,
2245 name: &str,
2246 req: &QueryRequest,
2247 ) -> Result<ArrowIpcStream, AppError> {
2248 Store::query_arrow_stream(self, name, req).await
2249 }
2250
2251 async fn query_arrow_stream_all(
2252 &self,
2253 name: &str,
2254 req: &QueryRequest,
2255 ) -> Result<ArrowIpcStream, AppError> {
2256 Store::query_arrow_stream_all(self, name, req).await
2257 }
2258
2259 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2260 Store::count(self, name, req).await
2261 }
2262
2263 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2264 Store::reload(self, name).await
2265 }
2266}