1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use arc_swap::ArcSwap;
8use arrow::array::{
9 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
10 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
11 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
12};
13use arrow::compute;
14use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
15use arrow::datatypes::{DataType, Field, Schema};
16use async_trait::async_trait;
17use parquet::arrow::ProjectionMask;
18use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
19use serde_json::Value as JsonValue;
20
21use datafusion::datasource::file_format::parquet::ParquetFormat;
22use datafusion::datasource::listing::{
23 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
24};
25use datafusion::datasource::{MemTable, TableProvider};
26use datafusion::execution::cache::DefaultListFilesCache;
27use datafusion::execution::cache::cache_manager::CacheManagerConfig;
28use datafusion::execution::runtime_env::RuntimeEnvBuilder;
29use datafusion::prelude::{SessionConfig, SessionContext};
30use datafusion::scalar::ScalarValue;
31
32use object_store::aws::AmazonS3Builder;
33use url::Url;
34
35use datapress_core::backend::{
36 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
37};
38use datapress_core::config::{
39 AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
40 Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
41};
42use datapress_core::errors::AppError;
43use datapress_core::models::{CountRequest, Predicate, QueryRequest};
44use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
45
/// `HashMap` that hashes with `ahash::RandomState` instead of the std default hasher.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Two-level string-keyed map ending in lists of `u32`.
///
/// NOTE(review): presumably column name → column value → matching row ids;
/// confirm against the index builder and `try_index`.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
58
/// Everything the store keeps for one loaded dataset.
pub struct DatasetState {
    /// Logical schema (dataset name plus per-column info).
    pub schema: DatasetSchema,
    /// Materialised record batches; the lazy builders leave this empty.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset (for lazy datasets this includes partition columns).
    pub arrow_schema: Arc<Schema>,
    /// Equality index; the lazy builders leave this at its default (empty) value.
    pub index: EqIndex,
    /// `true` when the dataset was built by one of the lazy builders and has no
    /// in-memory batches, so queries must go through the SQL context.
    pub lazy: bool,
}
80
81impl DatasetState {
82 pub fn num_rows(&self) -> usize {
84 self.data.iter().map(|b| b.num_rows()).sum()
85 }
86}
87
/// In-process dataset store backed by a DataFusion session.
pub struct Store {
    /// DataFusion session; every loaded dataset is registered on it as a table
    /// under the dataset's name.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size`; forced to at least 1 at load.
    max_page_size: u64,
    /// Effective per-dataset configuration captured at load time, reused by `reload`.
    configs: HashMap<String, DatasetConfig>,
    /// Current dataset states by name, swapped as a whole map on reload.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-dataset async mutexes, created on demand, held for the duration of a reload.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
104
105impl Store {
106 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
108 if cfg
111 .datasets
112 .iter()
113 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
114 {
115 deltalake::aws::register_handlers(None);
116 }
117
118 let ctx = build_tuned_context(&cfg.datafusion);
124 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
125 let mut configs = HashMap::with_capacity(cfg.datasets.len());
126
127 for d in &cfg.datasets {
128 log::info!(
129 "Loading dataset '{}' ({} @ {})",
130 d.name,
131 d.source.kind.as_str(),
132 d.source.location
133 );
134 let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
138 .await
139 {
140 Some(bytes) => {
141 log::info!(
142 "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
143 d.name,
144 bytes as f64 / (1024.0 * 1024.0),
145 cfg.server.force_lazy_above_mb
146 );
147 let mut forced = d.clone();
148 forced.lazy = true;
149 std::borrow::Cow::Owned(forced)
150 }
151 None => std::borrow::Cow::Borrowed(d),
152 };
153 let d = d.as_ref();
154 let (state, provider) = match build_dataset(d, &ctx).await {
155 Ok(built) => built,
156 Err(AppError::EmptyDataset(msg)) => {
157 log::warn!("skipping empty dataset '{}': {msg}", d.name);
158 continue;
159 }
160 Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
166 log::warn!(
167 "skipping dataset '{}': S3 access denied — check credentials \
168 and bucket policy ({e})",
169 d.name
170 );
171 continue;
172 }
173 Err(e) => return Err(e),
174 };
175 ctx.register_table(d.name.as_str(), provider)?;
176 datasets.insert(d.name.clone(), Arc::new(state));
177 configs.insert(d.name.clone(), d.clone());
178 }
179 Ok(Self {
180 ctx,
181 max_page_size: cfg.server.max_page_size.max(1),
182 configs,
183 datasets: ArcSwap::from_pointee(datasets),
184 reload_locks: Mutex::new(HashMap::new()),
185 })
186 }
187
188 pub fn names(&self) -> Vec<String> {
190 let snap = self.datasets.load();
191 let mut v: Vec<String> = snap.keys().cloned().collect();
192 v.sort();
193 v
194 }
195
196 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
197 self.datasets
198 .load()
199 .get(name)
200 .cloned()
201 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
202 }
203
204 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
207 let st = self.dataset(name)?;
208
209 if st.lazy {
211 let table = DatasetSchema::quote_ident(&st.schema.name);
212 let sql = format!("SELECT * FROM {table} LIMIT 1");
213 let df = self.ctx.sql(&sql).await?;
214 let batches = df.collect().await?;
215 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
216 return Ok("null".into());
217 }
218 let arr = serialize(&batches[0].slice(0, 1))?;
219 let trimmed = arr.trim();
220 let inner = trimmed
221 .strip_prefix('[')
222 .and_then(|s| s.strip_suffix(']'))
223 .unwrap_or(trimmed);
224 return Ok(inner.to_string());
225 }
226
227 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
228 Some(b) => b,
229 None => return Ok("null".into()),
230 };
231 let arr = serialize(&first.slice(0, 1))?;
232 let trimmed = arr.trim();
234 let inner = trimmed
235 .strip_prefix('[')
236 .and_then(|s| s.strip_suffix(']'))
237 .unwrap_or(trimmed);
238 Ok(inner.to_string())
239 }
240
241 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
246 let cfg = self
248 .configs
249 .get(name)
250 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
251 .clone();
252
253 let lock = {
255 let mut locks = self.reload_locks.lock().unwrap();
256 locks
257 .entry(name.to_string())
258 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
259 .clone()
260 };
261 let _guard = lock.lock().await;
262
263 let started = std::time::Instant::now();
264
265 if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
281 cache.clear();
282 }
283
284 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
287 let rows = state.num_rows();
288
289 let _ = self.ctx.deregister_table(name)?;
295 self.ctx.register_table(name, provider)?;
296
297 let mut new_map = (**self.datasets.load()).clone();
298 new_map.insert(name.to_string(), Arc::new(state));
299 self.datasets.store(Arc::new(new_map));
300
301 let elapsed_ms = started.elapsed().as_millis();
302 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
303 Ok(ReloadStats { rows, elapsed_ms })
304 }
305
306 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
310 let batch = self.query_batch(name, req).await?;
311 if batch.num_rows() == 0 {
312 return Ok("[]".to_string());
313 }
314 serialize(&batch)
315 }
316
317 fn canonicalize_sql(&self, sql: &str) -> String {
323 let snap = self.datasets.load();
324 let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
325 let mut columns: HashMap<String, String> = HashMap::new();
326 for (name, state) in snap.iter() {
327 tables.insert(name.to_lowercase(), name.clone());
328 for col in &state.schema.columns {
329 columns
330 .entry(col.name.to_lowercase())
331 .or_insert_with(|| col.name.clone());
332 }
333 }
334 datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
335 }
336
337 pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
344 let cap = max_rows.max(1);
345 let sql = self.canonicalize_sql(sql);
346 let wrapped = if datapress_core::sql::is_describe(&sql) {
351 sql
352 } else {
353 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
354 };
355 let df = self.ctx.sql(&wrapped).await?;
356 let batches = df.collect().await?;
357 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
358 return Ok("[]".to_string());
359 }
360 let batch = if batches.len() == 1 {
361 batches.into_iter().next().expect("checked len")
362 } else {
363 compute::concat_batches(&batches[0].schema(), batches.iter())?
364 };
365 let batch = if batch.num_rows() as u64 > cap {
368 batch.slice(0, cap as usize)
369 } else {
370 batch
371 };
372 serialize(&batch)
373 }
374
375 pub async fn query_sql_arrow_stream(
379 &self,
380 sql: &str,
381 max_rows: u64,
382 ) -> Result<ArrowIpcStream, AppError> {
383 let cap = max_rows.max(1);
384 let sql = self.canonicalize_sql(sql);
385 let wrapped = if datapress_core::sql::is_describe(&sql) {
389 sql
390 } else {
391 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
392 };
393 let df = self.ctx.sql(&wrapped).await?;
394 let batches = df.collect().await?;
395 Ok(stream_arrow_batches(batches))
396 }
397
398 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
402 let batch = self.query_batch(name, req).await?;
403 let schema = batch.schema();
404 let mut buf = Vec::with_capacity(8 * 1024);
405 {
406 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
407 if batch.num_rows() > 0 {
408 w.write(&batch)?;
409 }
410 w.finish()?;
411 }
412 Ok(buf)
413 }
414
415 pub async fn query_arrow_stream(
416 &self,
417 name: &str,
418 req: &QueryRequest,
419 ) -> Result<ArrowIpcStream, AppError> {
420 let batches = self.query_batches(name, req).await?;
421 Ok(stream_arrow_batches(batches))
422 } pub async fn query_arrow_stream_all(
423 &self,
424 name: &str,
425 req: &QueryRequest,
426 ) -> Result<ArrowIpcStream, AppError> {
427 let batches = self.query_batches_all(name, req).await?;
428 Ok(stream_arrow_batches(batches))
429 }
430
431 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
439 let req = QueryRequest {
441 columns: Vec::new(),
442 predicates: Vec::new(),
443 group_by: Vec::new(),
444 aggregations: Vec::new(),
445 having: Vec::new(),
446 distinct: false,
447 order_by: Vec::new(),
448 limit: None,
449 page: 1,
450 page_size: 1,
451 };
452 let st = self.dataset(name)?;
453 let batches = self.query_batches_all(name, &req).await?;
454 let schema = batches
458 .first()
459 .map(|b| b.schema())
460 .unwrap_or_else(|| st.arrow_schema.clone());
461
462 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
463 {
464 let props = parquet::file::properties::WriterProperties::builder()
465 .set_compression(parquet::basic::Compression::SNAPPY)
466 .build();
467 let mut writer =
468 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
469 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
470 for batch in &batches {
471 if batch.num_rows() > 0 {
472 writer
473 .write(batch)
474 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
475 }
476 }
477 writer
478 .close()
479 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
480 }
481 Ok(bytes::Bytes::from(buf))
482 }
483
484 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
487 let batches = self.query_batches(name, req).await?;
488 if batches.is_empty() {
489 return Ok(RecordBatch::new_empty(Arc::new(
490 arrow::datatypes::Schema::empty(),
491 )));
492 }
493 if batches.len() == 1 {
494 return Ok(batches.into_iter().next().expect("checked len"));
495 }
496 if batches.iter().all(|b| b.num_rows() == 0) {
497 return Ok(RecordBatch::new_empty(batches[0].schema()));
498 }
499 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
500 Ok(batch)
501 }
502
503 async fn query_batches(
507 &self,
508 name: &str,
509 req: &QueryRequest,
510 ) -> Result<Vec<RecordBatch>, AppError> {
511 let st = self.dataset(name)?;
512
513 let page = req.page.max(1);
514 let page_size = req.page_size.clamp(1, self.max_page_size);
515 let offset = ((page - 1) * page_size) as usize;
516 let limit = page_size as usize;
517
518 self.query_batches_inner(st, req, Some((offset, limit)))
519 .await
520 }
521
522 async fn query_batches_all(
526 &self,
527 name: &str,
528 req: &QueryRequest,
529 ) -> Result<Vec<RecordBatch>, AppError> {
530 let st = self.dataset(name)?;
531 self.query_batches_inner(st, req, None).await
532 }
533
534 async fn query_batches_inner(
535 &self,
536 st: Arc<DatasetState>,
537 req: &QueryRequest,
538 page_window: Option<(usize, usize)>,
539 ) -> Result<Vec<RecordBatch>, AppError> {
540 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
541
542 let can_fast_path = !st.lazy
549 && req.order_by.is_empty()
550 && (page_window.is_none() || req.limit.is_none())
551 && req.group_by.is_empty()
552 && !req.distinct;
553
554 if can_fast_path {
555 let total = st.num_rows();
556
557 if req.predicates.is_empty() {
560 if page_window.is_none() && req.limit.is_none() {
561 return st
562 .data
563 .iter()
564 .cloned()
565 .map(|batch| project(&st.schema, batch, &req.columns))
566 .collect();
567 }
568 let start = offset.min(total);
569 let len = limit.min(total - start);
570 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
571 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
572 }
573
574 if let Some(rows) = try_index(&st.index, &req.predicates) {
577 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
578 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
579 }
580 }
581
582 let (sql, params) = match page_window {
584 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
585 None => build_query_stream_sql(&st.schema, req)?,
586 };
587 let mut df = self.ctx.sql(&sql).await?;
588 if !params.is_empty() {
589 df = df.with_param_values(params)?;
590 }
591 let batches = df.collect().await?;
592 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
593 let schema = batches
594 .first()
595 .map(|b| b.schema())
596 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
597 return Ok(vec![RecordBatch::new_empty(schema)]);
598 }
599 Ok(batches)
600 }
601}
602
603fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
604 let schema = batches
605 .first()
606 .map(|batch| batch.schema())
607 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
608 let (mut writer, stream) = arrow_ipc_stream_channel(8);
609
610 tokio::task::spawn_blocking(move || {
611 let result = (|| -> Result<(), AppError> {
612 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
613 for batch in batches {
614 if batch.num_rows() > 0 {
615 w.write(&batch)?;
616 }
617 }
618 w.finish()?;
619 Ok(())
620 })();
621 if let Err(err) = result {
622 log::error!("datafusion arrow stream failed: {err}");
623 writer.send_error(err);
624 }
625 });
626
627 stream
628}
629
630impl Store {
631 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
635 let st = self.dataset(name)?;
636
637 if !st.lazy {
638 if req.predicates.is_empty() {
640 return Ok(st.num_rows() as i64);
641 }
642 if let Some(rows) = try_index(&st.index, &req.predicates) {
644 return Ok(rows.len() as i64);
645 }
646 }
647
648 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
651 let mut df = self.ctx.sql(&sql).await?;
652 if !params.is_empty() {
653 df = df.with_param_values(params)?;
654 }
655 let batches = df.collect().await?;
656 let n = batches
657 .first()
658 .and_then(|b| {
659 b.column(0)
660 .as_any()
661 .downcast_ref::<arrow::array::Int64Array>()
662 })
663 .filter(|a| !a.is_empty())
664 .map(|a| a.value(0))
665 .unwrap_or(0);
666 Ok(n)
667 }
668}
669
670fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
697 let mut config = SessionConfig::new();
698 {
699 let opts = config.options_mut();
700 opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
701 opts.execution.parquet.reorder_filters = cfg.reorder_filters;
702 }
703
704 if !cfg.list_files_cache {
705 return SessionContext::new_with_config(config);
706 }
707
708 let ttl = (cfg.list_files_cache_ttl_secs > 0)
711 .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
712 let list_cache = Arc::new(DefaultListFilesCache::new(
713 cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
714 ttl,
715 ));
716 let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
717
718 let runtime = RuntimeEnvBuilder::new()
719 .with_cache_manager(cache_manager)
720 .build_arc()
721 .expect("failed to build DataFusion runtime env");
722
723 SessionContext::new_with_config_rt(config, runtime)
724}
725
/// Builds the in-memory state and the DataFusion table provider for one dataset.
///
/// Lazy datasets are delegated to the matching lazy builder. Otherwise the
/// source is read fully into record batches, a logical schema and an equality
/// index are derived from them, and the batches are wrapped in a `MemTable`.
///
/// # Errors
/// `AppError::EmptyDataset` when the source yields no batches or only empty
/// ones; any read, index, or `MemTable` construction error otherwise.
async fn build_dataset(
    d: &DatasetConfig,
    ctx: &SessionContext,
) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
    if d.lazy {
        // Lazy mode: dispatch on source kind / location; nothing is materialised here.
        match (d.source.kind, d.source.is_s3()) {
            (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
            (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
            (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
        }
    }

    // Eager mode: read the whole source into memory.
    let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
        (SourceKind::Parquet, false) => read_local_parquet(d)?,
        (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
        (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
        (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
    };
    if raw_batches.is_empty() {
        return Err(AppError::EmptyDataset(format!(
            "dataset '{}': source produced no batches",
            d.name
        )));
    }
    if raw_batches.iter().all(|b| b.num_rows() == 0) {
        return Err(AppError::EmptyDataset(format!(
            "dataset '{}': source has a schema but no rows",
            d.name
        )));
    }

    let chunks = raw_batches;
    // The first batch's schema is taken as the dataset schema.
    let arrow_sch = chunks[0].schema();

    // Derive the logical column list from the Arrow fields.
    let columns: Vec<ColumnInfo> = arrow_sch
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            ColumnInfo {
                name: f.name().clone(),
                logical: arrow_to_logical(dt),
                sql_type: format!("{dt:?}"),
                nullable: f.is_nullable(),
            }
        })
        .collect();
    let schema = DatasetSchema::new(&d.name, columns);

    let index = build_eq_index_with_policy(&chunks, &d.index);

    // Spread non-empty batches round-robin over one MemTable partition per
    // available core (4 when the core count is unknown).
    let n_parts = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
    for (i, b) in chunks.iter().enumerate() {
        if b.num_rows() == 0 {
            continue;
        }
        parts[i % n_parts].push(b.clone());
    }
    // Drop partitions that received no batch.
    parts.retain(|p| !p.is_empty());
    let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);

    // Summary figures for the log line only.
    let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
    let mem_mb: usize = chunks
        .iter()
        .flat_map(|b| b.columns().iter())
        .map(|c| c.get_buffer_memory_size())
        .sum::<usize>()
        / 1_048_576;
    log::info!(
        "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
        d.name,
        d.source.kind.as_str(),
        total_rows,
        schema.columns.len(),
        mem_mb,
        chunks.len(),
        index.len()
    );

    Ok((
        DatasetState {
            schema,
            data: chunks,
            arrow_schema: arrow_sch,
            index,
            lazy: false,
        },
        provider,
    ))
}
842
843async fn build_lazy_local_parquet(
848 d: &DatasetConfig,
849 ctx: &SessionContext,
850) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
851 let (url, part_keys) = lazy_local_listing(d)?;
852
853 let mut opts =
854 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
855 if !part_keys.is_empty() {
856 opts = opts.with_table_partition_cols(
857 part_keys
858 .iter()
859 .map(|k| (k.clone(), DataType::Utf8))
860 .collect(),
861 );
862 }
863
864 let session_state = ctx.state();
865 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
868 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
869 })?;
870
871 if file_schema.fields().is_empty() {
875 return Err(AppError::EmptyDataset(format!(
876 "dataset '{}': no .parquet files at {}",
877 d.name, d.source.location
878 )));
879 }
880
881 let cfg = ListingTableConfig::new(url)
882 .with_listing_options(opts)
883 .with_schema(file_schema.clone());
884 let table = ListingTable::try_new(cfg).map_err(|e| {
885 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
886 })?;
887 let provider: Arc<dyn TableProvider> = Arc::new(table);
888
889 let mut fields: Vec<Field> = file_schema
891 .fields()
892 .iter()
893 .map(|f| f.as_ref().clone())
894 .collect();
895 for k in &part_keys {
896 if !fields.iter().any(|f| f.name() == k) {
897 fields.push(Field::new(k, DataType::Utf8, false));
898 }
899 }
900 let arrow_sch = Arc::new(Schema::new(fields));
901
902 let columns: Vec<ColumnInfo> = arrow_sch
903 .fields()
904 .iter()
905 .map(|f| {
906 let dt = f.data_type();
907 ColumnInfo {
908 name: f.name().clone(),
909 logical: arrow_to_logical(dt),
910 sql_type: format!("{dt:?}"),
911 nullable: f.is_nullable(),
912 }
913 })
914 .collect();
915 let schema = DatasetSchema::new(&d.name, columns);
916
917 log::info!(
918 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
919 d.name,
920 d.source.kind.as_str(),
921 schema.columns.len(),
922 part_keys.len()
923 );
924
925 Ok((
926 DatasetState {
927 schema,
928 data: Vec::new(),
929 arrow_schema: arrow_sch,
930 index: EqIndex::default(),
931 lazy: true,
932 },
933 provider,
934 ))
935}
936
937fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
942 let loc = &d.source.location;
943
944 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
945 let parts: Vec<&str> = loc.split('/').collect();
946 let first_wild = parts
947 .iter()
948 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
949 .unwrap_or(parts.len());
950 let base = parts[..first_wild].join("/");
951 let base = if base.is_empty() {
952 "/".to_string()
953 } else {
954 base
955 };
956 let upper = parts.len().saturating_sub(1);
959 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
960 .iter()
961 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
962 .filter(|k| !k.is_empty())
963 .collect();
964 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
965 }
966
967 let path = std::path::Path::new(loc);
968 if path.is_dir() {
969 let keys = discover_hive_keys(path);
970 return Ok((dir_url(path, d)?, keys));
971 }
972
973 let url = ListingTableUrl::parse(loc)
974 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
975 Ok((url, Vec::new()))
976}
977
978fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
981 let s = path.to_str().ok_or_else(|| {
982 AppError::Internal(format!(
983 "dataset '{}': non-utf8 path {}",
984 d.name,
985 path.display()
986 ))
987 })?;
988 let s = if s.ends_with('/') {
989 s.to_string()
990 } else {
991 format!("{s}/")
992 };
993 ListingTableUrl::parse(&s)
994 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
995}
996
/// Walks down from `base`, following at each level the first sub-directory
/// named `key=value` (both parts non-empty), and returns the keys in
/// descent order.
///
/// The walk stops at the first level that cannot be read or has no such
/// sub-directory. Entries are visited in whatever order `read_dir` yields them.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut cur = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&cur) {
        let hit = entries
            .flatten()
            .map(|entry| entry.path())
            .filter(|candidate| candidate.is_dir())
            .find_map(|candidate| {
                let (k, v) = candidate.file_name()?.to_str()?.split_once('=')?;
                if k.is_empty() || v.is_empty() {
                    return None;
                }
                let key = k.to_string();
                Some((key, candidate))
            });

        let Some((key, dir)) = hit else {
            break;
        };
        keys.push(key);
        cur = dir;
    }

    keys
}
1034
1035async fn build_lazy_s3_parquet(
1041 d: &DatasetConfig,
1042 ctx: &SessionContext,
1043) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1044 register_s3_object_store(d, ctx)?;
1045
1046 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1047
1048 if file_schema.fields().is_empty() {
1051 return Err(AppError::EmptyDataset(format!(
1052 "dataset '{}': no .parquet files at {}",
1053 d.name, d.source.location
1054 )));
1055 }
1056
1057 let mut fields: Vec<Field> = file_schema
1059 .fields()
1060 .iter()
1061 .map(|f| f.as_ref().clone())
1062 .collect();
1063 for k in &part_keys {
1064 if !fields.iter().any(|f| f.name() == k) {
1065 fields.push(Field::new(k, DataType::Utf8, false));
1066 }
1067 }
1068 let arrow_sch = Arc::new(Schema::new(fields));
1069
1070 let columns: Vec<ColumnInfo> = arrow_sch
1071 .fields()
1072 .iter()
1073 .map(|f| {
1074 let dt = f.data_type();
1075 ColumnInfo {
1076 name: f.name().clone(),
1077 logical: arrow_to_logical(dt),
1078 sql_type: format!("{dt:?}"),
1079 nullable: f.is_nullable(),
1080 }
1081 })
1082 .collect();
1083 let schema = DatasetSchema::new(&d.name, columns);
1084
1085 log::info!(
1086 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1087 d.name,
1088 d.source.kind.as_str(),
1089 schema.columns.len(),
1090 part_keys.len()
1091 );
1092
1093 Ok((
1094 DatasetState {
1095 schema,
1096 data: Vec::new(),
1097 arrow_schema: arrow_sch,
1098 index: EqIndex::default(),
1099 lazy: true,
1100 },
1101 provider,
1102 ))
1103}
1104
1105async fn build_s3_listing_table(
1111 d: &DatasetConfig,
1112 ctx: &SessionContext,
1113) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1114 let (url, part_keys) = s3_listing(d, ctx).await?;
1115
1116 let mut opts =
1117 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1118 if !part_keys.is_empty() {
1119 opts = opts.with_table_partition_cols(
1120 part_keys
1121 .iter()
1122 .map(|k| (k.clone(), DataType::Utf8))
1123 .collect(),
1124 );
1125 }
1126
1127 let session_state = ctx.state();
1128 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1129 AppError::Internal(format!(
1130 "dataset '{}': infer parquet schema on s3: {e}",
1131 d.name
1132 ))
1133 })?;
1134
1135 let cfg = ListingTableConfig::new(url)
1136 .with_listing_options(opts)
1137 .with_schema(file_schema.clone());
1138 let table = ListingTable::try_new(cfg).map_err(|e| {
1139 AppError::Internal(format!(
1140 "dataset '{}': ListingTable::try_new (s3): {e}",
1141 d.name
1142 ))
1143 })?;
1144 Ok((Arc::new(table), file_schema, part_keys))
1145}
1146
1147async fn s3_listing(
1153 d: &DatasetConfig,
1154 ctx: &SessionContext,
1155) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1156 let s3 = d.s3.clone().unwrap_or_default();
1157 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1158 let loc = &d.source.location;
1159
1160 if d.source.has_glob() {
1161 let (base, keys) = split_glob_base_keys(loc);
1162 let base = format!("{}/", base.trim_end_matches('/'));
1163 let url = ListingTableUrl::parse(&base).map_err(|e| {
1164 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1165 })?;
1166 let keys = if want_partitions { keys } else { Vec::new() };
1167 return Ok((url, keys));
1168 }
1169
1170 let base = if loc.ends_with('/') {
1171 loc.clone()
1172 } else {
1173 format!("{loc}/")
1174 };
1175 let url = ListingTableUrl::parse(&base).map_err(|e| {
1176 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1177 })?;
1178 let keys = if want_partitions {
1179 discover_s3_hive_keys(ctx, &url).await
1180 } else {
1181 Vec::new()
1182 };
1183 Ok((url, keys))
1184}
1185
/// Splits a glob location into its wildcard-free base and its hive partition keys.
///
/// The base is every `/`-separated segment before the first one containing
/// `*`, `?` or `[` (`"/"` when that would be empty). Keys are the non-empty
/// left-hand sides of `key=…` segments from the first wildcard segment up to,
/// but excluding, the final segment.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_wild = |seg: &str| seg.contains(|c: char| matches!(c, '*' | '?' | '['));

    let segments: Vec<&str> = loc.split('/').collect();
    let first_wild = segments
        .iter()
        .position(|seg| is_wild(seg))
        .unwrap_or(segments.len());

    let mut base = segments[..first_wild].join("/");
    if base.is_empty() {
        base.push('/');
    }

    // The final segment is the file pattern, never a partition directory.
    let last = segments.len().saturating_sub(1);
    let mut keys = Vec::new();
    for seg in &segments[first_wild.min(last)..last] {
        match seg.split_once('=') {
            Some((key, _)) if !key.is_empty() => keys.push(key.to_string()),
            _ => {}
        }
    }

    (base, keys)
}
1209
1210async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1215 let store = match ctx.runtime_env().object_store(url.object_store()) {
1216 Ok(s) => s,
1217 Err(_) => return Vec::new(),
1218 };
1219 let mut keys = Vec::new();
1220 let mut prefix = url.prefix().clone();
1221 loop {
1222 let listing = match store.list_with_delimiter(Some(&prefix)).await {
1223 Ok(l) => l,
1224 Err(_) => break,
1225 };
1226 let mut next: Option<object_store::path::Path> = None;
1227 for cp in &listing.common_prefixes {
1228 if let Some(seg) = cp.parts().next_back() {
1229 let seg = seg.as_ref().to_string();
1230 if let Some((k, v)) = seg.split_once('=')
1231 && !k.is_empty()
1232 && !v.is_empty()
1233 {
1234 keys.push(k.to_string());
1235 next = Some(cp.clone());
1236 break;
1237 }
1238 }
1239 }
1240 match next {
1241 Some(p) => prefix = p,
1242 None => break,
1243 }
1244 }
1245 keys
1246}
1247
/// Reads every local Parquet file of dataset `d` fully into memory.
///
/// Optionally restricts the read to the configured `columns` (matched
/// case-insensitively) and, when `dict_encode` is set, asks the reader to
/// produce dictionary-encoded arrays for string columns. Hive-style
/// `key=value` path segments are appended to each batch as extra columns.
///
/// # Errors
/// `AppError::Internal` when a file cannot be opened or decoded, when none of
/// the configured columns exist in a file, or when no batches were read at all.
///
/// NOTE(review): an empty source is reported here as `AppError::Internal`,
/// whereas `build_dataset` reports the same condition as
/// `AppError::EmptyDataset` — confirm which one is intended for local Parquet.
fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
    let files = d.resolve_local_parquet_files()?;
    let mut all = Vec::new();
    // Lower-cased set of requested column names; `None` means "all columns".
    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
        None
    } else {
        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
    };

    for f in &files {
        let file = std::fs::File::open(f)
            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;

        // First pass over a duplicated handle: read only schema and metadata.
        let probe = ParquetRecordBatchReaderBuilder::try_new(
            file.try_clone()
                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
        )?;
        let parquet_schema = probe.parquet_schema().clone();
        let arrow_schema = probe.schema().clone();
        let metadata = probe.metadata().clone();
        drop(probe);

        // Root-level projection built from the positions of the wanted fields.
        let projection = if let Some(w) = &wanted {
            let indices: Vec<usize> = arrow_schema
                .fields()
                .iter()
                .enumerate()
                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
                .map(|(i, _)| i)
                .collect();
            if indices.is_empty() {
                return Err(AppError::Internal(format!(
                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
                    d.name,
                    d.columns,
                    f.display()
                )));
            }
            ProjectionMask::roots(&parquet_schema, indices)
        } else {
            ProjectionMask::all()
        };

        // Start from the file's own fields; string fields may be swapped for
        // dictionary-typed ones below.
        let mut new_fields: Vec<Field> = arrow_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        if d.dict_encode
            && let Some(rg0) = metadata.row_groups().first()
        {
            for (i, fld) in arrow_schema.fields().iter().enumerate() {
                if !matches!(
                    fld.data_type(),
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
                ) {
                    continue;
                }
                // Only columns whose first row group has a dictionary page are switched.
                // NOTE(review): `i` is an Arrow field index but is used to index the
                // row group's column chunks; these presumably line up only for flat
                // (non-nested) schemas — confirm.
                if let Some(col) = rg0.columns().get(i)
                    && col.dictionary_page_offset().is_some()
                {
                    new_fields[i] = Field::new(
                        fld.name(),
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        fld.is_nullable(),
                    );
                }
            }
        }
        let forced_schema = Arc::new(Schema::new(new_fields));

        // Second pass: the real reader, with the (possibly dictionary-typed) schema.
        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
            .with_batch_size(65_536)
            .with_projection(projection)
            .build()?;
        // Partition values encoded in the file's path, if any.
        let pairs = hive_pairs(f);
        for batch in reader {
            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
            all.push(if pairs.is_empty() {
                batch
            } else {
                append_partition_cols(&batch, &pairs)?
            });
        }
    }
    if all.is_empty() {
        return Err(AppError::Internal(format!(
            "dataset '{}': parquet source is empty",
            d.name
        )));
    }
    Ok(all)
}
1373
/// Extracts Hive-style `key=value` partition pairs from the directory part of `path`.
///
/// Only the components above the final one are inspected: the last component is the
/// data file itself, and a file named e.g. `part=0.parquet` must not be mistaken for a
/// partition directory. A segment is accepted when it contains exactly one `=` with a
/// non-empty key and a non-empty value; non-UTF-8 segments are ignored.
///
/// Pairs are returned in path order (outermost directory first).
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    // No parent means there is no directory component that could carry a partition.
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|component| component.as_os_str().to_str())
        .filter_map(|segment| {
            let (key, value) = segment.split_once('=')?;
            // Reject `=v`, `k=` and anything with a second `=` (ambiguous split).
            if key.is_empty() || value.is_empty() || value.contains('=') {
                return None;
            }
            Some((key.to_owned(), value.to_owned()))
        })
        .collect()
}
1388
1389fn append_partition_cols(
1392 batch: &RecordBatch,
1393 pairs: &[(String, String)],
1394) -> Result<RecordBatch, AppError> {
1395 let n = batch.num_rows();
1396 let mut fields: Vec<Field> = batch
1397 .schema()
1398 .fields()
1399 .iter()
1400 .map(|f| f.as_ref().clone())
1401 .collect();
1402 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1403 for (k, v) in pairs {
1404 if fields.iter().any(|f| f.name() == k) {
1405 continue;
1406 }
1407 fields.push(Field::new(k, DataType::Utf8, false));
1408 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1409 }
1410 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1411 .map_err(|e| AppError::Internal(e.to_string()))
1412}
1413
1414async fn read_s3_parquet(
1420 d: &DatasetConfig,
1421 ctx: &SessionContext,
1422) -> Result<Vec<RecordBatch>, AppError> {
1423 register_s3_object_store(d, ctx)?;
1424 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1425 let df = ctx
1426 .read_table(provider)
1427 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1428 Ok(df.collect().await?)
1429}
1430
1431async fn open_delta_table(
1438 d: &DatasetConfig,
1439 opts: HashMap<String, String>,
1440) -> Result<deltalake::DeltaTable, AppError> {
1441 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1442 AppError::Internal(format!(
1443 "dataset '{}': bad delta location '{}': {e}",
1444 d.name, d.source.location
1445 ))
1446 })?;
1447 deltalake::open_table_with_storage_options(url, opts)
1448 .await
1449 .map_err(|e| {
1450 let msg = e.to_string();
1460 let low = msg.to_lowercase();
1461 if low.contains("no files in log segment") || low.contains("not a delta table") {
1462 AppError::EmptyDataset(format!(
1463 "delta location '{}' has no committed files ({msg})",
1464 d.source.location
1465 ))
1466 } else {
1467 AppError::Internal(format!(
1468 "dataset '{}': delta open '{}': {msg}",
1469 d.name, d.source.location
1470 ))
1471 }
1472 })
1473}
1474
1475async fn open_delta_provider(
1481 d: &DatasetConfig,
1482 opts: HashMap<String, String>,
1483) -> Result<Arc<dyn TableProvider>, AppError> {
1484 let table = open_delta_table(d, opts).await?;
1485 table.table_provider().await.map_err(|e| {
1486 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1487 })
1488}
1489
1490fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1493 if d.source.is_s3() {
1494 delta_s3_options(d)
1495 } else {
1496 Ok(HashMap::new())
1497 }
1498}
1499
1500async fn read_delta(
1504 d: &DatasetConfig,
1505 opts: HashMap<String, String>,
1506) -> Result<Vec<RecordBatch>, AppError> {
1507 let provider = open_delta_provider(d, opts).await?;
1508 let scan_ctx = SessionContext::new();
1518 let df = scan_ctx.read_table(provider).map_err(|e| {
1519 AppError::EmptyDataset(format!(
1520 "delta location '{}' could not be scanned, skipping ({e})",
1521 d.source.location
1522 ))
1523 })?;
1524 df.collect().await.map_err(|e| {
1525 AppError::EmptyDataset(format!(
1526 "delta location '{}' could not be scanned, skipping ({e})",
1527 d.source.location
1528 ))
1529 })
1530}
1531
1532async fn build_lazy_delta(
1538 d: &DatasetConfig,
1539 _ctx: &SessionContext,
1540) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1541 let table = open_delta_table(d, delta_storage_options(d)?).await?;
1542
1543 let file_count = table
1550 .get_file_uris()
1551 .map(|it| it.count())
1552 .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1553 if file_count == 0 {
1554 return Err(AppError::EmptyDataset(format!(
1555 "delta location '{}' has a schema but no data files",
1556 d.source.location
1557 )));
1558 }
1559
1560 let provider = table.table_provider().await.map_err(|e| {
1561 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1562 })?;
1563
1564 {
1577 let probe_ctx = SessionContext::new();
1578 let probe = probe_ctx
1579 .read_table(provider.clone())
1580 .and_then(|df| df.limit(0, Some(1)));
1581 match probe {
1582 Ok(df) => match df.collect().await {
1583 Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1584 return Err(AppError::EmptyDataset(format!(
1585 "delta location '{}' resolves to no rows",
1586 d.source.location
1587 )));
1588 }
1589 Ok(_) => {}
1590 Err(e) => {
1591 return Err(AppError::EmptyDataset(format!(
1592 "delta location '{}' could not be scanned, skipping ({e})",
1593 d.source.location
1594 )));
1595 }
1596 },
1597 Err(e) => {
1598 return Err(AppError::EmptyDataset(format!(
1599 "delta location '{}' could not be scanned, skipping ({e})",
1600 d.source.location
1601 )));
1602 }
1603 }
1604 }
1605
1606 let arrow_sch = provider.schema();
1609 let columns: Vec<ColumnInfo> = arrow_sch
1610 .fields()
1611 .iter()
1612 .map(|f| {
1613 let dt = f.data_type();
1614 ColumnInfo {
1615 name: f.name().clone(),
1616 logical: arrow_to_logical(dt),
1617 sql_type: format!("{dt:?}"),
1618 nullable: f.is_nullable(),
1619 }
1620 })
1621 .collect();
1622 let schema = DatasetSchema::new(&d.name, columns);
1623
1624 log::info!(
1625 "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1626 d.name,
1627 d.source.kind.as_str(),
1628 schema.columns.len()
1629 );
1630
1631 Ok((
1632 DatasetState {
1633 schema,
1634 data: Vec::new(),
1635 arrow_schema: arrow_sch,
1636 index: EqIndex::default(),
1637 lazy: true,
1638 },
1639 provider,
1640 ))
1641}
1642
1643fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1647 let creds = d.resolved_creds();
1648 let region = d.resolved_region();
1649 let s3 = d.s3.clone().unwrap_or_default();
1650 let (bucket, _) = d.source.s3_bucket()?;
1651
1652 let mut opts = HashMap::new();
1653 opts.insert("AWS_REGION".into(), region);
1654 if let Some(ep) = s3.effective_endpoint(bucket) {
1655 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1656 }
1657 if s3.allow_http {
1658 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1659 }
1660 opts.insert(
1661 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1662 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1663 );
1664 if let Some(k) = creds.access_key_id {
1665 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1666 }
1667 if let Some(s) = creds.secret_access_key {
1668 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1669 }
1670 if let Some(t) = creds.session_token {
1671 opts.insert("AWS_SESSION_TOKEN".into(), t);
1672 }
1673 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1675 Ok(opts)
1676}
1677
1678fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1682 let (bucket, _key) = d.source.s3_bucket()?;
1683 let creds = d.resolved_creds();
1684 let region = d.resolved_region();
1685 let s3 = d.s3.clone().unwrap_or_default();
1686
1687 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1688 AppError::Internal(format!(
1689 "dataset '{}': build S3 store for '{bucket}': {e}",
1690 d.name
1691 ))
1692 })?;
1693
1694 let url = Url::parse(&format!("s3://{bucket}"))
1695 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1696 ctx.register_object_store(&url, Arc::new(store));
1697 Ok(())
1698}
1699
/// Heuristically decides whether an error message describes an S3 permission failure.
///
/// The check is a case-insensitive substring search for "access denied",
/// "accessdenied", "forbidden" or "403" anywhere in `msg`.
fn is_s3_access_denied(msg: &str) -> bool {
    const MARKERS: [&str; 4] = ["access denied", "accessdenied", "forbidden", "403"];
    let lowered = msg.to_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}
