1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, RwLock};
5use std::time::Duration;
6
7use arc_swap::ArcSwap;
8use arrow::array::{
9 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
10 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
11 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
12};
13use arrow::compute;
14use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
15use arrow::datatypes::{DataType, Field, Schema};
16use async_trait::async_trait;
17use parquet::arrow::ProjectionMask;
18use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
19use serde_json::Value as JsonValue;
20
21use datafusion::datasource::file_format::parquet::ParquetFormat;
22use datafusion::datasource::listing::{
23 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
24};
25use datafusion::datasource::{MemTable, TableProvider};
26use datafusion::execution::cache::DefaultListFilesCache;
27use datafusion::execution::cache::cache_manager::CacheManagerConfig;
28use datafusion::execution::runtime_env::RuntimeEnvBuilder;
29use datafusion::prelude::{SessionConfig, SessionContext};
30use datafusion::scalar::ScalarValue;
31
32use object_store::aws::AmazonS3Builder;
33use url::Url;
34
35use datapress_core::backend::{
36 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
37};
38use datapress_core::config::{
39 AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
40 Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
41};
42use datapress_core::errors::AppError;
43use datapress_core::models::{CountRequest, Predicate, QueryRequest};
44use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
45
/// `HashMap` using `ahash::RandomState` as its hasher instead of the std default.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Equality index built by `build_eq_index_with_policy`.
///
/// NOTE(review): the nesting suggests column name → cell value (stringified) → row ids;
/// confirm against `build_eq_index_with_policy` / `try_index`.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
58
/// Everything the store keeps about one loaded dataset.
pub struct DatasetState {
    /// Column metadata plus the dataset's predicate/projection filters.
    pub schema: DatasetSchema,
    /// Materialised record batches; left empty by the lazy builders in this file.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset (for lazy datasets this includes hive partition columns).
    pub arrow_schema: Arc<Schema>,
    /// Equality index over `data`; `EqIndex::default()` (empty) for lazy datasets.
    pub index: EqIndex,
    /// `true` when queries must go through DataFusion because nothing is held in memory.
    pub lazy: bool,
}
80
81impl DatasetState {
82 pub fn num_rows(&self) -> usize {
84 self.data.iter().map(|b| b.num_rows()).sum()
85 }
86}
87
/// Registry of datasets served through a single shared DataFusion session.
pub struct Store {
    /// Session in which every dataset is registered as a table under its name.
    ctx: SessionContext,
    /// Upper bound for `page_size` on paged queries (forced to at least 1 in `load`).
    max_page_size: u64,
    /// Effective config of every successfully loaded/registered dataset, used by `reload`.
    configs: RwLock<HashMap<String, DatasetConfig>>,
    /// Published dataset map; replaced wholesale (copy + swap) on reload/register so
    /// readers never block.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-dataset async mutexes serialising `reload`/`register` for the same name.
    /// Entries are created on demand and never removed.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
106
107impl Store {
108 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
110 if cfg
113 .datasets
114 .iter()
115 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
116 {
117 deltalake::aws::register_handlers(None);
118 }
119
120 let ctx = build_tuned_context(&cfg.datafusion);
126 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
127 let mut configs = HashMap::with_capacity(cfg.datasets.len());
128
129 for d in &cfg.datasets {
130 log::info!(
131 "Loading dataset '{}' ({} @ {})",
132 d.name,
133 d.source.kind.as_str(),
134 d.source.location
135 );
136 let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
140 .await
141 {
142 Some(bytes) => {
143 log::info!(
144 "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
145 d.name,
146 bytes as f64 / (1024.0 * 1024.0),
147 cfg.server.force_lazy_above_mb
148 );
149 let mut forced = d.clone();
150 forced.lazy = true;
151 std::borrow::Cow::Owned(forced)
152 }
153 None => std::borrow::Cow::Borrowed(d),
154 };
155 let d = d.as_ref();
156 let (state, provider) = match build_dataset(d, &ctx).await {
157 Ok(built) => built,
158 Err(AppError::EmptyDataset(msg)) => {
159 log::warn!("skipping empty dataset '{}': {msg}", d.name);
160 continue;
161 }
162 Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
168 log::warn!(
169 "skipping dataset '{}': S3 access denied — check credentials \
170 and bucket policy ({e})",
171 d.name
172 );
173 continue;
174 }
175 Err(e) => return Err(e),
176 };
177 ctx.register_table(d.name.as_str(), provider)?;
178 datasets.insert(d.name.clone(), Arc::new(state));
179 configs.insert(d.name.clone(), d.clone());
180 }
181 Ok(Self {
182 ctx,
183 max_page_size: cfg.server.max_page_size.max(1),
184 configs: RwLock::new(configs),
185 datasets: ArcSwap::from_pointee(datasets),
186 reload_locks: Mutex::new(HashMap::new()),
187 })
188 }
189
190 pub fn names(&self) -> Vec<String> {
192 let snap = self.datasets.load();
193 let mut v: Vec<String> = snap.keys().cloned().collect();
194 v.sort();
195 v
196 }
197
198 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
199 self.datasets
200 .load()
201 .get(name)
202 .cloned()
203 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
204 }
205
206 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
209 let st = self.dataset(name)?;
210
211 if st.lazy {
213 let table = DatasetSchema::quote_ident(&st.schema.name);
214 let sql = format!("SELECT * FROM {table} LIMIT 1");
215 let df = self.ctx.sql(&sql).await?;
216 let batches = df.collect().await?;
217 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
218 return Ok("null".into());
219 }
220 let arr = serialize(&batches[0].slice(0, 1))?;
221 let trimmed = arr.trim();
222 let inner = trimmed
223 .strip_prefix('[')
224 .and_then(|s| s.strip_suffix(']'))
225 .unwrap_or(trimmed);
226 return Ok(inner.to_string());
227 }
228
229 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
230 Some(b) => b,
231 None => return Ok("null".into()),
232 };
233 let arr = serialize(&first.slice(0, 1))?;
234 let trimmed = arr.trim();
236 let inner = trimmed
237 .strip_prefix('[')
238 .and_then(|s| s.strip_suffix(']'))
239 .unwrap_or(trimmed);
240 Ok(inner.to_string())
241 }
242
243 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
248 let cfg = self
250 .configs
251 .read()
252 .unwrap()
253 .get(name)
254 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
255 .clone();
256
257 let lock = {
259 let mut locks = self.reload_locks.lock().unwrap();
260 locks
261 .entry(name.to_string())
262 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
263 .clone()
264 };
265 let _guard = lock.lock().await;
266
267 let started = std::time::Instant::now();
268
269 if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
285 cache.clear();
286 }
287
288 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
291 let rows = state.num_rows();
292
293 let _ = self.ctx.deregister_table(name)?;
299 self.ctx.register_table(name, provider)?;
300
301 let mut new_map = (**self.datasets.load()).clone();
302 new_map.insert(name.to_string(), Arc::new(state));
303 self.datasets.store(Arc::new(new_map));
304
305 let elapsed_ms = started.elapsed().as_millis();
306 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
307 Ok(ReloadStats { rows, elapsed_ms })
308 }
309
310 pub async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
314 cfg.validate_for_register()?;
315
316 if self.datasets.load().contains_key(&cfg.name) {
318 return Err(AppError::InvalidValue(format!(
319 "dataset '{}' already exists",
320 cfg.name
321 )));
322 }
323
324 let lock = {
326 let mut locks = self.reload_locks.lock().unwrap();
327 locks
328 .entry(cfg.name.clone())
329 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
330 .clone()
331 };
332 let _guard = lock.lock().await;
333
334 if self.datasets.load().contains_key(&cfg.name) {
336 return Err(AppError::InvalidValue(format!(
337 "dataset '{}' already exists",
338 cfg.name
339 )));
340 }
341
342 if cfg.source.kind == SourceKind::Delta && cfg.source.is_s3() {
345 deltalake::aws::register_handlers(None);
346 }
347
348 let started = std::time::Instant::now();
349 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
350 let rows = state.num_rows();
351 let columns = state.schema.columns.len();
352
353 self.ctx.register_table(cfg.name.as_str(), provider)?;
354
355 let mut new_map = (**self.datasets.load()).clone();
356 new_map.insert(cfg.name.clone(), Arc::new(state));
357 self.datasets.store(Arc::new(new_map));
358 self.configs
359 .write()
360 .unwrap()
361 .insert(cfg.name.clone(), cfg.clone());
362
363 let elapsed_ms = started.elapsed().as_millis();
364 log::info!(
365 "registered dataset '{}' ({} @ {}): {rows} rows in {elapsed_ms} ms",
366 cfg.name,
367 cfg.source.kind.as_str(),
368 cfg.source.location
369 );
370 Ok(DatasetSummary {
371 name: cfg.name,
372 columns,
373 rows,
374 })
375 }
376
377 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
381 let batch = self.query_batch(name, req).await?;
382 if batch.num_rows() == 0 {
383 return Ok("[]".to_string());
384 }
385 serialize(&batch)
386 }
387
388 fn canonicalize_sql(&self, sql: &str) -> String {
394 let snap = self.datasets.load();
395 let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
396 let mut columns: HashMap<String, String> = HashMap::new();
397 for (name, state) in snap.iter() {
398 tables.insert(name.to_lowercase(), name.clone());
399 for col in &state.schema.columns {
400 columns
401 .entry(col.name.to_lowercase())
402 .or_insert_with(|| col.name.clone());
403 }
404 }
405 datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
406 }
407
408 pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
415 let cap = max_rows.max(1);
416 let sql = self.canonicalize_sql(sql);
417 let wrapped = if datapress_core::sql::is_describe(&sql) {
422 sql
423 } else {
424 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
425 };
426 let df = self.ctx.sql(&wrapped).await?;
427 let batches = df.collect().await?;
428 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
429 return Ok("[]".to_string());
430 }
431 let batch = if batches.len() == 1 {
432 batches.into_iter().next().expect("checked len")
433 } else {
434 compute::concat_batches(&batches[0].schema(), batches.iter())?
435 };
436 let batch = if batch.num_rows() as u64 > cap {
439 batch.slice(0, cap as usize)
440 } else {
441 batch
442 };
443 serialize(&batch)
444 }
445
446 pub async fn query_sql_arrow_stream(
450 &self,
451 sql: &str,
452 max_rows: u64,
453 ) -> Result<ArrowIpcStream, AppError> {
454 let cap = max_rows.max(1);
455 let sql = self.canonicalize_sql(sql);
456 let wrapped = if datapress_core::sql::is_describe(&sql) {
460 sql
461 } else {
462 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
463 };
464 let df = self.ctx.sql(&wrapped).await?;
465 let batches = df.collect().await?;
466 Ok(stream_arrow_batches(batches))
467 }
468
469 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
473 let batch = self.query_batch(name, req).await?;
474 let schema = batch.schema();
475 let mut buf = Vec::with_capacity(8 * 1024);
476 {
477 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
478 if batch.num_rows() > 0 {
479 w.write(&batch)?;
480 }
481 w.finish()?;
482 }
483 Ok(buf)
484 }
485
486 pub async fn query_arrow_stream(
487 &self,
488 name: &str,
489 req: &QueryRequest,
490 ) -> Result<ArrowIpcStream, AppError> {
491 let batches = self.query_batches(name, req).await?;
492 Ok(stream_arrow_batches(batches))
493 } pub async fn query_arrow_stream_all(
494 &self,
495 name: &str,
496 req: &QueryRequest,
497 ) -> Result<ArrowIpcStream, AppError> {
498 let batches = self.query_batches_all(name, req).await?;
499 Ok(stream_arrow_batches(batches))
500 }
501
502 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
510 let req = QueryRequest {
512 columns: Vec::new(),
513 predicates: Vec::new(),
514 group_by: Vec::new(),
515 aggregations: Vec::new(),
516 having: Vec::new(),
517 distinct: false,
518 order_by: Vec::new(),
519 limit: None,
520 page: 1,
521 page_size: 1,
522 };
523 let st = self.dataset(name)?;
524 let batches = self.query_batches_all(name, &req).await?;
525 let schema = batches
529 .first()
530 .map(|b| b.schema())
531 .unwrap_or_else(|| st.arrow_schema.clone());
532
533 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
534 {
535 let props = parquet::file::properties::WriterProperties::builder()
536 .set_compression(parquet::basic::Compression::SNAPPY)
537 .build();
538 let mut writer =
539 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
540 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
541 for batch in &batches {
542 if batch.num_rows() > 0 {
543 writer
544 .write(batch)
545 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
546 }
547 }
548 writer
549 .close()
550 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
551 }
552 Ok(bytes::Bytes::from(buf))
553 }
554
555 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
558 let batches = self.query_batches(name, req).await?;
559 if batches.is_empty() {
560 return Ok(RecordBatch::new_empty(Arc::new(
561 arrow::datatypes::Schema::empty(),
562 )));
563 }
564 if batches.len() == 1 {
565 return Ok(batches.into_iter().next().expect("checked len"));
566 }
567 if batches.iter().all(|b| b.num_rows() == 0) {
568 return Ok(RecordBatch::new_empty(batches[0].schema()));
569 }
570 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
571 Ok(batch)
572 }
573
574 async fn query_batches(
578 &self,
579 name: &str,
580 req: &QueryRequest,
581 ) -> Result<Vec<RecordBatch>, AppError> {
582 let st = self.dataset(name)?;
583
584 let page = req.page.max(1);
585 let page_size = req.page_size.clamp(1, self.max_page_size);
586 let offset = ((page - 1) * page_size) as usize;
587 let limit = page_size as usize;
588
589 self.query_batches_inner(st, req, Some((offset, limit)))
590 .await
591 }
592
593 async fn query_batches_all(
597 &self,
598 name: &str,
599 req: &QueryRequest,
600 ) -> Result<Vec<RecordBatch>, AppError> {
601 let st = self.dataset(name)?;
602 self.query_batches_inner(st, req, None).await
603 }
604
605 async fn query_batches_inner(
606 &self,
607 st: Arc<DatasetState>,
608 req: &QueryRequest,
609 page_window: Option<(usize, usize)>,
610 ) -> Result<Vec<RecordBatch>, AppError> {
611 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
612
613 let can_fast_path = !st.lazy
620 && req.order_by.is_empty()
621 && (page_window.is_none() || req.limit.is_none())
622 && req.group_by.is_empty()
623 && !req.distinct;
624
625 if can_fast_path {
626 let total = st.num_rows();
627
628 if req.predicates.is_empty() {
631 if page_window.is_none() && req.limit.is_none() {
632 return st
633 .data
634 .iter()
635 .cloned()
636 .map(|batch| project(&st.schema, batch, &req.columns))
637 .collect();
638 }
639 let start = offset.min(total);
640 let len = limit.min(total - start);
641 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
642 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
643 }
644
645 if let Some(rows) = try_index(&st.index, &req.predicates) {
648 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
649 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
650 }
651 }
652
653 let (sql, params) = match page_window {
655 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
656 None => build_query_stream_sql(&st.schema, req)?,
657 };
658 let mut df = self.ctx.sql(&sql).await?;
659 if !params.is_empty() {
660 df = df.with_param_values(params)?;
661 }
662 let batches = df.collect().await?;
663 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
664 let schema = batches
665 .first()
666 .map(|b| b.schema())
667 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
668 return Ok(vec![RecordBatch::new_empty(schema)]);
669 }
670 Ok(batches)
671 }
672}
673
674fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
675 let schema = batches
676 .first()
677 .map(|batch| batch.schema())
678 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
679 let (mut writer, stream) = arrow_ipc_stream_channel(8);
680
681 tokio::task::spawn_blocking(move || {
682 let result = (|| -> Result<(), AppError> {
683 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
684 for batch in batches {
685 if batch.num_rows() > 0 {
686 w.write(&batch)?;
687 }
688 }
689 w.finish()?;
690 Ok(())
691 })();
692 if let Err(err) = result {
693 log::error!("datafusion arrow stream failed: {err}");
694 writer.send_error(err);
695 }
696 });
697
698 stream
699}
700
701impl Store {
702 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
706 let st = self.dataset(name)?;
707
708 if !st.lazy {
709 if req.predicates.is_empty() {
711 return Ok(st.num_rows() as i64);
712 }
713 if let Some(rows) = try_index(&st.index, &req.predicates) {
715 return Ok(rows.len() as i64);
716 }
717 }
718
719 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
722 let mut df = self.ctx.sql(&sql).await?;
723 if !params.is_empty() {
724 df = df.with_param_values(params)?;
725 }
726 let batches = df.collect().await?;
727 let n = batches
728 .first()
729 .and_then(|b| {
730 b.column(0)
731 .as_any()
732 .downcast_ref::<arrow::array::Int64Array>()
733 })
734 .filter(|a| !a.is_empty())
735 .map(|a| a.value(0))
736 .unwrap_or(0);
737 Ok(n)
738 }
739}
740
741fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
768 let mut config = SessionConfig::new();
769 {
770 let opts = config.options_mut();
771 opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
772 opts.execution.parquet.reorder_filters = cfg.reorder_filters;
773 }
774
775 if !cfg.list_files_cache {
776 return SessionContext::new_with_config(config);
777 }
778
779 let ttl = (cfg.list_files_cache_ttl_secs > 0)
782 .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
783 let list_cache = Arc::new(DefaultListFilesCache::new(
784 cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
785 ttl,
786 ));
787 let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
788
789 let runtime = RuntimeEnvBuilder::new()
790 .with_cache_manager(cache_manager)
791 .build_arc()
792 .expect("failed to build DataFusion runtime env");
793
794 SessionContext::new_with_config_rt(config, runtime)
795}
796
797async fn build_dataset(
798 d: &DatasetConfig,
799 ctx: &SessionContext,
800) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
801 if d.lazy {
808 match (d.source.kind, d.source.is_s3()) {
809 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
810 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
811 (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
812 }
813 }
814
815 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
820 (SourceKind::Parquet, false) => read_local_parquet(d)?,
821 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
822 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
823 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
824 };
825 if raw_batches.is_empty() {
826 return Err(AppError::EmptyDataset(format!(
827 "dataset '{}': source produced no batches",
828 d.name
829 )));
830 }
831 if raw_batches.iter().all(|b| b.num_rows() == 0) {
836 return Err(AppError::EmptyDataset(format!(
837 "dataset '{}': source has a schema but no rows",
838 d.name
839 )));
840 }
841
842 let chunks = raw_batches;
843 let arrow_sch = chunks[0].schema();
844
845 let columns: Vec<ColumnInfo> = arrow_sch
847 .fields()
848 .iter()
849 .map(|f| {
850 let dt = f.data_type();
851 ColumnInfo {
852 name: f.name().clone(),
853 logical: arrow_to_logical(dt),
854 sql_type: format!("{dt:?}"),
855 nullable: f.is_nullable(),
856 }
857 })
858 .collect();
859 let schema = DatasetSchema::new(&d.name, columns)
860 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
861
862 let index = build_eq_index_with_policy(&chunks, &d.index);
867
868 let n_parts = std::thread::available_parallelism()
873 .map(|n| n.get())
874 .unwrap_or(4);
875 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
876 for (i, b) in chunks.iter().enumerate() {
877 if b.num_rows() == 0 {
878 continue;
879 }
880 parts[i % n_parts].push(b.clone());
881 }
882 parts.retain(|p| !p.is_empty());
883 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
884
885 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
886 let mem_mb: usize = chunks
887 .iter()
888 .flat_map(|b| b.columns().iter())
889 .map(|c| c.get_buffer_memory_size())
890 .sum::<usize>()
891 / 1_048_576;
892 log::info!(
893 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
894 d.name,
895 d.source.kind.as_str(),
896 total_rows,
897 schema.columns.len(),
898 mem_mb,
899 chunks.len(),
900 index.len()
901 );
902
903 Ok((
904 DatasetState {
905 schema,
906 data: chunks,
907 arrow_schema: arrow_sch,
908 index,
909 lazy: false,
910 },
911 provider,
912 ))
913}
914
915async fn build_lazy_local_parquet(
920 d: &DatasetConfig,
921 ctx: &SessionContext,
922) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
923 let (url, part_keys) = lazy_local_listing(d)?;
924
925 let mut opts =
926 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
927 if !part_keys.is_empty() {
928 opts = opts.with_table_partition_cols(
929 part_keys
930 .iter()
931 .map(|k| (k.clone(), DataType::Utf8))
932 .collect(),
933 );
934 }
935
936 let session_state = ctx.state();
937 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
940 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
941 })?;
942
943 if file_schema.fields().is_empty() {
947 return Err(AppError::EmptyDataset(format!(
948 "dataset '{}': no .parquet files at {}",
949 d.name, d.source.location
950 )));
951 }
952
953 let cfg = ListingTableConfig::new(url)
954 .with_listing_options(opts)
955 .with_schema(file_schema.clone());
956 let table = ListingTable::try_new(cfg).map_err(|e| {
957 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
958 })?;
959 let provider: Arc<dyn TableProvider> = Arc::new(table);
960
961 let mut fields: Vec<Field> = file_schema
963 .fields()
964 .iter()
965 .map(|f| f.as_ref().clone())
966 .collect();
967 for k in &part_keys {
968 if !fields.iter().any(|f| f.name() == k) {
969 fields.push(Field::new(k, DataType::Utf8, false));
970 }
971 }
972 let arrow_sch = Arc::new(Schema::new(fields));
973
974 let columns: Vec<ColumnInfo> = arrow_sch
975 .fields()
976 .iter()
977 .map(|f| {
978 let dt = f.data_type();
979 ColumnInfo {
980 name: f.name().clone(),
981 logical: arrow_to_logical(dt),
982 sql_type: format!("{dt:?}"),
983 nullable: f.is_nullable(),
984 }
985 })
986 .collect();
987 let schema = DatasetSchema::new(&d.name, columns)
988 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
989
990 log::info!(
991 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
992 d.name,
993 d.source.kind.as_str(),
994 schema.columns.len(),
995 part_keys.len()
996 );
997
998 Ok((
999 DatasetState {
1000 schema,
1001 data: Vec::new(),
1002 arrow_schema: arrow_sch,
1003 index: EqIndex::default(),
1004 lazy: true,
1005 },
1006 provider,
1007 ))
1008}
1009
1010fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1015 let loc = &d.source.location;
1016
1017 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1018 let parts: Vec<&str> = loc.split('/').collect();
1019 let first_wild = parts
1020 .iter()
1021 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
1022 .unwrap_or(parts.len());
1023 let base = parts[..first_wild].join("/");
1024 let base = if base.is_empty() {
1025 "/".to_string()
1026 } else {
1027 base
1028 };
1029 let upper = parts.len().saturating_sub(1);
1032 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
1033 .iter()
1034 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
1035 .filter(|k| !k.is_empty())
1036 .collect();
1037 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
1038 }
1039
1040 let path = std::path::Path::new(loc);
1041 if path.is_dir() {
1042 let keys = discover_hive_keys(path);
1043 return Ok((dir_url(path, d)?, keys));
1044 }
1045
1046 let url = ListingTableUrl::parse(loc)
1047 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
1048 Ok((url, Vec::new()))
1049}
1050
1051fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
1054 let s = path.to_str().ok_or_else(|| {
1055 AppError::Internal(format!(
1056 "dataset '{}': non-utf8 path {}",
1057 d.name,
1058 path.display()
1059 ))
1060 })?;
1061 let s = if s.ends_with('/') {
1062 s.to_string()
1063 } else {
1064 format!("{s}/")
1065 };
1066 ListingTableUrl::parse(&s)
1067 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
1068}
1069
/// Walks down from `base` through `key=value` subdirectories and collects the keys in order.
///
/// At each level the first non-empty `key=value` directory returned by `read_dir` is followed.
/// Directory order is filesystem-dependent, so this assumes sibling directories share the
/// same key. The walk stops when a level has no such directory or cannot be read.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    /// First `key=value` subdirectory of `dir` as `(key, path)`, if any.
    fn first_hive_subdir(dir: &std::path::Path) -> Option<(String, std::path::PathBuf)> {
        let entries = std::fs::read_dir(dir).ok()?;
        for entry in entries.flatten() {
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }
            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            match name.split_once('=') {
                Some((k, v)) if !k.is_empty() && !v.is_empty() => {
                    return Some((k.to_string(), path));
                }
                _ => {}
            }
        }
        None
    }

    let mut keys = Vec::new();
    let mut dir = base.to_path_buf();
    while let Some((key, child)) = first_hive_subdir(&dir) {
        keys.push(key);
        dir = child;
    }
    keys
}
1107
1108async fn build_lazy_s3_parquet(
1114 d: &DatasetConfig,
1115 ctx: &SessionContext,
1116) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1117 register_s3_object_store(d, ctx)?;
1118
1119 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1120
1121 if file_schema.fields().is_empty() {
1124 return Err(AppError::EmptyDataset(format!(
1125 "dataset '{}': no .parquet files at {}",
1126 d.name, d.source.location
1127 )));
1128 }
1129
1130 let mut fields: Vec<Field> = file_schema
1132 .fields()
1133 .iter()
1134 .map(|f| f.as_ref().clone())
1135 .collect();
1136 for k in &part_keys {
1137 if !fields.iter().any(|f| f.name() == k) {
1138 fields.push(Field::new(k, DataType::Utf8, false));
1139 }
1140 }
1141 let arrow_sch = Arc::new(Schema::new(fields));
1142
1143 let columns: Vec<ColumnInfo> = arrow_sch
1144 .fields()
1145 .iter()
1146 .map(|f| {
1147 let dt = f.data_type();
1148 ColumnInfo {
1149 name: f.name().clone(),
1150 logical: arrow_to_logical(dt),
1151 sql_type: format!("{dt:?}"),
1152 nullable: f.is_nullable(),
1153 }
1154 })
1155 .collect();
1156 let schema = DatasetSchema::new(&d.name, columns)
1157 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1158
1159 log::info!(
1160 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1161 d.name,
1162 d.source.kind.as_str(),
1163 schema.columns.len(),
1164 part_keys.len()
1165 );
1166
1167 Ok((
1168 DatasetState {
1169 schema,
1170 data: Vec::new(),
1171 arrow_schema: arrow_sch,
1172 index: EqIndex::default(),
1173 lazy: true,
1174 },
1175 provider,
1176 ))
1177}
1178
1179async fn build_s3_listing_table(
1185 d: &DatasetConfig,
1186 ctx: &SessionContext,
1187) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1188 let (url, part_keys) = s3_listing(d, ctx).await?;
1189
1190 let mut opts =
1191 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1192 if !part_keys.is_empty() {
1193 opts = opts.with_table_partition_cols(
1194 part_keys
1195 .iter()
1196 .map(|k| (k.clone(), DataType::Utf8))
1197 .collect(),
1198 );
1199 }
1200
1201 let session_state = ctx.state();
1202 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1203 AppError::Internal(format!(
1204 "dataset '{}': infer parquet schema on s3: {e}",
1205 d.name
1206 ))
1207 })?;
1208
1209 let cfg = ListingTableConfig::new(url)
1210 .with_listing_options(opts)
1211 .with_schema(file_schema.clone());
1212 let table = ListingTable::try_new(cfg).map_err(|e| {
1213 AppError::Internal(format!(
1214 "dataset '{}': ListingTable::try_new (s3): {e}",
1215 d.name
1216 ))
1217 })?;
1218 Ok((Arc::new(table), file_schema, part_keys))
1219}
1220
1221async fn s3_listing(
1227 d: &DatasetConfig,
1228 ctx: &SessionContext,
1229) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1230 let s3 = d.s3.clone().unwrap_or_default();
1231 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1232 let loc = &d.source.location;
1233
1234 if d.source.has_glob() {
1235 let (base, keys) = split_glob_base_keys(loc);
1236 let base = format!("{}/", base.trim_end_matches('/'));
1237 let url = ListingTableUrl::parse(&base).map_err(|e| {
1238 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1239 })?;
1240 let keys = if want_partitions { keys } else { Vec::new() };
1241 return Ok((url, keys));
1242 }
1243
1244 let base = if loc.ends_with('/') {
1245 loc.clone()
1246 } else {
1247 format!("{loc}/")
1248 };
1249 let url = ListingTableUrl::parse(&base).map_err(|e| {
1250 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1251 })?;
1252 let keys = if want_partitions {
1253 discover_s3_hive_keys(ctx, &url).await
1254 } else {
1255 Vec::new()
1256 };
1257 Ok((url, keys))
1258}
1259
/// Splits a `/`-separated glob location into `(base, hive_keys)`.
///
/// `base` is everything before the first segment containing `*`, `?` or `[`, or `"/"` if
/// that prefix is empty. `hive_keys` are the non-empty `key` parts of `key=...` segments from
/// that first wildcard segment up to, but excluding, the final (file) segment.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_wild = |seg: &str| seg.contains(|c: char| matches!(c, '*' | '?' | '['));

    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|s| is_wild(*s))
        .unwrap_or(segments.len());

    let prefix = segments[..wild_at].join("/");
    let base = if prefix.is_empty() {
        String::from("/")
    } else {
        prefix
    };

    let last = segments.len().saturating_sub(1);
    let mut keys = Vec::new();
    for seg in &segments[wild_at.min(last)..last] {
        if let Some((key, _)) = seg.split_once('=') {
            if !key.is_empty() {
                keys.push(key.to_string());
            }
        }
    }
    (base, keys)
}
1283
1284async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1289 let store = match ctx.runtime_env().object_store(url.object_store()) {
1290 Ok(s) => s,
1291 Err(_) => return Vec::new(),
1292 };
1293 let mut keys = Vec::new();
1294 let mut prefix = url.prefix().clone();
1295 loop {
1296 let listing = match store.list_with_delimiter(Some(&prefix)).await {
1297 Ok(l) => l,
1298 Err(_) => break,
1299 };
1300 let mut next: Option<object_store::path::Path> = None;
1301 for cp in &listing.common_prefixes {
1302 if let Some(seg) = cp.parts().next_back() {
1303 let seg = seg.as_ref().to_string();
1304 if let Some((k, v)) = seg.split_once('=')
1305 && !k.is_empty()
1306 && !v.is_empty()
1307 {
1308 keys.push(k.to_string());
1309 next = Some(cp.clone());
1310 break;
1311 }
1312 }
1313 }
1314 match next {
1315 Some(p) => prefix = p,
1316 None => break,
1317 }
1318 }
1319 keys
1320}
1321
1322fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1339 let files = d.resolve_local_parquet_files()?;
1340 let mut all = Vec::new();
1341 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1342 None
1343 } else {
1344 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1345 };
1346
1347 for f in &files {
1348 let file = std::fs::File::open(f)
1349 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1350
1351 let probe = ParquetRecordBatchReaderBuilder::try_new(
1356 file.try_clone()
1357 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1358 )?;
1359 let parquet_schema = probe.parquet_schema().clone();
1360 let arrow_schema = probe.schema().clone();
1361 let metadata = probe.metadata().clone();
1362 drop(probe);
1363
1364 let projection = if let Some(w) = &wanted {
1366 let indices: Vec<usize> = arrow_schema
1367 .fields()
1368 .iter()
1369 .enumerate()
1370 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1371 .map(|(i, _)| i)
1372 .collect();
1373 if indices.is_empty() {
1374 return Err(AppError::Internal(format!(
1375 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1376 d.name,
1377 d.columns,
1378 f.display()
1379 )));
1380 }
1381 ProjectionMask::roots(&parquet_schema, indices)
1382 } else {
1383 ProjectionMask::all()
1384 };
1385
1386 let mut new_fields: Vec<Field> = arrow_schema
1394 .fields()
1395 .iter()
1396 .map(|f| f.as_ref().clone())
1397 .collect();
1398 if d.dict_encode
1399 && let Some(rg0) = metadata.row_groups().first()
1400 {
1401 for (i, fld) in arrow_schema.fields().iter().enumerate() {
1402 if !matches!(
1403 fld.data_type(),
1404 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1405 ) {
1406 continue;
1407 }
1408 if let Some(col) = rg0.columns().get(i)
1409 && col.dictionary_page_offset().is_some()
1410 {
1411 new_fields[i] = Field::new(
1412 fld.name(),
1413 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1414 fld.is_nullable(),
1415 );
1416 }
1417 }
1418 }
1419 let forced_schema = Arc::new(Schema::new(new_fields));
1420
1421 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1422 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1423 .with_batch_size(65_536)
1424 .with_projection(projection)
1425 .build()?;
1426 let pairs = hive_pairs(f);
1430 for batch in reader {
1431 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1432 all.push(if pairs.is_empty() {
1433 batch
1434 } else {
1435 append_partition_cols(&batch, &pairs)?
1436 });
1437 }
1438 }
1439 if all.is_empty() {
1440 return Err(AppError::Internal(format!(
1441 "dataset '{}': parquet source is empty",
1442 d.name
1443 )));
1444 }
1445 Ok(all)
1446}
1447
/// Extracts hive-style `key=value` partition pairs from the directory part of `path`.
///
/// Only parent directories are inspected. A file name such as `part=0.parquet` is not a
/// partition directory; `split_glob_base_keys` likewise ignores the final segment.
/// A segment is skipped when:
/// - its key or value is empty;
/// - its value contains another `=`;
/// - it is not valid UTF-8.
///
/// NOTE(review): every ancestor directory is scanned, so `key=value` directories above
/// the dataset root are picked up too.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|c| c.as_os_str().to_str())
        .filter_map(|seg| {
            let (k, v) = seg.split_once('=')?;
            if k.is_empty() || v.is_empty() || v.contains('=') {
                return None;
            }
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}
1462
1463fn append_partition_cols(
1466 batch: &RecordBatch,
1467 pairs: &[(String, String)],
1468) -> Result<RecordBatch, AppError> {
1469 let n = batch.num_rows();
1470 let mut fields: Vec<Field> = batch
1471 .schema()
1472 .fields()
1473 .iter()
1474 .map(|f| f.as_ref().clone())
1475 .collect();
1476 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1477 for (k, v) in pairs {
1478 if fields.iter().any(|f| f.name() == k) {
1479 continue;
1480 }
1481 fields.push(Field::new(k, DataType::Utf8, false));
1482 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1483 }
1484 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1485 .map_err(|e| AppError::Internal(e.to_string()))
1486}
1487
1488async fn read_s3_parquet(
1494 d: &DatasetConfig,
1495 ctx: &SessionContext,
1496) -> Result<Vec<RecordBatch>, AppError> {
1497 register_s3_object_store(d, ctx)?;
1498 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1499 let df = ctx
1500 .read_table(provider)
1501 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1502 Ok(df.collect().await?)
1503}
1504
1505async fn open_delta_table(
1512 d: &DatasetConfig,
1513 opts: HashMap<String, String>,
1514) -> Result<deltalake::DeltaTable, AppError> {
1515 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1516 AppError::Internal(format!(
1517 "dataset '{}': bad delta location '{}': {e}",
1518 d.name, d.source.location
1519 ))
1520 })?;
1521 deltalake::open_table_with_storage_options(url, opts)
1522 .await
1523 .map_err(|e| {
1524 let msg = e.to_string();
1534 let low = msg.to_lowercase();
1535 if low.contains("no files in log segment") || low.contains("not a delta table") {
1536 AppError::EmptyDataset(format!(
1537 "delta location '{}' has no committed files ({msg})",
1538 d.source.location
1539 ))
1540 } else {
1541 AppError::Internal(format!(
1542 "dataset '{}': delta open '{}': {msg}",
1543 d.name, d.source.location
1544 ))
1545 }
1546 })
1547}
1548
1549async fn open_delta_provider(
1555 d: &DatasetConfig,
1556 opts: HashMap<String, String>,
1557) -> Result<Arc<dyn TableProvider>, AppError> {
1558 let table = open_delta_table(d, opts).await?;
1559 table.table_provider().await.map_err(|e| {
1560 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1561 })
1562}
1563
1564fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1567 if d.source.is_s3() {
1568 delta_s3_options(d)
1569 } else {
1570 Ok(HashMap::new())
1571 }
1572}
1573
1574async fn read_delta(
1578 d: &DatasetConfig,
1579 opts: HashMap<String, String>,
1580) -> Result<Vec<RecordBatch>, AppError> {
1581 let provider = open_delta_provider(d, opts).await?;
1582 let scan_ctx = SessionContext::new();
1592 let df = scan_ctx.read_table(provider).map_err(|e| {
1593 AppError::EmptyDataset(format!(
1594 "delta location '{}' could not be scanned, skipping ({e})",
1595 d.source.location
1596 ))
1597 })?;
1598 df.collect().await.map_err(|e| {
1599 AppError::EmptyDataset(format!(
1600 "delta location '{}' could not be scanned, skipping ({e})",
1601 d.source.location
1602 ))
1603 })
1604}
1605
1606async fn build_lazy_delta(
1612 d: &DatasetConfig,
1613 _ctx: &SessionContext,
1614) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1615 let table = open_delta_table(d, delta_storage_options(d)?).await?;
1616
1617 let file_count = table
1624 .get_file_uris()
1625 .map(|it| it.count())
1626 .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1627 if file_count == 0 {
1628 return Err(AppError::EmptyDataset(format!(
1629 "delta location '{}' has a schema but no data files",
1630 d.source.location
1631 )));
1632 }
1633
1634 let provider = table.table_provider().await.map_err(|e| {
1635 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1636 })?;
1637
1638 {
1651 let probe_ctx = SessionContext::new();
1652 let probe = probe_ctx
1653 .read_table(provider.clone())
1654 .and_then(|df| df.limit(0, Some(1)));
1655 match probe {
1656 Ok(df) => match df.collect().await {
1657 Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1658 return Err(AppError::EmptyDataset(format!(
1659 "delta location '{}' resolves to no rows",
1660 d.source.location
1661 )));
1662 }
1663 Ok(_) => {}
1664 Err(e) => {
1665 return Err(AppError::EmptyDataset(format!(
1666 "delta location '{}' could not be scanned, skipping ({e})",
1667 d.source.location
1668 )));
1669 }
1670 },
1671 Err(e) => {
1672 return Err(AppError::EmptyDataset(format!(
1673 "delta location '{}' could not be scanned, skipping ({e})",
1674 d.source.location
1675 )));
1676 }
1677 }
1678 }
1679
1680 let arrow_sch = provider.schema();
1683 let columns: Vec<ColumnInfo> = arrow_sch
1684 .fields()
1685 .iter()
1686 .map(|f| {
1687 let dt = f.data_type();
1688 ColumnInfo {
1689 name: f.name().clone(),
1690 logical: arrow_to_logical(dt),
1691 sql_type: format!("{dt:?}"),
1692 nullable: f.is_nullable(),
1693 }
1694 })
1695 .collect();
1696 let schema = DatasetSchema::new(&d.name, columns)
1697 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1698
1699 log::info!(
1700 "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1701 d.name,
1702 d.source.kind.as_str(),
1703 schema.columns.len()
1704 );
1705
1706 Ok((
1707 DatasetState {
1708 schema,
1709 data: Vec::new(),
1710 arrow_schema: arrow_sch,
1711 index: EqIndex::default(),
1712 lazy: true,
1713 },
1714 provider,
1715 ))
1716}
1717
1718fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1722 let creds = d.resolved_creds();
1723 let region = d.resolved_region();
1724 let s3 = d.s3.clone().unwrap_or_default();
1725 let (bucket, _) = d.source.s3_bucket()?;
1726
1727 let mut opts = HashMap::new();
1728 opts.insert("AWS_REGION".into(), region);
1729 if let Some(ep) = s3.effective_endpoint(bucket) {
1730 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1731 }
1732 if s3.allow_http {
1733 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1734 }
1735 opts.insert(
1736 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1737 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1738 );
1739 if let Some(k) = creds.access_key_id {
1740 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1741 }
1742 if let Some(s) = creds.secret_access_key {
1743 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1744 }
1745 if let Some(t) = creds.session_token {
1746 opts.insert("AWS_SESSION_TOKEN".into(), t);
1747 }
1748 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1750 Ok(opts)
1751}
1752
1753fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1757 let (bucket, _key) = d.source.s3_bucket()?;
1758 let creds = d.resolved_creds();
1759 let region = d.resolved_region();
1760 let s3 = d.s3.clone().unwrap_or_default();
1761
1762 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1763 AppError::Internal(format!(
1764 "dataset '{}': build S3 store for '{bucket}': {e}",
1765 d.name
1766 ))
1767 })?;
1768
1769 let url = Url::parse(&format!("s3://{bucket}"))
1770 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1771 ctx.register_object_store(&url, Arc::new(store));
1772 Ok(())
1773}
1774
/// Heuristically decides whether an error message describes an S3 authorization failure.
///
/// The check is case-insensitive and matches any of:
/// - "access denied";
/// - "accessdenied";
/// - "forbidden";
/// - the HTTP status `403`, but only as a standalone token.
///
/// "Standalone" means the digits are not embedded in a longer alphanumeric run, such as
/// a byte count (`14030`), a byte range (`0-4035`) or a request id.
fn is_s3_access_denied(msg: &str) -> bool {
    let low = msg.to_lowercase();
    low.contains("access denied")
        || low.contains("accessdenied")
        || low.contains("forbidden")
        || contains_standalone_token(&low, "403")
}

/// True when `needle` occurs in `haystack` with no ASCII alphanumeric character directly
/// before or after it.
fn contains_standalone_token(haystack: &str, needle: &str) -> bool {
    let is_word = |c: Option<char>| c.is_some_and(|c| c.is_ascii_alphanumeric());
    haystack.match_indices(needle).any(|(at, m)| {
        let before = haystack[..at].chars().next_back();
        let after = haystack[at + m.len()..].chars().next();
        !is_word(before) && !is_word(after)
    })
}
1788
1789
1790async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
1796 if d.lazy || server.force_lazy_above_mb == 0 {
1797 return None;
1798 }
1799 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1800
1801 let bytes = if d.source.is_s3() {
1802 match estimate_s3_bytes(d).await {
1803 Ok(b) => b,
1804 Err(e) => {
1805 log::warn!(
1806 "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
1807 d.name
1808 );
1809 return None;
1810 }
1811 }
1812 } else {
1813 d.estimate_local_bytes()?
1814 };
1815
1816 (bytes > threshold).then_some(bytes)
1817}
1818
1819async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
1828 use futures_util::StreamExt;
1829 use object_store::ObjectStore;
1830
1831 let (bucket, _key) = d.source.s3_bucket()?;
1832 let creds = d.resolved_creds();
1833 let region = d.resolved_region();
1834 let s3 = d.s3.clone().unwrap_or_default();
1835 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1836 AppError::Internal(format!(
1837 "dataset '{}': build S3 store for '{bucket}': {e}",
1838 d.name
1839 ))
1840 })?;
1841
1842 let (base, _keys) = split_glob_base_keys(&d.source.location);
1845 let prefix_key = base
1846 .strip_prefix("s3://")
1847 .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
1848 .unwrap_or("")
1849 .trim_end_matches('/');
1850 let prefix =
1851 (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
1852
1853 let mut total: u64 = 0;
1854 let mut stream = store.list(prefix.as_ref());
1855 while let Some(meta) = stream.next().await {
1856 let meta = meta.map_err(|e| {
1857 AppError::Internal(format!(
1858 "dataset '{}': s3 list under '{prefix_key}': {e}",
1859 d.name
1860 ))
1861 })?;
1862 if meta.location.as_ref().ends_with(".parquet") {
1863 total = total.saturating_add(meta.size);
1864 }
1865 }
1866 Ok(total)
1867}
1868
1869fn build_s3(
1870 bucket: &str,
1871 region: &str,
1872 s3: &S3Config,
1873 creds: &ResolvedCreds,
1874) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1875 let mut b = AmazonS3Builder::new()
1876 .with_bucket_name(bucket)
1877 .with_region(region)
1878 .with_allow_http(s3.allow_http)
1879 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1880 if let Some(ep) = s3.effective_endpoint(bucket) {
1881 b = b.with_endpoint(ep);
1882 }
1883 if let Some(k) = creds.access_key_id.as_deref() {
1884 b = b.with_access_key_id(k);
1885 }
1886 if let Some(s) = creds.secret_access_key.as_deref() {
1887 b = b.with_secret_access_key(s);
1888 }
1889 if let Some(t) = creds.session_token.as_deref() {
1890 b = b.with_token(t);
1891 }
1892 b.build()
1893}
1894
1895fn arrow_to_logical(dt: &DataType) -> LogicalType {
1896 match dt {
1897 DataType::Boolean => LogicalType::Bool,
1898 DataType::Int8
1899 | DataType::Int16
1900 | DataType::Int32
1901 | DataType::Int64
1902 | DataType::UInt8
1903 | DataType::UInt16
1904 | DataType::UInt32
1905 | DataType::UInt64 => LogicalType::Int,
1906 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1907 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1908 DataType::Dictionary(_, v)
1912 if matches!(
1913 v.as_ref(),
1914 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1915 ) =>
1916 {
1917 LogicalType::Utf8
1918 }
1919 DataType::Date32
1920 | DataType::Date64
1921 | DataType::Time32(_)
1922 | DataType::Time64(_)
1923 | DataType::Timestamp(_, _)
1924 | DataType::Duration(_)
1925 | DataType::Interval(_) => LogicalType::Temporal,
1926 _ => LogicalType::Other,
1927 }
1928}
1929
1930fn project(
1935 schema: &DatasetSchema,
1936 batch: RecordBatch,
1937 columns: &[String],
1938) -> Result<RecordBatch, AppError> {
1939 if columns.is_empty() {
1940 return Ok(batch);
1941 }
1942 let indices: Vec<usize> = columns
1943 .iter()
1944 .map(|c| {
1945 schema
1946 .find(c)
1947 .map(|info| schema.by_name[&info.name.to_lowercase()])
1948 })
1949 .collect::<Result<_, _>>()?;
1950 let fields: Vec<Field> = indices
1951 .iter()
1952 .map(|&i| batch.schema().field(i).clone())
1953 .collect();
1954 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1955 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1956}
1957
1958#[derive(Default)]
1971struct Params {
1972 values: Vec<ScalarValue>,
1973}
1974
1975impl Params {
1976 fn new() -> Self {
1977 Self::default()
1978 }
1979
1980 fn bind(&mut self, v: ScalarValue) -> String {
1982 self.values.push(v);
1983 format!("${}", self.values.len())
1984 }
1985
1986 fn into_values(self) -> Vec<ScalarValue> {
1987 self.values
1988 }
1989}
1990
1991fn build_query_sql(
1992 schema: &DatasetSchema,
1993 req: &QueryRequest,
1994 max_page_size: u64,
1995) -> Result<(String, Vec<ScalarValue>), AppError> {
1996 let (limit, offset) = req.effective_limit_offset(max_page_size);
1997 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1998}
1999
2000fn build_query_stream_sql(
2001 schema: &DatasetSchema,
2002 req: &QueryRequest,
2003) -> Result<(String, Vec<ScalarValue>), AppError> {
2004 let suffix = req
2005 .limit
2006 .map(|limit| format!(" LIMIT {limit}"))
2007 .unwrap_or_default();
2008 build_query_sql_with_suffix(schema, req, &suffix)
2009}
2010
2011fn build_query_sql_with_suffix(
2012 schema: &DatasetSchema,
2013 req: &QueryRequest,
2014 suffix: &str,
2015) -> Result<(String, Vec<ScalarValue>), AppError> {
2016 let agg_plan = req.agg_plan(schema)?;
2017
2018 let cols = if let Some(plan) = &agg_plan {
2019 let mut parts: Vec<String> = plan
2021 .group_cols
2022 .iter()
2023 .map(|c| DatasetSchema::quote_ident(c))
2024 .collect();
2025 for a in &plan.aggs {
2026 let expr = a.sql_expr()?;
2027 parts.push(format!(
2028 "{expr} AS {}",
2029 DatasetSchema::quote_ident(&a.alias)
2030 ));
2031 }
2032 parts.join(", ")
2033 } else if req.columns.is_empty() {
2034 if req.distinct {
2035 "DISTINCT *".to_string()
2036 } else {
2037 "*".to_string()
2038 }
2039 } else {
2040 let list = req
2041 .columns
2042 .iter()
2043 .map(|c| {
2044 schema
2045 .find(c)
2046 .map(|info| DatasetSchema::quote_ident(&info.name))
2047 })
2048 .collect::<Result<Vec<_>, _>>()?
2049 .join(", ");
2050 if req.distinct {
2051 format!("DISTINCT {list}")
2052 } else {
2053 list
2054 }
2055 };
2056
2057 let mut params = Params::new();
2058 let clauses: Vec<String> = req
2059 .predicates
2060 .iter()
2061 .map(|p| pred_to_sql(schema, p, &mut params))
2062 .collect::<Result<_, _>>()?;
2063
2064 let table = DatasetSchema::quote_ident(&schema.name);
2065 let where_clause = if clauses.is_empty() {
2066 String::new()
2067 } else {
2068 format!(" WHERE {}", clauses.join(" AND "))
2069 };
2070 let group_clause = match &agg_plan {
2071 Some(p) => format!(
2072 " GROUP BY {}",
2073 p.group_cols
2074 .iter()
2075 .map(|c| DatasetSchema::quote_ident(c))
2076 .collect::<Vec<_>>()
2077 .join(", "),
2078 ),
2079 None => String::new(),
2080 };
2081 let having_clause = {
2082 let resolved = req.having_plan(agg_plan.as_ref())?;
2083 if resolved.is_empty() {
2084 String::new()
2085 } else {
2086 let clauses: Vec<String> = resolved
2087 .iter()
2088 .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2089 .collect::<Result<_, _>>()?;
2090 format!(" HAVING {}", clauses.join(" AND "))
2091 }
2092 };
2093 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2094 Some(s) => format!(" ORDER BY {s}"),
2095 None => String::new(),
2096 };
2097 let sql =
2098 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2099 Ok((sql, params.into_values()))
2100}
2101
2102fn build_count_sql(
2103 schema: &DatasetSchema,
2104 predicates: &[Predicate],
2105) -> Result<(String, Vec<ScalarValue>), AppError> {
2106 let mut params = Params::new();
2107 let clauses: Vec<String> = predicates
2108 .iter()
2109 .map(|p| pred_to_sql(schema, p, &mut params))
2110 .collect::<Result<_, _>>()?;
2111 let table = DatasetSchema::quote_ident(&schema.name);
2112 let where_clause = if clauses.is_empty() {
2113 String::new()
2114 } else {
2115 format!(" WHERE {}", clauses.join(" AND "))
2116 };
2117 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2118 Ok((sql, params.into_values()))
2119}
2120
2121fn pred_to_sql(
2122 schema: &DatasetSchema,
2123 pred: &Predicate,
2124 params: &mut Params,
2125) -> Result<String, AppError> {
2126 let info = schema.find(&pred.col)?;
2127 let col = DatasetSchema::quote_ident(&info.name);
2128 pred_to_sql_with_lhs(&col, pred, params)
2129}
2130
2131fn pred_to_sql_with_lhs(
2137 col: &str,
2138 pred: &Predicate,
2139 params: &mut Params,
2140) -> Result<String, AppError> {
2141 match pred.op.as_str() {
2142 "is_null" => return Ok(format!("{col} IS NULL")),
2143 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2144 _ => {}
2145 }
2146
2147 let val = pred
2148 .val
2149 .as_ref()
2150 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2151
2152 if pred.op == "in" {
2153 let items = val
2154 .as_array()
2155 .filter(|a| !a.is_empty())
2156 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2157 let placeholders: Vec<String> = items
2158 .iter()
2159 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2160 .collect::<Result<_, AppError>>()?;
2161 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2162 }
2163
2164 let sql_op = match pred.op.as_str() {
2165 "eq" => "=",
2166 "neq" => "!=",
2167 "gt" => ">",
2168 "gte" => ">=",
2169 "lt" => "<",
2170 "lte" => "<=",
2171 "like" => "LIKE",
2172 "ilike" => "ILIKE",
2173 other => return Err(AppError::UnknownOperator(other.into())),
2174 };
2175 let placeholder = params.bind(json_to_scalar(val)?);
2176 Ok(format!("{col} {sql_op} {placeholder}"))
2177}
2178
2179fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2183 match val {
2184 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2185 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2186 JsonValue::Null => Ok(ScalarValue::Null),
2187 JsonValue::Number(n) => {
2188 if let Some(i) = n.as_i64() {
2189 Ok(ScalarValue::Int64(Some(i)))
2190 } else if let Some(u) = n.as_u64() {
2191 Ok(ScalarValue::UInt64(Some(u)))
2192 } else if let Some(f) = n.as_f64() {
2193 Ok(ScalarValue::Float64(Some(f)))
2194 } else {
2195 Err(AppError::InvalidValue(
2196 "unsupported numeric literal in predicate".into(),
2197 ))
2198 }
2199 }
2200 _ => Err(AppError::InvalidValue(
2201 "unsupported literal type in predicate".into(),
2202 )),
2203 }
2204}
2205
2206fn json_index_key(val: &JsonValue) -> Option<String> {
2211 match val {
2212 JsonValue::String(s) => Some(s.clone()),
2213 JsonValue::Number(n) => Some(n.to_string()),
2214 JsonValue::Bool(b) => Some(b.to_string()),
2215 _ => None,
2216 }
2217}
2218
/// Intersects two ascending row-id lists with a linear merge.
///
/// An element is emitted each time both heads are equal, and then both heads advance.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    while let (Some(&x), Some(&y)) = (left.first(), right.first()) {
        match x.cmp(&y) {
            Ordering::Less => left = &left[1..],
            Ordering::Greater => right = &right[1..],
            Ordering::Equal => {
                common.push(x);
                left = &left[1..];
                right = &right[1..];
            }
        }
    }
    common
}
2235
/// Merges two ascending row-id lists into one ascending list with a linear merge.
///
/// When the two heads are equal, a single copy is emitted.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some(&x), Some(&y)) = (left.first(), right.first()) {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = &left[1..];
            }
            Ordering::Greater => {
                merged.push(y);
                right = &right[1..];
            }
            Ordering::Equal => {
                merged.push(x);
                left = &left[1..];
                right = &right[1..];
            }
        }
    }
    // At most one side still has elements; append it unchanged.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