1713
1714
1715async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
1721 if d.lazy || server.force_lazy_above_mb == 0 {
1722 return None;
1723 }
1724 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1725
1726 let bytes = if d.source.is_s3() {
1727 match estimate_s3_bytes(d).await {
1728 Ok(b) => b,
1729 Err(e) => {
1730 log::warn!(
1731 "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
1732 d.name
1733 );
1734 return None;
1735 }
1736 }
1737 } else {
1738 d.estimate_local_bytes()?
1739 };
1740
1741 (bytes > threshold).then_some(bytes)
1742}
1743
1744async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
1753 use futures_util::StreamExt;
1754 use object_store::ObjectStore;
1755
1756 let (bucket, _key) = d.source.s3_bucket()?;
1757 let creds = d.resolved_creds();
1758 let region = d.resolved_region();
1759 let s3 = d.s3.clone().unwrap_or_default();
1760 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1761 AppError::Internal(format!(
1762 "dataset '{}': build S3 store for '{bucket}': {e}",
1763 d.name
1764 ))
1765 })?;
1766
1767 let (base, _keys) = split_glob_base_keys(&d.source.location);
1770 let prefix_key = base
1771 .strip_prefix("s3://")
1772 .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
1773 .unwrap_or("")
1774 .trim_end_matches('/');
1775 let prefix =
1776 (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
1777
1778 let mut total: u64 = 0;
1779 let mut stream = store.list(prefix.as_ref());
1780 while let Some(meta) = stream.next().await {
1781 let meta = meta.map_err(|e| {
1782 AppError::Internal(format!(
1783 "dataset '{}': s3 list under '{prefix_key}': {e}",
1784 d.name
1785 ))
1786 })?;
1787 if meta.location.as_ref().ends_with(".parquet") {
1788 total = total.saturating_add(meta.size);
1789 }
1790 }
1791 Ok(total)
1792}
1793
1794fn build_s3(
1795 bucket: &str,
1796 region: &str,
1797 s3: &S3Config,
1798 creds: &ResolvedCreds,
1799) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1800 let mut b = AmazonS3Builder::new()
1801 .with_bucket_name(bucket)
1802 .with_region(region)
1803 .with_allow_http(s3.allow_http)
1804 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1805 if let Some(ep) = s3.effective_endpoint(bucket) {
1806 b = b.with_endpoint(ep);
1807 }
1808 if let Some(k) = creds.access_key_id.as_deref() {
1809 b = b.with_access_key_id(k);
1810 }
1811 if let Some(s) = creds.secret_access_key.as_deref() {
1812 b = b.with_secret_access_key(s);
1813 }
1814 if let Some(t) = creds.session_token.as_deref() {
1815 b = b.with_token(t);
1816 }
1817 b.build()
1818}
1819
1820fn arrow_to_logical(dt: &DataType) -> LogicalType {
1821 match dt {
1822 DataType::Boolean => LogicalType::Bool,
1823 DataType::Int8
1824 | DataType::Int16
1825 | DataType::Int32
1826 | DataType::Int64
1827 | DataType::UInt8
1828 | DataType::UInt16
1829 | DataType::UInt32
1830 | DataType::UInt64 => LogicalType::Int,
1831 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1832 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1833 DataType::Dictionary(_, v)
1837 if matches!(
1838 v.as_ref(),
1839 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1840 ) =>
1841 {
1842 LogicalType::Utf8
1843 }
1844 DataType::Date32
1845 | DataType::Date64
1846 | DataType::Time32(_)
1847 | DataType::Time64(_)
1848 | DataType::Timestamp(_, _)
1849 | DataType::Duration(_)
1850 | DataType::Interval(_) => LogicalType::Temporal,
1851 _ => LogicalType::Other,
1852 }
1853}
1854
1855fn project(
1860 schema: &DatasetSchema,
1861 batch: RecordBatch,
1862 columns: &[String],
1863) -> Result<RecordBatch, AppError> {
1864 if columns.is_empty() {
1865 return Ok(batch);
1866 }
1867 let indices: Vec<usize> = columns
1868 .iter()
1869 .map(|c| {
1870 schema
1871 .find(c)
1872 .map(|info| schema.by_name[&info.name.to_lowercase()])
1873 })
1874 .collect::<Result<_, _>>()?;
1875 let fields: Vec<Field> = indices
1876 .iter()
1877 .map(|&i| batch.schema().field(i).clone())
1878 .collect();
1879 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1880 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1881}
1882
1883#[derive(Default)]
1896struct Params {
1897 values: Vec<ScalarValue>,
1898}
1899
1900impl Params {
1901 fn new() -> Self {
1902 Self::default()
1903 }
1904
1905 fn bind(&mut self, v: ScalarValue) -> String {
1907 self.values.push(v);
1908 format!("${}", self.values.len())
1909 }
1910
1911 fn into_values(self) -> Vec<ScalarValue> {
1912 self.values
1913 }
1914}
1915
1916fn build_query_sql(
1917 schema: &DatasetSchema,
1918 req: &QueryRequest,
1919 max_page_size: u64,
1920) -> Result<(String, Vec<ScalarValue>), AppError> {
1921 let (limit, offset) = req.effective_limit_offset(max_page_size);
1922 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1923}
1924
1925fn build_query_stream_sql(
1926 schema: &DatasetSchema,
1927 req: &QueryRequest,
1928) -> Result<(String, Vec<ScalarValue>), AppError> {
1929 let suffix = req
1930 .limit
1931 .map(|limit| format!(" LIMIT {limit}"))
1932 .unwrap_or_default();
1933 build_query_sql_with_suffix(schema, req, &suffix)
1934}
1935
1936fn build_query_sql_with_suffix(
1937 schema: &DatasetSchema,
1938 req: &QueryRequest,
1939 suffix: &str,
1940) -> Result<(String, Vec<ScalarValue>), AppError> {
1941 let agg_plan = req.agg_plan(schema)?;
1942
1943 let cols = if let Some(plan) = &agg_plan {
1944 let mut parts: Vec<String> = plan
1946 .group_cols
1947 .iter()
1948 .map(|c| DatasetSchema::quote_ident(c))
1949 .collect();
1950 for a in &plan.aggs {
1951 let expr = a.sql_expr()?;
1952 parts.push(format!(
1953 "{expr} AS {}",
1954 DatasetSchema::quote_ident(&a.alias)
1955 ));
1956 }
1957 parts.join(", ")
1958 } else if req.columns.is_empty() {
1959 if req.distinct {
1960 "DISTINCT *".to_string()
1961 } else {
1962 "*".to_string()
1963 }
1964 } else {
1965 let list = req
1966 .columns
1967 .iter()
1968 .map(|c| {
1969 schema
1970 .find(c)
1971 .map(|info| DatasetSchema::quote_ident(&info.name))
1972 })
1973 .collect::<Result<Vec<_>, _>>()?
1974 .join(", ");
1975 if req.distinct {
1976 format!("DISTINCT {list}")
1977 } else {
1978 list
1979 }
1980 };
1981
1982 let mut params = Params::new();
1983 let clauses: Vec<String> = req
1984 .predicates
1985 .iter()
1986 .map(|p| pred_to_sql(schema, p, &mut params))
1987 .collect::<Result<_, _>>()?;
1988
1989 let table = DatasetSchema::quote_ident(&schema.name);
1990 let where_clause = if clauses.is_empty() {
1991 String::new()
1992 } else {
1993 format!(" WHERE {}", clauses.join(" AND "))
1994 };
1995 let group_clause = match &agg_plan {
1996 Some(p) => format!(
1997 " GROUP BY {}",
1998 p.group_cols
1999 .iter()
2000 .map(|c| DatasetSchema::quote_ident(c))
2001 .collect::<Vec<_>>()
2002 .join(", "),
2003 ),
2004 None => String::new(),
2005 };
2006 let having_clause = {
2007 let resolved = req.having_plan(agg_plan.as_ref())?;
2008 if resolved.is_empty() {
2009 String::new()
2010 } else {
2011 let clauses: Vec<String> = resolved
2012 .iter()
2013 .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2014 .collect::<Result<_, _>>()?;
2015 format!(" HAVING {}", clauses.join(" AND "))
2016 }
2017 };
2018 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2019 Some(s) => format!(" ORDER BY {s}"),
2020 None => String::new(),
2021 };
2022 let sql =
2023 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2024 Ok((sql, params.into_values()))
2025}
2026
2027fn build_count_sql(
2028 schema: &DatasetSchema,
2029 predicates: &[Predicate],
2030) -> Result<(String, Vec<ScalarValue>), AppError> {
2031 let mut params = Params::new();
2032 let clauses: Vec<String> = predicates
2033 .iter()
2034 .map(|p| pred_to_sql(schema, p, &mut params))
2035 .collect::<Result<_, _>>()?;
2036 let table = DatasetSchema::quote_ident(&schema.name);
2037 let where_clause = if clauses.is_empty() {
2038 String::new()
2039 } else {
2040 format!(" WHERE {}", clauses.join(" AND "))
2041 };
2042 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2043 Ok((sql, params.into_values()))
2044}
2045
2046fn pred_to_sql(
2047 schema: &DatasetSchema,
2048 pred: &Predicate,
2049 params: &mut Params,
2050) -> Result<String, AppError> {
2051 let info = schema.find(&pred.col)?;
2052 let col = DatasetSchema::quote_ident(&info.name);
2053 pred_to_sql_with_lhs(&col, pred, params)
2054}
2055
2056fn pred_to_sql_with_lhs(
2062 col: &str,
2063 pred: &Predicate,
2064 params: &mut Params,
2065) -> Result<String, AppError> {
2066 match pred.op.as_str() {
2067 "is_null" => return Ok(format!("{col} IS NULL")),
2068 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2069 _ => {}
2070 }
2071
2072 let val = pred
2073 .val
2074 .as_ref()
2075 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2076
2077 if pred.op == "in" {
2078 let items = val
2079 .as_array()
2080 .filter(|a| !a.is_empty())
2081 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2082 let placeholders: Vec<String> = items
2083 .iter()
2084 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2085 .collect::<Result<_, AppError>>()?;
2086 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2087 }
2088
2089 let sql_op = match pred.op.as_str() {
2090 "eq" => "=",
2091 "neq" => "!=",
2092 "gt" => ">",
2093 "gte" => ">=",
2094 "lt" => "<",
2095 "lte" => "<=",
2096 "like" => "LIKE",
2097 "ilike" => "ILIKE",
2098 other => return Err(AppError::UnknownOperator(other.into())),
2099 };
2100 let placeholder = params.bind(json_to_scalar(val)?);
2101 Ok(format!("{col} {sql_op} {placeholder}"))
2102}
2103
2104fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2108 match val {
2109 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2110 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2111 JsonValue::Null => Ok(ScalarValue::Null),
2112 JsonValue::Number(n) => {
2113 if let Some(i) = n.as_i64() {
2114 Ok(ScalarValue::Int64(Some(i)))
2115 } else if let Some(u) = n.as_u64() {
2116 Ok(ScalarValue::UInt64(Some(u)))
2117 } else if let Some(f) = n.as_f64() {
2118 Ok(ScalarValue::Float64(Some(f)))
2119 } else {
2120 Err(AppError::InvalidValue(
2121 "unsupported numeric literal in predicate".into(),
2122 ))
2123 }
2124 }
2125 _ => Err(AppError::InvalidValue(
2126 "unsupported literal type in predicate".into(),
2127 )),
2128 }
2129}
2130
2131fn json_index_key(val: &JsonValue) -> Option<String> {
2136 match val {
2137 JsonValue::String(s) => Some(s.clone()),
2138 JsonValue::Number(n) => Some(n.to_string()),
2139 JsonValue::Bool(b) => Some(b.to_string()),
2140 _ => None,
2141 }
2142}
2143
/// Returns the values present in both `a` and `b`, walking the two slices in lockstep.
///
/// Both inputs are expected to be sorted ascending; the output is then sorted too.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    while let (Some((&x, rest_left)), Some((&y, rest_right))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            // The smaller head cannot occur in the other slice any more: drop it.
            Ordering::Less => left = rest_left,
            Ordering::Greater => right = rest_right,
            Ordering::Equal => {
                common.push(x);
                left = rest_left;
                right = rest_right;
            }
        }
    }
    common
}
2160
/// Merges `a` and `b` into one ascending list; a value at the head of both inputs is
/// emitted once.
///
/// Both inputs are expected to be sorted ascending.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut left, mut right) = (a, b);
    while let (Some((&x, rest_left)), Some((&y, rest_right))) =
        (left.split_first(), right.split_first())
    {
        match x.cmp(&y) {
            Ordering::Less => {
                merged.push(x);
                left = rest_left;
            }
            Ordering::Greater => {
                merged.push(y);
                right = rest_right;
            }
            Ordering::Equal => {
                merged.push(x);
                left = rest_left;
                right = rest_right;
            }
        }
    }
    // At most one side still has elements; append whatever remains.
    merged.extend_from_slice(left);
    merged.extend_from_slice(right);
    merged
}
2185
2186fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2187 if predicates.is_empty() || index.is_empty() {
2188 return None;
2189 }
2190
2191 if let [pred] = predicates
2194 && pred.op.as_str() == "eq"
2195 {
2196 let col_lower = pred.col.to_lowercase();
2197 let col_map = index.get(&col_lower)?;
2198 let key = json_index_key(pred.val.as_ref()?)?;
2199 return Some(match col_map.get(&key) {
2200 Some(rows) => Cow::Borrowed(rows.as_slice()),
2201 None => Cow::Owned(Vec::new()),
2202 });
2203 }
2204
2205 let mut result: Option<Vec<u32>> = None;
2206 for pred in predicates {
2207 let col_lower = pred.col.to_lowercase();
2208 let col_map = index.get(&col_lower)?;
2209
2210 let rows: Vec<u32> = match pred.op.as_str() {
2211 "eq" => {
2212 let key = json_index_key(pred.val.as_ref()?)?;
2213 col_map.get(&key).cloned().unwrap_or_default()
2214 }
2215 "in" => {
2216 let items = pred.val.as_ref()?.as_array()?;
2217 let mut merged: Vec<u32> = Vec::new();
2218 for item in items {
2219 if let Some(r) = col_map.get(&json_index_key(item)?) {
2220 merged = union_sorted(&merged, r);
2221 }
2222 }
2223 merged
2224 }
2225 _ => return None,
2226 };
2227
2228 result = Some(match result {
2229 None => rows,
2230 Some(r) => intersect_sorted(&r, &rows),
2231 });
2232 }
2233 result.map(Cow::Owned)
2234}
2235
2236#[doc(hidden)]
2240pub mod bench {
2241 use super::{EqIndex, FastMap, json_index_key, try_index};
2242 use datapress_core::models::Predicate;
2243 use serde_json::Value as JsonValue;
2244 use std::borrow::Cow;
2245
2246 pub struct BenchIndex(EqIndex);
2248
2249 pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2253 let key = json_index_key(val).expect("benchable index key");
2254 let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2255 col_map.insert(key, rows);
2256 let mut index: EqIndex = EqIndex::default();
2257 index.insert(col.to_string(), col_map);
2258 BenchIndex(index)
2259 }
2260
2261 pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2263 try_index(&idx.0, predicates)
2264 }
2265
2266 pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2271 let [pred] = predicates else { return None };
2272 if pred.op.as_str() != "eq" {
2273 return None;
2274 }
2275 let col_lower = pred.col.to_lowercase();
2276 let col_map = idx.0.get(&col_lower)?;
2277 let key = json_index_key(pred.val.as_ref()?)?;
2278 Some(col_map.get(&key).cloned().unwrap_or_default())
2279 }
2280}
2281
2282fn slice_global(
2285 chunks: &[RecordBatch],
2286 schema: &Arc<Schema>,
2287 offset: usize,
2288 limit: usize,
2289) -> Result<RecordBatch, AppError> {
2290 if limit == 0 || chunks.is_empty() {
2291 return Ok(RecordBatch::new_empty(schema.clone()));
2292 }
2293 let mut out = Vec::new();
2294 let mut to_skip = offset;
2295 let mut remaining = limit;
2296 for b in chunks {
2297 if remaining == 0 {
2298 break;
2299 }
2300 let n = b.num_rows();
2301 if to_skip >= n {
2302 to_skip -= n;
2303 continue;
2304 }
2305 let take = remaining.min(n - to_skip);
2306 out.push(b.slice(to_skip, take));
2307 to_skip = 0;
2308 remaining -= take;
2309 }
2310 if out.is_empty() {
2311 return Ok(RecordBatch::new_empty(schema.clone()));
2312 }
2313 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2314}
2315
2316fn take_page(
2321 chunks: &[RecordBatch],
2322 schema: &Arc<Schema>,
2323 rows: &[u32],
2324 offset: usize,
2325 limit: usize,
2326) -> Result<RecordBatch, AppError> {
2327 let start = offset.min(rows.len());
2328 let len = limit.min(rows.len() - start);
2329 if len == 0 || chunks.is_empty() {
2330 return Ok(RecordBatch::new_empty(schema.clone()));
2331 }
2332
2333 let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2336 let mut acc: u32 = 0;
2337 offsets.push(0);
2338 for b in chunks {
2339 acc = acc
2340 .checked_add(b.num_rows() as u32)
2341 .expect("row count exceeds u32::MAX");
2342 offsets.push(acc);
2343 }
2344
2345 let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2348 for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2349 let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2350 let local = gid - offsets[bi];
2351 buckets[bi].push((out_pos as u32, local));
2352 }
2353
2354 let mut takens: Vec<RecordBatch> = Vec::new();
2356 let mut dest: Vec<u32> = Vec::with_capacity(len);
2357 for (bi, bucket) in buckets.iter().enumerate() {
2358 if bucket.is_empty() {
2359 continue;
2360 }
2361 let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2362 let cols: Vec<ArrayRef> = chunks[bi]
2363 .columns()
2364 .iter()
2365 .map(|c| {
2366 arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2367 .map_err(AppError::from)
2368 })
2369 .collect::<Result<_, _>>()?;
2370 takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2371 dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2372 }
2373
2374 let stitched = compute::concat_batches(schema, takens.iter())?;
2376 let mut inv = vec![0u32; len];
2377 for (i, &d) in dest.iter().enumerate() {
2378 inv[d as usize] = i as u32;
2379 }
2380 let perm = UInt32Array::from(inv);
2381 let cols: Vec<ArrayRef> = stitched
2382 .columns()
2383 .iter()
2384 .map(|c| {
2385 arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2386 .map_err(AppError::from)
2387 })
2388 .collect::<Result<_, _>>()?;
2389 RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2390}
2391
2392fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
2396 use rayon::prelude::*;
2397
2398 if cfg.mode == IndexMode::None || chunks.is_empty() {
2399 return EqIndex::default();
2400 }
2401
2402 let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
2403 Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
2404 } else {
2405 None
2406 };
2407
2408 let max_card = if cfg.mode == IndexMode::Auto {
2409 Some(cfg.max_cardinality)
2410 } else {
2411 None
2412 };
2413
2414 let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
2416 let mut acc: u32 = 0;
2417 for b in chunks {
2418 batch_offsets.push(acc);
2419 acc = acc
2420 .checked_add(b.num_rows() as u32)
2421 .expect("row count exceeds u32::MAX");
2422 }
2423
2424 let schema = chunks[0].schema();
2425
2426 schema
2427 .fields()
2428 .par_iter()
2429 .enumerate()
2430 .filter_map(|(ci, field)| {
2431 let col_lower = field.name().to_lowercase();
2432 if let Some(a) = &allow
2433 && !a.contains_key(&col_lower)
2434 {
2435 return None;
2436 }
2437
2438 let dtype = field.data_type();
2441 let dict_utf8 = matches!(dtype,
2442 DataType::Dictionary(k, v)
2443 if matches!(k.as_ref(), DataType::Int32)
2444 && matches!(v.as_ref(), DataType::Utf8));
2445 match dtype {
2446 DataType::Utf8
2447 | DataType::Utf8View
2448 | DataType::Boolean
2449 | DataType::Int8
2450 | DataType::Int16
2451 | DataType::Int32
2452 | DataType::Int64 => {}
2453 _ if dict_utf8 => {}
2454 _ => return None,
2455 }
2456
2457 let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2458
2459 for (bi, batch) in chunks.iter().enumerate() {
2460 let base = batch_offsets[bi];
2461 let col = batch.column(ci);
2462
2463 macro_rules! index_col {
2464 ($arr_ty:ty) => {{
2465 let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2466 for row in 0..arr.len() {
2467 if arr.is_null(row) {
2468 continue;
2469 }
2470 let key = arr.value(row).to_string();
2471 let gid = base + row as u32;
2472 if let Some(v) = map.get_mut(&key) {
2473 v.push(gid);
2474 } else {
2475 if let Some(mc) = max_card {
2476 if map.len() >= mc {
2477 return None;
2478 }
2479 }
2480 map.insert(key, vec![gid]);
2481 }
2482 }
2483 }};
2484 }
2485
2486 if dict_utf8 {
2487 let arr = col
2494 .as_any()
2495 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2496 )?;
2497 let keys = arr.keys();
2498 let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2499 for row in 0..arr.len() {
2500 if arr.is_null(row) {
2501 continue;
2502 }
2503 let k = keys.value(row) as usize;
2504 let s = values.value(k);
2505 let gid = base + row as u32;
2506 if let Some(v) = map.get_mut(s) {
2507 v.push(gid);
2508 } else {
2509 if let Some(mc) = max_card
2510 && map.len() >= mc
2511 {
2512 return None;
2513 }
2514 map.insert(s.to_string(), vec![gid]);
2515 }
2516 }
2517 } else {
2518 match dtype {
2519 DataType::Utf8 => index_col!(StringArray),
2520 DataType::Utf8View => index_col!(StringViewArray),
2521 DataType::Boolean => index_col!(BooleanArray),
2522 DataType::Int8 => index_col!(Int8Array),
2523 DataType::Int16 => index_col!(Int16Array),
2524 DataType::Int32 => index_col!(Int32Array),
2525 DataType::Int64 => index_col!(Int64Array),
2526 _ => unreachable!(),
2527 }
2528 }
2529 }
2530
2531 Some((col_lower, map))
2532 })
2533 .collect()
2534}
2535
2536fn writable_inline(dt: &DataType) -> bool {
2549 match dt {
2550 DataType::Utf8
2551 | DataType::LargeUtf8
2552 | DataType::Utf8View
2553 | DataType::Boolean
2554 | DataType::Int8
2555 | DataType::Int16
2556 | DataType::Int32
2557 | DataType::Int64
2558 | DataType::UInt8
2559 | DataType::UInt16
2560 | DataType::UInt32
2561 | DataType::UInt64
2562 | DataType::Float32
2563 | DataType::Float64
2564 | DataType::Decimal128(_, _)
2565 | DataType::Decimal256(_, _) => true,
2566 DataType::Dictionary(k, v)
2567 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2568 {
2569 true
2570 }
2571 _ => false,
2572 }
2573}
2574
2575fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2581 let schema = batch.schema();
2582 let to_cast: Vec<usize> = schema
2583 .fields()
2584 .iter()
2585 .enumerate()
2586 .filter_map(|(i, f)| {
2587 if writable_inline(f.data_type()) {
2588 None
2589 } else {
2590 Some(i)
2591 }
2592 })
2593 .collect();
2594 if to_cast.is_empty() {
2595 return Ok(batch.clone());
2596 }
2597 let new_fields: Vec<Field> = schema
2598 .fields()
2599 .iter()
2600 .enumerate()
2601 .map(|(i, f)| {
2602 if to_cast.contains(&i) {
2603 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2604 } else {
2605 f.as_ref().clone()
2606 }
2607 })
2608 .collect();
2609 let new_schema = Arc::new(Schema::new(new_fields));
2610 let cols: Vec<ArrayRef> = batch
2611 .columns()
2612 .iter()
2613 .enumerate()
2614 .map(|(i, c)| {
2615 if to_cast.contains(&i) {
2616 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2617 } else {
2618 Ok(c.clone())
2619 }
2620 })
2621 .collect::<Result<_, _>>()?;
2622 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2623}
2624
/// Comparison operator understood by `cmp_scalar`.
// `dead_code` is allowed because the helpers that consume this enum (`cmp_scalar`)
// carry the same allowance.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CmpOp {
    /// `=`
    Eq,
    /// `!=`
    Neq,
    /// `>`
    Gt,
    /// `>=`
    Gte,
    /// `<`
    Lt,
    /// `<=`
    Lte,
    /// SQL `LIKE`; string columns only.
    Like,
    /// Case-insensitive `LIKE`; string columns only.
    ILike,
}
2642
2643#[allow(dead_code)]
2644fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2645 let arr = col
2646 .as_any()
2647 .downcast_ref::<StringArray>()
2648 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2649 let s = Scalar::new(StringArray::from(vec![val]));
2650 Ok(eq(arr, &s)?)
2651}
2652
2653#[allow(dead_code)]
2654fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2655 macro_rules! num_cmp {
2656 ($arr_type:ty, $cast:ty) => {{
2657 let n = val
2658 .as_f64()
2659 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2660 as $cast;
2661 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2662 let s = Scalar::new(<$arr_type>::from(vec![n]));
2663 Ok(match op {
2664 CmpOp::Eq => eq(arr, &s)?,
2665 CmpOp::Neq => neq(arr, &s)?,
2666 CmpOp::Gt => gt(arr, &s)?,
2667 CmpOp::Gte => gt_eq(arr, &s)?,
2668 CmpOp::Lt => lt(arr, &s)?,
2669 CmpOp::Lte => lt_eq(arr, &s)?,
2670 CmpOp::Like | CmpOp::ILike => {
2671 return Err(AppError::InvalidValue(
2672 "LIKE requires a string column".into(),
2673 ));
2674 }
2675 })
2676 }};
2677 }
2678 match col.data_type() {
2679 DataType::Utf8 => {
2680 let s = val
2681 .as_str()
2682 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2683 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2684 let sc = Scalar::new(StringArray::from(vec![s]));
2685 Ok(match op {
2686 CmpOp::Eq => eq(arr, &sc)?,
2687 CmpOp::Neq => neq(arr, &sc)?,
2688 CmpOp::Gt => gt(arr, &sc)?,
2689 CmpOp::Gte => gt_eq(arr, &sc)?,
2690 CmpOp::Lt => lt(arr, &sc)?,
2691 CmpOp::Lte => lt_eq(arr, &sc)?,
2692 CmpOp::Like => compute::like(arr, &sc)?,
2693 CmpOp::ILike => compute::ilike(arr, &sc)?,
2694 })
2695 }
2696 DataType::Int8 => num_cmp!(Int8Array, i8),
2697 DataType::Int16 => num_cmp!(Int16Array, i16),
2698 DataType::Int32 => num_cmp!(Int32Array, i32),
2699 DataType::Int64 => num_cmp!(Int64Array, i64),
2700 DataType::Float32 => num_cmp!(Float32Array, f32),
2701 DataType::Float64 => num_cmp!(Float64Array, f64),
2702 dt => Err(AppError::InvalidValue(format!(
2703 "unsupported type for comparison: {dt:?}"
2704 ))),
2705 }
2706}
2707
2708pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2713 let batch = cast_for_serialize(batch)?;
2718 let schema = batch.schema();
2719 let n_rows = batch.num_rows();
2720
2721 let keys: Vec<Vec<u8>> = schema
2722 .fields()
2723 .iter()
2724 .map(|f| {
2725 let mut k = Vec::with_capacity(f.name().len() + 3);
2726 k.push(b'"');
2727 k.extend_from_slice(f.name().as_bytes());
2728 k.extend_from_slice(b"\":");
2729 k
2730 })
2731 .collect();
2732
2733 let encoders: Vec<ColEnc> = batch
2738 .columns()
2739 .iter()
2740 .map(|c| ColEnc::new(c.as_ref()))
2741 .collect();
2742
2743 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2744 let mut itoa_buf = itoa::Buffer::new();
2745 let mut ryu_buf = ryu::Buffer::new();
2746 buf.push(b'[');
2747
2748 for row in 0..n_rows {
2749 if row > 0 {
2750 buf.push(b',');
2751 }
2752 buf.push(b'{');
2753 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2754 if i > 0 {
2755 buf.push(b',');
2756 }
2757 buf.extend_from_slice(key);
2758 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2759 }
2760 buf.push(b'}');
2761 }
2762
2763 buf.push(b']');
2764 Ok(unsafe { String::from_utf8_unchecked(buf) })
2765}
2766
2767enum ColEnc<'a> {
2772 Utf8(&'a StringArray),
2773 LargeUtf8(&'a LargeStringArray),
2774 Utf8View(&'a StringViewArray),
2775 DictI32Utf8(
2778 &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2779 &'a StringArray,
2780 ),
2781 Bool(&'a BooleanArray),
2782 I8(&'a Int8Array),
2783 I16(&'a Int16Array),
2784 I32(&'a Int32Array),
2785 I64(&'a Int64Array),
2786 U8(&'a UInt8Array),
2787 U16(&'a UInt16Array),
2788 U32(&'a UInt32Array),
2789 U64(&'a UInt64Array),
2790 Dec128(&'a Decimal128Array),
2791 Dec256(&'a Decimal256Array),
2792 F32(&'a Float32Array),
2793 F64(&'a Float64Array),
2794 Other(&'a dyn Array),
2796}
2797
2798impl<'a> ColEnc<'a> {
2799 fn new(col: &'a dyn Array) -> ColEnc<'a> {
2800 macro_rules! dc {
2801 ($t:ty) => {
2802 col.as_any().downcast_ref::<$t>().unwrap()
2803 };
2804 }
2805 match col.data_type() {
2806 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2807 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2808 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2809 DataType::Dictionary(key, value)
2810 if matches!(key.as_ref(), DataType::Int32)
2811 && matches!(value.as_ref(), DataType::Utf8) =>
2812 {
2813 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2814 let values = dict
2815 .values()
2816 .as_any()
2817 .downcast_ref::<StringArray>()
2818 .unwrap();
2819 ColEnc::DictI32Utf8(dict, values)
2820 }
2821 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2822 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2823 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2824 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2825 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2826 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2827 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2828 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2829 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2830 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2831 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2832 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2833 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2834 _ => ColEnc::Other(col),
2835 }
2836 }
2837
2838 #[inline]
2839 fn write(
2840 &self,
2841 buf: &mut Vec<u8>,
2842 row: usize,
2843 itoa_buf: &mut itoa::Buffer,
2844 ryu_buf: &mut ryu::Buffer,
2845 ) {
2846 macro_rules! int {
2847 ($arr:expr) => {{
2848 if $arr.is_null(row) {
2849 buf.extend_from_slice(b"null");
2850 } else {
2851 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2852 }
2853 }};
2854 }
2855 match self {
2856 ColEnc::Utf8(a) => {
2857 if a.is_null(row) {
2858 buf.extend_from_slice(b"null");
2859 } else {
2860 write_str(buf, a.value(row));
2861 }
2862 }
2863 ColEnc::LargeUtf8(a) => {
2864 if a.is_null(row) {
2865 buf.extend_from_slice(b"null");
2866 } else {
2867 write_str(buf, a.value(row));
2868 }
2869 }
2870 ColEnc::Utf8View(a) => {
2871 if a.is_null(row) {
2872 buf.extend_from_slice(b"null");
2873 } else {
2874 write_str(buf, a.value(row));
2875 }
2876 }
2877 ColEnc::DictI32Utf8(keys, values) => {
2878 if keys.is_null(row) {
2879 buf.extend_from_slice(b"null");
2880 } else {
2881 let k = keys.keys().value(row) as usize;
2882 write_str(buf, values.value(k));
2883 }
2884 }
2885 ColEnc::Bool(a) => {
2886 if a.is_null(row) {
2887 buf.extend_from_slice(b"null");
2888 } else {
2889 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2890 }
2891 }
2892 ColEnc::I8(a) => int!(a),
2893 ColEnc::I16(a) => int!(a),
2894 ColEnc::I32(a) => int!(a),
2895 ColEnc::I64(a) => int!(a),
2896 ColEnc::U8(a) => int!(a),
2897 ColEnc::U16(a) => int!(a),
2898 ColEnc::U32(a) => int!(a),
2899 ColEnc::U64(a) => int!(a),
2900 ColEnc::Dec128(a) => {
2901 if a.is_null(row) {
2902 buf.extend_from_slice(b"null");
2903 } else {
2904 write_str(buf, &a.value_as_string(row));
2905 }
2906 }
2907 ColEnc::Dec256(a) => {
2908 if a.is_null(row) {
2909 buf.extend_from_slice(b"null");
2910 } else {
2911 write_str(buf, &a.value_as_string(row));
2912 }
2913 }
2914 ColEnc::F32(a) => {
2915 if a.is_null(row) {
2916 buf.extend_from_slice(b"null");
2917 } else {
2918 let v = a.value(row);
2919 if v.is_finite() {
2920 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2921 } else {
2922 buf.extend_from_slice(b"null");
2923 }
2924 }
2925 }
2926 ColEnc::F64(a) => {
2927 if a.is_null(row) {
2928 buf.extend_from_slice(b"null");
2929 } else {
2930 let v = a.value(row);
2931 if v.is_finite() {
2932 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2933 } else {
2934 buf.extend_from_slice(b"null");
2935 }
2936 }
2937 }
2938 ColEnc::Other(col) => {
2939 if col.is_null(row) {
2940 buf.extend_from_slice(b"null");
2941 } else {
2942 write_value(buf, *col, row);
2943 }
2944 }
2945 }
2946 }
2947}
2948
2949#[inline]
2950fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2951 match col.data_type() {
2952 DataType::Utf8 => write_str(
2953 buf,
2954 col.as_any()
2955 .downcast_ref::<StringArray>()
2956 .unwrap()
2957 .value(row),
2958 ),
2959 DataType::LargeUtf8 => write_str(
2960 buf,
2961 col.as_any()
2962 .downcast_ref::<LargeStringArray>()
2963 .unwrap()
2964 .value(row),
2965 ),
2966 DataType::Utf8View => write_str(
2967 buf,
2968 col.as_any()
2969 .downcast_ref::<StringViewArray>()
2970 .unwrap()
2971 .value(row),
2972 ),
2973 DataType::Dictionary(key, value)
2974 if matches!(key.as_ref(), DataType::Int32)
2975 && matches!(value.as_ref(), DataType::Utf8) =>
2976 {
2977 let dict = col
2978 .as_any()
2979 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2980 .unwrap();
2981 let keys = dict.keys();
2982 let values = dict
2983 .values()
2984 .as_any()
2985 .downcast_ref::<StringArray>()
2986 .unwrap();
2987 let k = keys.value(row) as usize;
2988 write_str(buf, values.value(k));
2989 }
2990 DataType::Boolean => {
2991 let v = col
2992 .as_any()
2993 .downcast_ref::<BooleanArray>()
2994 .unwrap()
2995 .value(row);
2996 buf.extend_from_slice(if v { b"true" } else { b"false" });
2997 }
2998 DataType::Int8 => {
2999 let mut b = itoa::Buffer::new();
3000 buf.extend_from_slice(
3001 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3002 .as_bytes(),
3003 );
3004 }
3005 DataType::Int16 => {
3006 let mut b = itoa::Buffer::new();
3007 buf.extend_from_slice(
3008 b.format(
3009 col.as_any()
3010 .downcast_ref::<Int16Array>()
3011 .unwrap()
3012 .value(row),
3013 )
3014 .as_bytes(),
3015 );
3016 }
3017 DataType::Int32 => {
3018 let mut b = itoa::Buffer::new();
3019 buf.extend_from_slice(
3020 b.format(
3021 col.as_any()
3022 .downcast_ref::<Int32Array>()
3023 .unwrap()
3024 .value(row),
3025 )
3026 .as_bytes(),
3027 );
3028 }
3029 DataType::Int64 => {
3030 let mut b = itoa::Buffer::new();
3031 buf.extend_from_slice(
3032 b.format(
3033 col.as_any()
3034 .downcast_ref::<Int64Array>()
3035 .unwrap()
3036 .value(row),
3037 )
3038 .as_bytes(),
3039 );
3040 }
3041 DataType::UInt8 => {
3042 let mut b = itoa::Buffer::new();
3043 buf.extend_from_slice(
3044 b.format(
3045 col.as_any()
3046 .downcast_ref::<UInt8Array>()
3047 .unwrap()
3048 .value(row),
3049 )
3050 .as_bytes(),
3051 );
3052 }
3053 DataType::UInt16 => {
3054 let mut b = itoa::Buffer::new();
3055 buf.extend_from_slice(
3056 b.format(
3057 col.as_any()
3058 .downcast_ref::<UInt16Array>()
3059 .unwrap()
3060 .value(row),
3061 )
3062 .as_bytes(),
3063 );
3064 }
3065 DataType::UInt32 => {
3066 let mut b = itoa::Buffer::new();
3067 buf.extend_from_slice(
3068 b.format(
3069 col.as_any()
3070 .downcast_ref::<UInt32Array>()
3071 .unwrap()
3072 .value(row),
3073 )
3074 .as_bytes(),
3075 );
3076 }
3077 DataType::UInt64 => {
3078 let mut b = itoa::Buffer::new();
3079 buf.extend_from_slice(
3080 b.format(
3081 col.as_any()
3082 .downcast_ref::<UInt64Array>()
3083 .unwrap()
3084 .value(row),
3085 )
3086 .as_bytes(),
3087 );
3088 }
3089 DataType::Decimal128(_, _) => {
3090 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3091 write_str(buf, &arr.value_as_string(row));
3092 }
3093 DataType::Decimal256(_, _) => {
3094 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3095 write_str(buf, &arr.value_as_string(row));
3096 }
3097 DataType::Float32 => {
3098 let v = col
3099 .as_any()
3100 .downcast_ref::<Float32Array>()
3101 .unwrap()
3102 .value(row);
3103 if v.is_finite() {
3104 let mut b = ryu::Buffer::new();
3105 buf.extend_from_slice(b.format_finite(v).as_bytes());
3106 } else {
3107 buf.extend_from_slice(b"null");
3108 }
3109 }
3110 DataType::Float64 => {
3111 let v = col
3112 .as_any()
3113 .downcast_ref::<Float64Array>()
3114 .unwrap()
3115 .value(row);
3116 if v.is_finite() {
3117 let mut b = ryu::Buffer::new();
3118 buf.extend_from_slice(b.format_finite(v).as_bytes());
3119 } else {
3120 buf.extend_from_slice(b"null");
3121 }
3122 }
3123 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3128 }
3129}
3130
/// Appends `s` to `buf` as a double-quoted JSON string.
///
/// `"` and `\` are backslash-escaped, newline, carriage return and tab use
/// their short escapes, and every other byte below 0x20 is written as
/// `\u00XX`. All remaining bytes (including multi-byte UTF-8 sequences) are
/// copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    // Start of the current run of bytes that need no escaping.
    let mut run_start = 0usize;

    buf.push(b'"');
    for (idx, &byte) in bytes.iter().enumerate() {
        if !matches!(byte, b'"' | b'\\' | 0x00..=0x1f) {
            continue;
        }
        // Flush the clean run preceding this byte, then emit its escape.
        buf.extend_from_slice(&bytes[run_start..idx]);
        run_start = idx + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            ctrl => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX_DIGITS[usize::from(ctrl >> 4)]);
                buf.push(HEX_DIGITS[usize::from(ctrl & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
3152
3153#[async_trait]
3158impl Backend for Store {
3159 fn names(&self) -> Vec<String> {
3160 Store::names(self)
3161 }
3162
3163 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3164 let st = self.dataset(name)?;
3165 Ok(DatasetSummary {
3166 name: st.schema.name.clone(),
3167 columns: st.schema.columns.len(),
3168 rows: st.num_rows(),
3169 })
3170 }
3171
3172 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3173 let st = self.dataset(name)?;
3174 Ok(Arc::new(st.schema.clone()))
3175 }
3176
3177 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3178 let st = self.dataset(name)?;
3179 let mut cols: Vec<String> = st
3182 .schema
3183 .columns
3184 .iter()
3185 .map(|c| c.name.clone())
3186 .filter(|n| st.index.contains_key(n))
3187 .collect();
3188 let mut extras: Vec<String> = st
3191 .index
3192 .keys()
3193 .filter(|n| !cols.iter().any(|c| c == *n))
3194 .cloned()
3195 .collect();
3196 extras.sort();
3197 cols.extend(extras);
3198 Ok(cols)
3199 }
3200
3201 async fn sample(&self, name: &str) -> Result<String, AppError> {
3202 Store::sample(self, name).await
3203 }
3204
3205 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3206 Store::query(self, name, req).await
3207 }
3208
3209 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3210 Store::query_arrow(self, name, req).await
3211 }
3212
3213 async fn query_arrow_stream(
3214 &self,
3215 name: &str,
3216 req: &QueryRequest,
3217 ) -> Result<ArrowIpcStream, AppError> {
3218 Store::query_arrow_stream(self, name, req).await
3219 }
3220
3221 async fn query_arrow_stream_all(
3222 &self,
3223 name: &str,
3224 req: &QueryRequest,
3225 ) -> Result<ArrowIpcStream, AppError> {
3226 Store::query_arrow_stream_all(self, name, req).await
3227 }
3228
3229 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3230 Store::count(self, name, req).await
3231 }
3232
3233 async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3234 Store::query_sql(self, sql, max_rows).await
3235 }
3236
3237 async fn query_sql_arrow_stream(
3238 &self,
3239 sql: &str,
3240 max_rows: u64,
3241 ) -> Result<ArrowIpcStream, AppError> {
3242 Store::query_sql_arrow_stream(self, sql, max_rows).await
3243 }
3244
3245 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3246 Store::parquet(self, name).await
3247 }
3248
3249 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3250 Store::reload(self, name).await
3251 }
3252}
3253
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Error messages that must be classified as S3 access-denied failures.
    const DENIED: [&str; 4] = [
        "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
        "Client error with status 403 Forbidden",
        "S3 error: Access Denied",
        "request failed: 403 Forbidden",
    ];

    /// Error messages that must not be classified as access-denied.
    const UNRELATED: [&str; 3] = [
        "Not a Delta table: Generic delta kernel error: No files in log segment",
        "object at location data/part.parquet not found",
        "failed to infer parquet schema: invalid magic bytes",
    ];

    #[test]
    fn detects_s3_access_denied_variants() {
        for msg in DENIED {
            let flagged = is_s3_access_denied(msg);
            assert!(flagged, "should flag: {msg}");
        }
    }

    #[test]
    fn ignores_unrelated_errors() {
        for msg in UNRELATED {
            let flagged = is_s3_access_denied(msg);
            assert!(!flagged, "should not flag: {msg}");
        }
    }
}
3282