2260
/// Tries to answer `predicates` from the in-memory equality index.
///
/// Returns `None` when the index cannot answer, and the caller must fall back to
/// running the query. That happens when:
/// - there are no predicates, or the index is empty;
/// - a predicate's column (lowercased) is not indexed;
/// - a predicate uses an operator other than `eq` or `in`;
/// - a value is missing or has no index key (see `json_index_key`);
/// - an `in` value is not a JSON array.
///
/// Otherwise it returns the row ids that satisfy all predicates (AND).
///
/// The merge helpers `intersect_sorted` and `union_sorted` assume ascending row lists.
/// `build_eq_index_with_policy` appends row ids in ascending order.
fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
    if predicates.is_empty() || index.is_empty() {
        return None;
    }

    // Fast path: a lone `eq` predicate borrows the bucket directly instead of cloning it.
    if let [pred] = predicates
        && pred.op.as_str() == "eq"
    {
        let col_lower = pred.col.to_lowercase();
        let col_map = index.get(&col_lower)?;
        let key = json_index_key(pred.val.as_ref()?)?;
        // A value absent from an indexed column matches no rows; this is an answer,
        // not a fallback.
        return Some(match col_map.get(&key) {
            Some(rows) => Cow::Borrowed(rows.as_slice()),
            None => Cow::Owned(Vec::new()),
        });
    }

    // General path: resolve each predicate to a row set, then intersect the sets.
    let mut result: Option<Vec<u32>> = None;
    for pred in predicates {
        let col_lower = pred.col.to_lowercase();
        let col_map = index.get(&col_lower)?;

        let rows: Vec<u32> = match pred.op.as_str() {
            "eq" => {
                let key = json_index_key(pred.val.as_ref()?)?;
                col_map.get(&key).cloned().unwrap_or_default()
            }
            "in" => {
                let items = pred.val.as_ref()?.as_array()?;
                // OR across the listed values. An item without an index key aborts
                // the whole lookup. An empty array yields no rows here.
                let mut merged: Vec<u32> = Vec::new();
                for item in items {
                    if let Some(r) = col_map.get(&json_index_key(item)?) {
                        merged = union_sorted(&merged, r);
                    }
                }
                merged
            }
            _ => return None,
        };

        result = Some(match result {
            None => rows,
            Some(r) => intersect_sorted(&r, &rows),
        });
    }
    result.map(Cow::Owned)
}
2310
2311#[doc(hidden)]
2315pub mod bench {
2316 use super::{EqIndex, FastMap, json_index_key, try_index};
2317 use datapress_core::models::Predicate;
2318 use serde_json::Value as JsonValue;
2319 use std::borrow::Cow;
2320
2321 pub struct BenchIndex(EqIndex);
2323
2324 pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2328 let key = json_index_key(val).expect("benchable index key");
2329 let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2330 col_map.insert(key, rows);
2331 let mut index: EqIndex = EqIndex::default();
2332 index.insert(col.to_string(), col_map);
2333 BenchIndex(index)
2334 }
2335
2336 pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2338 try_index(&idx.0, predicates)
2339 }
2340
2341 pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2346 let [pred] = predicates else { return None };
2347 if pred.op.as_str() != "eq" {
2348 return None;
2349 }
2350 let col_lower = pred.col.to_lowercase();
2351 let col_map = idx.0.get(&col_lower)?;
2352 let key = json_index_key(pred.val.as_ref()?)?;
2353 Some(col_map.get(&key).cloned().unwrap_or_default())
2354 }
2355}
2356
2357fn slice_global(
2360 chunks: &[RecordBatch],
2361 schema: &Arc<Schema>,
2362 offset: usize,
2363 limit: usize,
2364) -> Result<RecordBatch, AppError> {
2365 if limit == 0 || chunks.is_empty() {
2366 return Ok(RecordBatch::new_empty(schema.clone()));
2367 }
2368 let mut out = Vec::new();
2369 let mut to_skip = offset;
2370 let mut remaining = limit;
2371 for b in chunks {
2372 if remaining == 0 {
2373 break;
2374 }
2375 let n = b.num_rows();
2376 if to_skip >= n {
2377 to_skip -= n;
2378 continue;
2379 }
2380 let take = remaining.min(n - to_skip);
2381 out.push(b.slice(to_skip, take));
2382 to_skip = 0;
2383 remaining -= take;
2384 }
2385 if out.is_empty() {
2386 return Ok(RecordBatch::new_empty(schema.clone()));
2387 }
2388 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2389}
2390
2391fn take_page(
2396 chunks: &[RecordBatch],
2397 schema: &Arc<Schema>,
2398 rows: &[u32],
2399 offset: usize,
2400 limit: usize,
2401) -> Result<RecordBatch, AppError> {
2402 let start = offset.min(rows.len());
2403 let len = limit.min(rows.len() - start);
2404 if len == 0 || chunks.is_empty() {
2405 return Ok(RecordBatch::new_empty(schema.clone()));
2406 }
2407
2408 let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2411 let mut acc: u32 = 0;
2412 offsets.push(0);
2413 for b in chunks {
2414 acc = acc
2415 .checked_add(b.num_rows() as u32)
2416 .expect("row count exceeds u32::MAX");
2417 offsets.push(acc);
2418 }
2419
2420 let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2423 for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2424 let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2425 let local = gid - offsets[bi];
2426 buckets[bi].push((out_pos as u32, local));
2427 }
2428
2429 let mut takens: Vec<RecordBatch> = Vec::new();
2431 let mut dest: Vec<u32> = Vec::with_capacity(len);
2432 for (bi, bucket) in buckets.iter().enumerate() {
2433 if bucket.is_empty() {
2434 continue;
2435 }
2436 let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2437 let cols: Vec<ArrayRef> = chunks[bi]
2438 .columns()
2439 .iter()
2440 .map(|c| {
2441 arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2442 .map_err(AppError::from)
2443 })
2444 .collect::<Result<_, _>>()?;
2445 takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2446 dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2447 }
2448
2449 let stitched = compute::concat_batches(schema, takens.iter())?;
2451 let mut inv = vec![0u32; len];
2452 for (i, &d) in dest.iter().enumerate() {
2453 inv[d as usize] = i as u32;
2454 }
2455 let perm = UInt32Array::from(inv);
2456 let cols: Vec<ArrayRef> = stitched
2457 .columns()
2458 .iter()
2459 .map(|c| {
2460 arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2461 .map_err(AppError::from)
2462 })
2463 .collect::<Result<_, _>>()?;
2464 RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2465}
2466
/// Builds the per-column equality index over the materialised `chunks`.
///
/// The result maps each lowercased column name to a map from the stringified value
/// (`to_string()` of the cell) to the global row ids holding that value. Global ids
/// count rows across `chunks` in order, and each id list is appended in ascending
/// row order.
///
/// Policy from `cfg.mode`:
/// - `IndexMode::None`: nothing is indexed.
/// - `IndexMode::List`: only columns named in `cfg.columns` are indexed
///   (case-insensitive).
/// - `IndexMode::Auto`: a column is abandoned as soon as it would gain more than
///   `cfg.max_cardinality` distinct values.
/// - Any other mode: every eligible column is indexed, with no cardinality cap.
///
/// Eligible column types are `Utf8`, `Utf8View`, `Boolean`, `Int8`..`Int64` and
/// `Dictionary(Int32, Utf8)`. Null cells are skipped.
///
/// The column set comes from `chunks[0]`'s schema. If a later chunk's array for a
/// column does not downcast to the expected type, that column is dropped (the `?`
/// returns `None` from the per-column closure).
///
/// Columns are indexed in parallel via rayon.
///
/// # Panics
/// If the running total row count overflows `u32`.
/// NOTE(review): each chunk's `num_rows()` is cast with `as u32` before the checked add,
/// so a single chunk with more than `u32::MAX` rows would silently truncate.
fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
    use rayon::prelude::*;

    if cfg.mode == IndexMode::None || chunks.is_empty() {
        return EqIndex::default();
    }

    // Allow-list of lowercased column names; only used in List mode.
    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
    } else {
        None
    };

    // Distinct-value cap; only enforced in Auto mode.
    let max_card = if cfg.mode == IndexMode::Auto {
        Some(cfg.max_cardinality)
    } else {
        None
    };

    // batch_offsets[i] = global row id of the first row of chunk i.
    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
    let mut acc: u32 = 0;
    for b in chunks {
        batch_offsets.push(acc);
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
    }

    let schema = chunks[0].schema();

    schema
        .fields()
        .par_iter()
        .enumerate()
        .filter_map(|(ci, field)| {
            let col_lower = field.name().to_lowercase();
            if let Some(a) = &allow
                && !a.contains_key(&col_lower)
            {
                return None;
            }

            // Dictionary(Int32, Utf8) columns (produced when dict_encode is on) need a
            // dedicated path that reads through the dictionary values.
            let dtype = field.data_type();
            let dict_utf8 = matches!(dtype,
                DataType::Dictionary(k, v)
                if matches!(k.as_ref(), DataType::Int32)
                && matches!(v.as_ref(), DataType::Utf8));
            match dtype {
                DataType::Utf8
                | DataType::Utf8View
                | DataType::Boolean
                | DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64 => {}
                _ if dict_utf8 => {}
                _ => return None,
            }

            let mut map: FastMap<String, Vec<u32>> = FastMap::default();

            for (bi, batch) in chunks.iter().enumerate() {
                let base = batch_offsets[bi];
                let col = batch.column(ci);

                // Indexes a plain (non-dictionary) array. `return None` inside the macro
                // exits the per-column closure, which drops this column from the index.
                macro_rules! index_col {
                    ($arr_ty:ty) => {{
                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
                        for row in 0..arr.len() {
                            if arr.is_null(row) {
                                continue;
                            }
                            let key = arr.value(row).to_string();
                            let gid = base + row as u32;
                            if let Some(v) = map.get_mut(&key) {
                                v.push(gid);
                            } else {
                                if let Some(mc) = max_card {
                                    if map.len() >= mc {
                                        return None;
                                    }
                                }
                                map.insert(key, vec![gid]);
                            }
                        }
                    }};
                }

                if dict_utf8 {
                    // Resolve each key through the dictionary's string values. Lookup
                    // uses `&str`, so a String is allocated only for new keys.
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
                        )?;
                    let keys = arr.keys();
                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
                    for row in 0..arr.len() {
                        if arr.is_null(row) {
                            continue;
                        }
                        let k = keys.value(row) as usize;
                        let s = values.value(k);
                        let gid = base + row as u32;
                        if let Some(v) = map.get_mut(s) {
                            v.push(gid);
                        } else {
                            if let Some(mc) = max_card
                                && map.len() >= mc
                            {
                                return None;
                            }
                            map.insert(s.to_string(), vec![gid]);
                        }
                    }
                } else {
                    match dtype {
                        DataType::Utf8 => index_col!(StringArray),
                        DataType::Utf8View => index_col!(StringViewArray),
                        DataType::Boolean => index_col!(BooleanArray),
                        DataType::Int8 => index_col!(Int8Array),
                        DataType::Int16 => index_col!(Int16Array),
                        DataType::Int32 => index_col!(Int32Array),
                        DataType::Int64 => index_col!(Int64Array),
                        // All other types were rejected by the eligibility match above.
                        _ => unreachable!(),
                    }
                }
            }

            Some((col_lower, map))
        })
        .collect()
}
2610
2611fn writable_inline(dt: &DataType) -> bool {
2624 match dt {
2625 DataType::Utf8
2626 | DataType::LargeUtf8
2627 | DataType::Utf8View
2628 | DataType::Boolean
2629 | DataType::Int8
2630 | DataType::Int16
2631 | DataType::Int32
2632 | DataType::Int64
2633 | DataType::UInt8
2634 | DataType::UInt16
2635 | DataType::UInt32
2636 | DataType::UInt64
2637 | DataType::Float32
2638 | DataType::Float64
2639 | DataType::Decimal128(_, _)
2640 | DataType::Decimal256(_, _) => true,
2641 DataType::Dictionary(k, v)
2642 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2643 {
2644 true
2645 }
2646 _ => false,
2647 }
2648}
2649
2650fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2656 let schema = batch.schema();
2657 let to_cast: Vec<usize> = schema
2658 .fields()
2659 .iter()
2660 .enumerate()
2661 .filter_map(|(i, f)| {
2662 if writable_inline(f.data_type()) {
2663 None
2664 } else {
2665 Some(i)
2666 }
2667 })
2668 .collect();
2669 if to_cast.is_empty() {
2670 return Ok(batch.clone());
2671 }
2672 let new_fields: Vec<Field> = schema
2673 .fields()
2674 .iter()
2675 .enumerate()
2676 .map(|(i, f)| {
2677 if to_cast.contains(&i) {
2678 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2679 } else {
2680 f.as_ref().clone()
2681 }
2682 })
2683 .collect();
2684 let new_schema = Arc::new(Schema::new(new_fields));
2685 let cols: Vec<ArrayRef> = batch
2686 .columns()
2687 .iter()
2688 .enumerate()
2689 .map(|(i, c)| {
2690 if to_cast.contains(&i) {
2691 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2692 } else {
2693 Ok(c.clone())
2694 }
2695 })
2696 .collect::<Result<_, _>>()?;
2697 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2698}
2699
/// Comparison operator applied between a column and a scalar by `cmp_scalar`.
///
/// `Like` / `ILike` are pattern matches and are only valid against string columns.
// Currently only referenced by the unused `cmp_scalar` helper, hence the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CmpOp {
    /// `=`
    Eq,
    /// `!=`
    Neq,
    /// `>`
    Gt,
    /// `>=`
    Gte,
    /// `<`
    Lt,
    /// `<=`
    Lte,
    /// Case-sensitive SQL `LIKE`.
    Like,
    /// Case-insensitive SQL `ILIKE`.
    ILike,
}
2717
2718#[allow(dead_code)]
2719fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2720 let arr = col
2721 .as_any()
2722 .downcast_ref::<StringArray>()
2723 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2724 let s = Scalar::new(StringArray::from(vec![val]));
2725 Ok(eq(arr, &s)?)
2726}
2727
2728#[allow(dead_code)]
2729fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2730 macro_rules! num_cmp {
2731 ($arr_type:ty, $cast:ty) => {{
2732 let n = val
2733 .as_f64()
2734 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2735 as $cast;
2736 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2737 let s = Scalar::new(<$arr_type>::from(vec![n]));
2738 Ok(match op {
2739 CmpOp::Eq => eq(arr, &s)?,
2740 CmpOp::Neq => neq(arr, &s)?,
2741 CmpOp::Gt => gt(arr, &s)?,
2742 CmpOp::Gte => gt_eq(arr, &s)?,
2743 CmpOp::Lt => lt(arr, &s)?,
2744 CmpOp::Lte => lt_eq(arr, &s)?,
2745 CmpOp::Like | CmpOp::ILike => {
2746 return Err(AppError::InvalidValue(
2747 "LIKE requires a string column".into(),
2748 ));
2749 }
2750 })
2751 }};
2752 }
2753 match col.data_type() {
2754 DataType::Utf8 => {
2755 let s = val
2756 .as_str()
2757 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2758 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2759 let sc = Scalar::new(StringArray::from(vec![s]));
2760 Ok(match op {
2761 CmpOp::Eq => eq(arr, &sc)?,
2762 CmpOp::Neq => neq(arr, &sc)?,
2763 CmpOp::Gt => gt(arr, &sc)?,
2764 CmpOp::Gte => gt_eq(arr, &sc)?,
2765 CmpOp::Lt => lt(arr, &sc)?,
2766 CmpOp::Lte => lt_eq(arr, &sc)?,
2767 CmpOp::Like => compute::like(arr, &sc)?,
2768 CmpOp::ILike => compute::ilike(arr, &sc)?,
2769 })
2770 }
2771 DataType::Int8 => num_cmp!(Int8Array, i8),
2772 DataType::Int16 => num_cmp!(Int16Array, i16),
2773 DataType::Int32 => num_cmp!(Int32Array, i32),
2774 DataType::Int64 => num_cmp!(Int64Array, i64),
2775 DataType::Float32 => num_cmp!(Float32Array, f32),
2776 DataType::Float64 => num_cmp!(Float64Array, f64),
2777 dt => Err(AppError::InvalidValue(format!(
2778 "unsupported type for comparison: {dt:?}"
2779 ))),
2780 }
2781}
2782
2783pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2788 let batch = cast_for_serialize(batch)?;
2793 let schema = batch.schema();
2794 let n_rows = batch.num_rows();
2795
2796 let keys: Vec<Vec<u8>> = schema
2797 .fields()
2798 .iter()
2799 .map(|f| {
2800 let mut k = Vec::with_capacity(f.name().len() + 3);
2801 k.push(b'"');
2802 k.extend_from_slice(f.name().as_bytes());
2803 k.extend_from_slice(b"\":");
2804 k
2805 })
2806 .collect();
2807
2808 let encoders: Vec<ColEnc> = batch
2813 .columns()
2814 .iter()
2815 .map(|c| ColEnc::new(c.as_ref()))
2816 .collect();
2817
2818 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2819 let mut itoa_buf = itoa::Buffer::new();
2820 let mut ryu_buf = ryu::Buffer::new();
2821 buf.push(b'[');
2822
2823 for row in 0..n_rows {
2824 if row > 0 {
2825 buf.push(b',');
2826 }
2827 buf.push(b'{');
2828 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2829 if i > 0 {
2830 buf.push(b',');
2831 }
2832 buf.extend_from_slice(key);
2833 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2834 }
2835 buf.push(b'}');
2836 }
2837
2838 buf.push(b']');
2839 Ok(unsafe { String::from_utf8_unchecked(buf) })
2840}
2841
2842enum ColEnc<'a> {
2847 Utf8(&'a StringArray),
2848 LargeUtf8(&'a LargeStringArray),
2849 Utf8View(&'a StringViewArray),
2850 DictI32Utf8(
2853 &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2854 &'a StringArray,
2855 ),
2856 Bool(&'a BooleanArray),
2857 I8(&'a Int8Array),
2858 I16(&'a Int16Array),
2859 I32(&'a Int32Array),
2860 I64(&'a Int64Array),
2861 U8(&'a UInt8Array),
2862 U16(&'a UInt16Array),
2863 U32(&'a UInt32Array),
2864 U64(&'a UInt64Array),
2865 Dec128(&'a Decimal128Array),
2866 Dec256(&'a Decimal256Array),
2867 F32(&'a Float32Array),
2868 F64(&'a Float64Array),
2869 Other(&'a dyn Array),
2871}
2872
2873impl<'a> ColEnc<'a> {
2874 fn new(col: &'a dyn Array) -> ColEnc<'a> {
2875 macro_rules! dc {
2876 ($t:ty) => {
2877 col.as_any().downcast_ref::<$t>().unwrap()
2878 };
2879 }
2880 match col.data_type() {
2881 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2882 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2883 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2884 DataType::Dictionary(key, value)
2885 if matches!(key.as_ref(), DataType::Int32)
2886 && matches!(value.as_ref(), DataType::Utf8) =>
2887 {
2888 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2889 let values = dict
2890 .values()
2891 .as_any()
2892 .downcast_ref::<StringArray>()
2893 .unwrap();
2894 ColEnc::DictI32Utf8(dict, values)
2895 }
2896 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2897 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2898 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2899 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2900 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2901 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2902 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2903 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2904 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2905 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2906 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2907 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2908 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2909 _ => ColEnc::Other(col),
2910 }
2911 }
2912
2913 #[inline]
2914 fn write(
2915 &self,
2916 buf: &mut Vec<u8>,
2917 row: usize,
2918 itoa_buf: &mut itoa::Buffer,
2919 ryu_buf: &mut ryu::Buffer,
2920 ) {
2921 macro_rules! int {
2922 ($arr:expr) => {{
2923 if $arr.is_null(row) {
2924 buf.extend_from_slice(b"null");
2925 } else {
2926 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2927 }
2928 }};
2929 }
2930 match self {
2931 ColEnc::Utf8(a) => {
2932 if a.is_null(row) {
2933 buf.extend_from_slice(b"null");
2934 } else {
2935 write_str(buf, a.value(row));
2936 }
2937 }
2938 ColEnc::LargeUtf8(a) => {
2939 if a.is_null(row) {
2940 buf.extend_from_slice(b"null");
2941 } else {
2942 write_str(buf, a.value(row));
2943 }
2944 }
2945 ColEnc::Utf8View(a) => {
2946 if a.is_null(row) {
2947 buf.extend_from_slice(b"null");
2948 } else {
2949 write_str(buf, a.value(row));
2950 }
2951 }
2952 ColEnc::DictI32Utf8(keys, values) => {
2953 if keys.is_null(row) {
2954 buf.extend_from_slice(b"null");
2955 } else {
2956 let k = keys.keys().value(row) as usize;
2957 write_str(buf, values.value(k));
2958 }
2959 }
2960 ColEnc::Bool(a) => {
2961 if a.is_null(row) {
2962 buf.extend_from_slice(b"null");
2963 } else {
2964 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2965 }
2966 }
2967 ColEnc::I8(a) => int!(a),
2968 ColEnc::I16(a) => int!(a),
2969 ColEnc::I32(a) => int!(a),
2970 ColEnc::I64(a) => int!(a),
2971 ColEnc::U8(a) => int!(a),
2972 ColEnc::U16(a) => int!(a),
2973 ColEnc::U32(a) => int!(a),
2974 ColEnc::U64(a) => int!(a),
2975 ColEnc::Dec128(a) => {
2976 if a.is_null(row) {
2977 buf.extend_from_slice(b"null");
2978 } else {
2979 write_str(buf, &a.value_as_string(row));
2980 }
2981 }
2982 ColEnc::Dec256(a) => {
2983 if a.is_null(row) {
2984 buf.extend_from_slice(b"null");
2985 } else {
2986 write_str(buf, &a.value_as_string(row));
2987 }
2988 }
2989 ColEnc::F32(a) => {
2990 if a.is_null(row) {
2991 buf.extend_from_slice(b"null");
2992 } else {
2993 let v = a.value(row);
2994 if v.is_finite() {
2995 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2996 } else {
2997 buf.extend_from_slice(b"null");
2998 }
2999 }
3000 }
3001 ColEnc::F64(a) => {
3002 if a.is_null(row) {
3003 buf.extend_from_slice(b"null");
3004 } else {
3005 let v = a.value(row);
3006 if v.is_finite() {
3007 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3008 } else {
3009 buf.extend_from_slice(b"null");
3010 }
3011 }
3012 }
3013 ColEnc::Other(col) => {
3014 if col.is_null(row) {
3015 buf.extend_from_slice(b"null");
3016 } else {
3017 write_value(buf, *col, row);
3018 }
3019 }
3020 }
3021 }
3022}
3023
3024#[inline]
3025fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
3026 match col.data_type() {
3027 DataType::Utf8 => write_str(
3028 buf,
3029 col.as_any()
3030 .downcast_ref::<StringArray>()
3031 .unwrap()
3032 .value(row),
3033 ),
3034 DataType::LargeUtf8 => write_str(
3035 buf,
3036 col.as_any()
3037 .downcast_ref::<LargeStringArray>()
3038 .unwrap()
3039 .value(row),
3040 ),
3041 DataType::Utf8View => write_str(
3042 buf,
3043 col.as_any()
3044 .downcast_ref::<StringViewArray>()
3045 .unwrap()
3046 .value(row),
3047 ),
3048 DataType::Dictionary(key, value)
3049 if matches!(key.as_ref(), DataType::Int32)
3050 && matches!(value.as_ref(), DataType::Utf8) =>
3051 {
3052 let dict = col
3053 .as_any()
3054 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
3055 .unwrap();
3056 let keys = dict.keys();
3057 let values = dict
3058 .values()
3059 .as_any()
3060 .downcast_ref::<StringArray>()
3061 .unwrap();
3062 let k = keys.value(row) as usize;
3063 write_str(buf, values.value(k));
3064 }
3065 DataType::Boolean => {
3066 let v = col
3067 .as_any()
3068 .downcast_ref::<BooleanArray>()
3069 .unwrap()
3070 .value(row);
3071 buf.extend_from_slice(if v { b"true" } else { b"false" });
3072 }
3073 DataType::Int8 => {
3074 let mut b = itoa::Buffer::new();
3075 buf.extend_from_slice(
3076 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3077 .as_bytes(),
3078 );
3079 }
3080 DataType::Int16 => {
3081 let mut b = itoa::Buffer::new();
3082 buf.extend_from_slice(
3083 b.format(
3084 col.as_any()
3085 .downcast_ref::<Int16Array>()
3086 .unwrap()
3087 .value(row),
3088 )
3089 .as_bytes(),
3090 );
3091 }
3092 DataType::Int32 => {
3093 let mut b = itoa::Buffer::new();
3094 buf.extend_from_slice(
3095 b.format(
3096 col.as_any()
3097 .downcast_ref::<Int32Array>()
3098 .unwrap()
3099 .value(row),
3100 )
3101 .as_bytes(),
3102 );
3103 }
3104 DataType::Int64 => {
3105 let mut b = itoa::Buffer::new();
3106 buf.extend_from_slice(
3107 b.format(
3108 col.as_any()
3109 .downcast_ref::<Int64Array>()
3110 .unwrap()
3111 .value(row),
3112 )
3113 .as_bytes(),
3114 );
3115 }
3116 DataType::UInt8 => {
3117 let mut b = itoa::Buffer::new();
3118 buf.extend_from_slice(
3119 b.format(
3120 col.as_any()
3121 .downcast_ref::<UInt8Array>()
3122 .unwrap()
3123 .value(row),
3124 )
3125 .as_bytes(),
3126 );
3127 }
3128 DataType::UInt16 => {
3129 let mut b = itoa::Buffer::new();
3130 buf.extend_from_slice(
3131 b.format(
3132 col.as_any()
3133 .downcast_ref::<UInt16Array>()
3134 .unwrap()
3135 .value(row),
3136 )
3137 .as_bytes(),
3138 );
3139 }
3140 DataType::UInt32 => {
3141 let mut b = itoa::Buffer::new();
3142 buf.extend_from_slice(
3143 b.format(
3144 col.as_any()
3145 .downcast_ref::<UInt32Array>()
3146 .unwrap()
3147 .value(row),
3148 )
3149 .as_bytes(),
3150 );
3151 }
3152 DataType::UInt64 => {
3153 let mut b = itoa::Buffer::new();
3154 buf.extend_from_slice(
3155 b.format(
3156 col.as_any()
3157 .downcast_ref::<UInt64Array>()
3158 .unwrap()
3159 .value(row),
3160 )
3161 .as_bytes(),
3162 );
3163 }
3164 DataType::Decimal128(_, _) => {
3165 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3166 write_str(buf, &arr.value_as_string(row));
3167 }
3168 DataType::Decimal256(_, _) => {
3169 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3170 write_str(buf, &arr.value_as_string(row));
3171 }
3172 DataType::Float32 => {
3173 let v = col
3174 .as_any()
3175 .downcast_ref::<Float32Array>()
3176 .unwrap()
3177 .value(row);
3178 if v.is_finite() {
3179 let mut b = ryu::Buffer::new();
3180 buf.extend_from_slice(b.format_finite(v).as_bytes());
3181 } else {
3182 buf.extend_from_slice(b"null");
3183 }
3184 }
3185 DataType::Float64 => {
3186 let v = col
3187 .as_any()
3188 .downcast_ref::<Float64Array>()
3189 .unwrap()
3190 .value(row);
3191 if v.is_finite() {
3192 let mut b = ryu::Buffer::new();
3193 buf.extend_from_slice(b.format_finite(v).as_bytes());
3194 } else {
3195 buf.extend_from_slice(b"null");
3196 }
3197 }
3198 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3203 }
3204}
3205
/// Appends `s` to `buf` as a quoted JSON string literal.
///
/// Escaping rules:
/// - `"` and `\` are backslash-escaped;
/// - `\n`, `\r` and `\t` use their short escapes;
/// - every other byte below 0x20 is written as `\u00XX` with lowercase hex;
/// - all remaining bytes, including multi-byte UTF-8 sequences, are copied verbatim.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    let bytes = s.as_bytes();
    buf.push(b'"');
    // Start of the current run of bytes that can be copied unchanged.
    let mut run_start = 0;
    for (pos, &byte) in bytes.iter().enumerate() {
        if byte >= 0x20 && byte != b'"' && byte != b'\\' {
            continue;
        }
        buf.extend_from_slice(&bytes[run_start..pos]);
        run_start = pos + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            ctrl => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX[usize::from(ctrl >> 4)]);
                buf.push(HEX[usize::from(ctrl & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
3227
3228#[async_trait]
3233impl Backend for Store {
3234 fn names(&self) -> Vec<String> {
3235 Store::names(self)
3236 }
3237
3238 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3239 let st = self.dataset(name)?;
3240 Ok(DatasetSummary {
3241 name: st.schema.name.clone(),
3242 columns: st.schema.columns.len(),
3243 rows: st.num_rows(),
3244 })
3245 }
3246
3247 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3248 let st = self.dataset(name)?;
3249 Ok(Arc::new(st.schema.clone()))
3250 }
3251
3252 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3253 let st = self.dataset(name)?;
3254 let mut cols: Vec<String> = st
3257 .schema
3258 .columns
3259 .iter()
3260 .map(|c| c.name.clone())
3261 .filter(|n| st.index.contains_key(n))
3262 .collect();
3263 let mut extras: Vec<String> = st
3266 .index
3267 .keys()
3268 .filter(|n| !cols.iter().any(|c| c == *n))
3269 .cloned()
3270 .collect();
3271 extras.sort();
3272 cols.extend(extras);
3273 Ok(cols)
3274 }
3275
3276 async fn sample(&self, name: &str) -> Result<String, AppError> {
3277 Store::sample(self, name).await
3278 }
3279
3280 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3281 Store::query(self, name, req).await
3282 }
3283
3284 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3285 Store::query_arrow(self, name, req).await
3286 }
3287
3288 async fn query_arrow_stream(
3289 &self,
3290 name: &str,
3291 req: &QueryRequest,
3292 ) -> Result<ArrowIpcStream, AppError> {
3293 Store::query_arrow_stream(self, name, req).await
3294 }
3295
3296 async fn query_arrow_stream_all(
3297 &self,
3298 name: &str,
3299 req: &QueryRequest,
3300 ) -> Result<ArrowIpcStream, AppError> {
3301 Store::query_arrow_stream_all(self, name, req).await
3302 }
3303
3304 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3305 Store::count(self, name, req).await
3306 }
3307
3308 async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3309 Store::query_sql(self, sql, max_rows).await
3310 }
3311
3312 async fn query_sql_arrow_stream(
3313 &self,
3314 sql: &str,
3315 max_rows: u64,
3316 ) -> Result<ArrowIpcStream, AppError> {
3317 Store::query_sql_arrow_stream(self, sql, max_rows).await
3318 }
3319
3320 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3321 Store::parquet(self, name).await
3322 }
3323
3324 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3325 Store::reload(self, name).await
3326 }
3327
3328 async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
3329 Store::register(self, cfg).await
3330 }
3331}
3332
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Error messages that must be classified as S3 access-denied failures.
    const ACCESS_DENIED_MESSAGES: [&str; 4] = [
        "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
        "Client error with status 403 Forbidden",
        "S3 error: Access Denied",
        "request failed: 403 Forbidden",
    ];

    /// Error messages that look like storage errors but are not permission problems.
    const UNRELATED_MESSAGES: [&str; 3] = [
        "Not a Delta table: Generic delta kernel error: No files in log segment",
        "object at location data/part.parquet not found",
        "failed to infer parquet schema: invalid magic bytes",
    ];

    #[test]
    fn detects_s3_access_denied_variants() {
        for msg in ACCESS_DENIED_MESSAGES {
            assert!(is_s3_access_denied(msg), "should flag: {msg}");
        }
    }

    #[test]
    fn ignores_unrelated_errors() {
        for msg in UNRELATED_MESSAGES {
            assert!(!is_s3_access_denied(msg), "should not flag: {msg}");
        }
    }
}
3361