1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::time::Duration;
7
8use arc_swap::ArcSwap;
9use arrow::array::{
10 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
11 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
12 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
13};
14use arrow::compute;
15use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
16use arrow::datatypes::{DataType, Field, Schema};
17use async_trait::async_trait;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
20use serde_json::Value as JsonValue;
21
22use datafusion::catalog::information_schema::InformationSchemaProvider;
23use datafusion::catalog::{CatalogProviderList, SchemaProvider};
24use datafusion::datasource::file_format::parquet::ParquetFormat;
25use datafusion::datasource::listing::{
26 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
27};
28use datafusion::datasource::{MemTable, TableProvider};
29use datafusion::error::Result as DfResult;
30use datafusion::execution::cache::DefaultListFilesCache;
31use datafusion::execution::cache::cache_manager::CacheManagerConfig;
32use datafusion::execution::runtime_env::RuntimeEnvBuilder;
33use datafusion::logical_expr::{
34 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
35};
36use datafusion::prelude::{SessionConfig, SessionContext};
37use datafusion::scalar::ScalarValue;
38
39use object_store::aws::AmazonS3Builder;
40use url::Url;
41
42use datapress_core::backend::{
43 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
44};
45use datapress_core::config::{
46 AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
47 Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
48};
49use datapress_core::errors::AppError;
50use datapress_core::models::{CountRequest, Predicate, QueryRequest};
51use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
52
/// `HashMap` alias that replaces the default hasher with `ahash::RandomState`.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Two-level string-keyed map whose leaves are lists of `u32`.
///
/// NOTE(review): presumably column name -> value -> matching row ids, consumed by the
/// equality fast path (`try_index`) — confirm against `build_eq_index_with_policy`.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
65
/// State kept for one loaded dataset.
pub struct DatasetState {
    /// Dataset-level schema (name plus column descriptions).
    pub schema: DatasetSchema,
    /// Materialised record batches; the lazy builders leave this empty.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset.
    pub arrow_schema: Arc<Schema>,
    /// Equality index; the lazy builders leave this at its default (empty) value.
    pub index: EqIndex,
    /// `true` when the dataset is not materialised and is queried through the
    /// session context instead of `data`.
    pub lazy: bool,
}
87
88impl DatasetState {
89 pub fn num_rows(&self) -> usize {
91 self.data.iter().map(|b| b.num_rows()).sum()
92 }
93}
94
/// Holds the DataFusion session plus the per-dataset state served by the backend.
pub struct Store {
    /// Session context that runs SQL and holds the registered tables.
    ctx: SessionContext,
    /// Upper bound applied to a request's `page_size` (always at least 1).
    max_page_size: u64,
    /// Dataset configurations by name; consulted by `reload`.
    configs: RwLock<HashMap<String, DatasetConfig>>,
    /// Snapshot map from dataset name to its state; replaced wholesale on
    /// `reload` / `register`.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// One async mutex per dataset name, taken by `reload` and `register`.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
113
114impl Store {
    /// Returns a reference to the `SessionContext` owned by this store.
    pub fn session_context(&self) -> &SessionContext {
        &self.ctx
    }
121
122 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
124 if cfg
127 .datasets
128 .iter()
129 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
130 {
131 deltalake::aws::register_handlers(None);
132 }
133
134 let ctx = build_tuned_context(&cfg.datafusion);
140 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
141 let mut configs = HashMap::with_capacity(cfg.datasets.len());
142
143 for d in &cfg.datasets {
144 log::info!(
145 "Loading dataset '{}' ({} @ {})",
146 d.name,
147 d.source.kind.as_str(),
148 d.source.location
149 );
150 let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
154 .await
155 {
156 Some(bytes) => {
157 log::info!(
158 "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
159 d.name,
160 bytes as f64 / (1024.0 * 1024.0),
161 cfg.server.force_lazy_above_mb
162 );
163 let mut forced = d.clone();
164 forced.lazy = true;
165 std::borrow::Cow::Owned(forced)
166 }
167 None => std::borrow::Cow::Borrowed(d),
168 };
169 let d = d.as_ref();
170 let (state, provider) = match build_dataset(d, &ctx).await {
171 Ok(built) => built,
172 Err(AppError::EmptyDataset(msg)) => {
173 log::warn!("skipping empty dataset '{}': {msg}", d.name);
174 continue;
175 }
176 Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
182 log::warn!(
183 "skipping dataset '{}': S3 access denied — check credentials \
184 and bucket policy ({e})",
185 d.name
186 );
187 continue;
188 }
189 Err(e) => return Err(e),
190 };
191 ctx.register_table(d.name.as_str(), provider)?;
192 datasets.insert(d.name.clone(), Arc::new(state));
193 configs.insert(d.name.clone(), d.clone());
194 }
195 Ok(Self {
196 ctx,
197 max_page_size: cfg.server.max_page_size.max(1),
198 configs: RwLock::new(configs),
199 datasets: ArcSwap::from_pointee(datasets),
200 reload_locks: Mutex::new(HashMap::new()),
201 })
202 }
203
204 pub fn names(&self) -> Vec<String> {
206 let snap = self.datasets.load();
207 let mut v: Vec<String> = snap.keys().cloned().collect();
208 v.sort();
209 v
210 }
211
212 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
213 self.datasets
214 .load()
215 .get(name)
216 .cloned()
217 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
218 }
219
220 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
223 let st = self.dataset(name)?;
224
225 if st.lazy {
227 let table = DatasetSchema::quote_ident(&st.schema.name);
228 let sql = format!("SELECT * FROM {table} LIMIT 1");
229 let df = self.ctx.sql(&sql).await?;
230 let batches = df.collect().await?;
231 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
232 return Ok("null".into());
233 }
234 let arr = serialize(&batches[0].slice(0, 1))?;
235 let trimmed = arr.trim();
236 let inner = trimmed
237 .strip_prefix('[')
238 .and_then(|s| s.strip_suffix(']'))
239 .unwrap_or(trimmed);
240 return Ok(inner.to_string());
241 }
242
243 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
244 Some(b) => b,
245 None => return Ok("null".into()),
246 };
247 let arr = serialize(&first.slice(0, 1))?;
248 let trimmed = arr.trim();
250 let inner = trimmed
251 .strip_prefix('[')
252 .and_then(|s| s.strip_suffix(']'))
253 .unwrap_or(trimmed);
254 Ok(inner.to_string())
255 }
256
257 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
262 let cfg = self
264 .configs
265 .read()
266 .unwrap()
267 .get(name)
268 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
269 .clone();
270
271 let lock = {
273 let mut locks = self.reload_locks.lock().unwrap();
274 locks
275 .entry(name.to_string())
276 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
277 .clone()
278 };
279 let _guard = lock.lock().await;
280
281 let started = std::time::Instant::now();
282
283 if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
299 cache.clear();
300 }
301
302 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
305 let rows = state.num_rows();
306
307 let _ = self.ctx.deregister_table(name)?;
313 self.ctx.register_table(name, provider)?;
314
315 let mut new_map = (**self.datasets.load()).clone();
316 new_map.insert(name.to_string(), Arc::new(state));
317 self.datasets.store(Arc::new(new_map));
318
319 let elapsed_ms = started.elapsed().as_millis();
320 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
321 Ok(ReloadStats { rows, elapsed_ms })
322 }
323
324 pub async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
328 cfg.validate_for_register()?;
329
330 if self.datasets.load().contains_key(&cfg.name) {
332 return Err(AppError::InvalidValue(format!(
333 "dataset '{}' already exists",
334 cfg.name
335 )));
336 }
337
338 let lock = {
340 let mut locks = self.reload_locks.lock().unwrap();
341 locks
342 .entry(cfg.name.clone())
343 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
344 .clone()
345 };
346 let _guard = lock.lock().await;
347
348 if self.datasets.load().contains_key(&cfg.name) {
350 return Err(AppError::InvalidValue(format!(
351 "dataset '{}' already exists",
352 cfg.name
353 )));
354 }
355
356 if cfg.source.kind == SourceKind::Delta && cfg.source.is_s3() {
359 deltalake::aws::register_handlers(None);
360 }
361
362 let started = std::time::Instant::now();
363 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
364 let rows = state.num_rows();
365 let columns = state.schema.columns.len();
366
367 self.ctx.register_table(cfg.name.as_str(), provider)?;
368
369 let mut new_map = (**self.datasets.load()).clone();
370 new_map.insert(cfg.name.clone(), Arc::new(state));
371 self.datasets.store(Arc::new(new_map));
372 self.configs
373 .write()
374 .unwrap()
375 .insert(cfg.name.clone(), cfg.clone());
376
377 let elapsed_ms = started.elapsed().as_millis();
378 log::info!(
379 "registered dataset '{}' ({} @ {}): {rows} rows in {elapsed_ms} ms",
380 cfg.name,
381 cfg.source.kind.as_str(),
382 cfg.source.location
383 );
384 Ok(DatasetSummary {
385 name: cfg.name,
386 columns,
387 rows,
388 })
389 }
390
391 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
395 let batch = self.query_batch(name, req).await?;
396 if batch.num_rows() == 0 {
397 return Ok("[]".to_string());
398 }
399 serialize(&batch)
400 }
401
402 fn canonicalize_sql(&self, sql: &str) -> String {
408 let snap = self.datasets.load();
409 let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
410 let mut columns: HashMap<String, String> = HashMap::new();
411 for (name, state) in snap.iter() {
412 tables.insert(name.to_lowercase(), name.clone());
413 for col in &state.schema.columns {
414 columns
415 .entry(col.name.to_lowercase())
416 .or_insert_with(|| col.name.clone());
417 }
418 }
419 datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
420 }
421
422 pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
429 let cap = max_rows.max(1);
430 let sql = self.canonicalize_sql(sql);
431 let wrapped = if datapress_core::sql::is_describe(&sql) {
436 sql
437 } else {
438 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
439 };
440 let df = self.ctx.sql(&wrapped).await?;
441 let batches = df.collect().await?;
442 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
443 return Ok("[]".to_string());
444 }
445 let batch = if batches.len() == 1 {
446 batches.into_iter().next().expect("checked len")
447 } else {
448 compute::concat_batches(&batches[0].schema(), batches.iter())?
449 };
450 let batch = if batch.num_rows() as u64 > cap {
453 batch.slice(0, cap as usize)
454 } else {
455 batch
456 };
457 serialize(&batch)
458 }
459
460 pub async fn query_sql_arrow_stream(
464 &self,
465 sql: &str,
466 max_rows: u64,
467 ) -> Result<ArrowIpcStream, AppError> {
468 let cap = max_rows.max(1);
469 let sql = self.canonicalize_sql(sql);
470 let wrapped = if datapress_core::sql::is_describe(&sql) {
474 sql
475 } else {
476 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
477 };
478 let df = self.ctx.sql(&wrapped).await?;
479 let batches = df.collect().await?;
480 Ok(stream_arrow_batches(batches))
481 }
482
483 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
487 let batch = self.query_batch(name, req).await?;
488 let schema = batch.schema();
489 let mut buf = Vec::with_capacity(8 * 1024);
490 {
491 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
492 if batch.num_rows() > 0 {
493 w.write(&batch)?;
494 }
495 w.finish()?;
496 }
497 Ok(buf)
498 }
499
500 pub async fn query_arrow_stream(
501 &self,
502 name: &str,
503 req: &QueryRequest,
504 ) -> Result<ArrowIpcStream, AppError> {
505 let batches = self.query_batches(name, req).await?;
506 Ok(stream_arrow_batches(batches))
507 } pub async fn query_arrow_stream_all(
508 &self,
509 name: &str,
510 req: &QueryRequest,
511 ) -> Result<ArrowIpcStream, AppError> {
512 let batches = self.query_batches_all(name, req).await?;
513 Ok(stream_arrow_batches(batches))
514 }
515
516 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
524 let req = QueryRequest {
526 columns: Vec::new(),
527 predicates: Vec::new(),
528 group_by: Vec::new(),
529 aggregations: Vec::new(),
530 having: Vec::new(),
531 distinct: false,
532 order_by: Vec::new(),
533 limit: None,
534 page: 1,
535 page_size: 1,
536 };
537 let st = self.dataset(name)?;
538 let batches = self.query_batches_all(name, &req).await?;
539 let schema = batches
543 .first()
544 .map(|b| b.schema())
545 .unwrap_or_else(|| st.arrow_schema.clone());
546
547 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
548 {
549 let props = parquet::file::properties::WriterProperties::builder()
550 .set_compression(parquet::basic::Compression::SNAPPY)
551 .build();
552 let mut writer =
553 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
554 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
555 for batch in &batches {
556 if batch.num_rows() > 0 {
557 writer
558 .write(batch)
559 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
560 }
561 }
562 writer
563 .close()
564 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
565 }
566 Ok(bytes::Bytes::from(buf))
567 }
568
569 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
572 let batches = self.query_batches(name, req).await?;
573 if batches.is_empty() {
574 return Ok(RecordBatch::new_empty(Arc::new(
575 arrow::datatypes::Schema::empty(),
576 )));
577 }
578 if batches.len() == 1 {
579 return Ok(batches.into_iter().next().expect("checked len"));
580 }
581 if batches.iter().all(|b| b.num_rows() == 0) {
582 return Ok(RecordBatch::new_empty(batches[0].schema()));
583 }
584 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
585 Ok(batch)
586 }
587
588 async fn query_batches(
592 &self,
593 name: &str,
594 req: &QueryRequest,
595 ) -> Result<Vec<RecordBatch>, AppError> {
596 let st = self.dataset(name)?;
597
598 let page = req.page.max(1);
599 let page_size = req.page_size.clamp(1, self.max_page_size);
600 let offset = ((page - 1) * page_size) as usize;
601 let limit = page_size as usize;
602
603 self.query_batches_inner(st, req, Some((offset, limit)))
604 .await
605 }
606
607 async fn query_batches_all(
611 &self,
612 name: &str,
613 req: &QueryRequest,
614 ) -> Result<Vec<RecordBatch>, AppError> {
615 let st = self.dataset(name)?;
616 self.query_batches_inner(st, req, None).await
617 }
618
619 async fn query_batches_inner(
620 &self,
621 st: Arc<DatasetState>,
622 req: &QueryRequest,
623 page_window: Option<(usize, usize)>,
624 ) -> Result<Vec<RecordBatch>, AppError> {
625 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
626
627 let can_fast_path = !st.lazy
634 && req.order_by.is_empty()
635 && (page_window.is_none() || req.limit.is_none())
636 && req.group_by.is_empty()
637 && !req.distinct;
638
639 if can_fast_path {
640 let total = st.num_rows();
641
642 if req.predicates.is_empty() {
645 if page_window.is_none() && req.limit.is_none() {
646 return st
647 .data
648 .iter()
649 .cloned()
650 .map(|batch| project(&st.schema, batch, &req.columns))
651 .collect();
652 }
653 let start = offset.min(total);
654 let len = limit.min(total - start);
655 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
656 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
657 }
658
659 if let Some(rows) = try_index(&st.index, &req.predicates) {
662 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
663 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
664 }
665 }
666
667 let (sql, params) = match page_window {
669 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
670 None => build_query_stream_sql(&st.schema, req)?,
671 };
672 let mut df = self.ctx.sql(&sql).await?;
673 if !params.is_empty() {
674 df = df.with_param_values(params)?;
675 }
676 let batches = df.collect().await?;
677 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
678 let schema = batches
679 .first()
680 .map(|b| b.schema())
681 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
682 return Ok(vec![RecordBatch::new_empty(schema)]);
683 }
684 Ok(batches)
685 }
686}
687
688fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
689 let schema = batches
690 .first()
691 .map(|batch| batch.schema())
692 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
693 let (mut writer, stream) = arrow_ipc_stream_channel(8);
694
695 tokio::task::spawn_blocking(move || {
696 let result = (|| -> Result<(), AppError> {
697 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
698 for batch in batches {
699 if batch.num_rows() > 0 {
700 w.write(&batch)?;
701 }
702 }
703 w.finish()?;
704 Ok(())
705 })();
706 if let Err(err) = result {
707 log::error!("datafusion arrow stream failed: {err}");
708 writer.send_error(err);
709 }
710 });
711
712 stream
713}
714
715impl Store {
716 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
720 let st = self.dataset(name)?;
721
722 if !st.lazy {
723 if req.predicates.is_empty() {
725 return Ok(st.num_rows() as i64);
726 }
727 if let Some(rows) = try_index(&st.index, &req.predicates) {
729 return Ok(rows.len() as i64);
730 }
731 }
732
733 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
736 let mut df = self.ctx.sql(&sql).await?;
737 if !params.is_empty() {
738 df = df.with_param_values(params)?;
739 }
740 let batches = df.collect().await?;
741 let n = batches
742 .first()
743 .and_then(|b| {
744 b.column(0)
745 .as_any()
746 .downcast_ref::<arrow::array::Int64Array>()
747 })
748 .filter(|a| !a.is_empty())
749 .map(|a| a.value(0))
750 .unwrap_or(0);
751 Ok(n)
752 }
753}
754
755fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
782 let mut config = SessionConfig::new();
783 {
784 let opts = config.options_mut();
785 opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
786 opts.execution.parquet.reorder_filters = cfg.reorder_filters;
787 opts.catalog.information_schema = false;
801 }
802
803 let default_schema = config.options().catalog.default_schema.clone();
806
807 if !cfg.list_files_cache {
808 let ctx = SessionContext::new_with_config(config);
809 register_compat_udfs(&ctx, default_schema);
810 register_information_schema_shim(&ctx);
811 return ctx;
812 }
813
814 let ttl = (cfg.list_files_cache_ttl_secs > 0)
817 .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
818 let list_cache = Arc::new(DefaultListFilesCache::new(
819 cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
820 ttl,
821 ));
822 let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
823
824 let runtime = RuntimeEnvBuilder::new()
825 .with_cache_manager(cache_manager)
826 .build_arc()
827 .expect("failed to build DataFusion runtime env");
828
829 let ctx = SessionContext::new_with_config_rt(config, runtime);
830 register_compat_udfs(&ctx, default_schema);
831 register_information_schema_shim(&ctx);
832 ctx
833}
834
835fn register_compat_udfs(ctx: &SessionContext, default_schema: String) {
850 ctx.register_udf(ScalarUDF::from(CurrentSchemaUdf::new(default_schema)));
851}
852
853fn register_information_schema_shim(ctx: &SessionContext) {
879 let default_catalog = ctx
880 .state()
881 .config()
882 .options()
883 .catalog
884 .default_catalog
885 .clone();
886 let Some(catalog) = ctx.catalog(&default_catalog) else {
887 return;
888 };
889 let catalog_list = Arc::clone(ctx.state().catalog_list());
890 let provider = Arc::new(InformationSchemaWithConstraints::new(catalog_list));
891 let _ = catalog.register_schema("information_schema", provider);
895}
896
/// Schema provider that combines DataFusion's `InformationSchemaProvider` with
/// a fixed set of extra, empty constraint tables.
#[derive(Debug)]
struct InformationSchemaWithConstraints {
    /// Provider that serves every table not listed in `constraints`.
    inner: InformationSchemaProvider,
    /// Extra tables keyed by lower-case table name.
    constraints: HashMap<&'static str, Arc<dyn TableProvider>>,
}
907
908impl InformationSchemaWithConstraints {
909 fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
910 let mut constraints: HashMap<&'static str, Arc<dyn TableProvider>> = HashMap::new();
911 constraints.insert("table_constraints", empty_table(table_constraints_schema()));
912 constraints.insert("key_column_usage", empty_table(key_column_usage_schema()));
913 constraints.insert(
914 "referential_constraints",
915 empty_table(referential_constraints_schema()),
916 );
917 Self {
918 inner: InformationSchemaProvider::new(catalog_list),
919 constraints,
920 }
921 }
922}
923
924#[async_trait]
925impl SchemaProvider for InformationSchemaWithConstraints {
926 fn as_any(&self) -> &dyn Any {
927 self
928 }
929
930 fn table_names(&self) -> Vec<String> {
931 let mut names = self.inner.table_names();
932 names.extend(self.constraints.keys().map(|k| (*k).to_string()));
933 names
934 }
935
936 async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
937 if let Some(table) = self.constraints.get(name.to_ascii_lowercase().as_str()) {
938 return Ok(Some(Arc::clone(table)));
939 }
940 self.inner.table(name).await
941 }
942
943 fn table_exist(&self, name: &str) -> bool {
944 self.constraints
945 .contains_key(name.to_ascii_lowercase().as_str())
946 || self.inner.table_exist(name)
947 }
948}
949
950fn empty_table(schema: Arc<Schema>) -> Arc<dyn TableProvider> {
952 Arc::new(MemTable::try_new(schema, vec![Vec::new()]).expect("empty MemTable schema is valid"))
953}
954
955fn table_constraints_schema() -> Arc<Schema> {
959 Arc::new(Schema::new(vec![
960 Field::new("constraint_catalog", DataType::Utf8, true),
961 Field::new("constraint_schema", DataType::Utf8, true),
962 Field::new("constraint_name", DataType::Utf8, true),
963 Field::new("table_catalog", DataType::Utf8, true),
964 Field::new("table_schema", DataType::Utf8, true),
965 Field::new("table_name", DataType::Utf8, true),
966 Field::new("constraint_type", DataType::Utf8, true),
967 Field::new("is_deferrable", DataType::Utf8, true),
968 Field::new("initially_deferred", DataType::Utf8, true),
969 Field::new("enforced", DataType::Utf8, true),
970 ]))
971}
972
973fn key_column_usage_schema() -> Arc<Schema> {
976 Arc::new(Schema::new(vec![
977 Field::new("constraint_catalog", DataType::Utf8, true),
978 Field::new("constraint_schema", DataType::Utf8, true),
979 Field::new("constraint_name", DataType::Utf8, true),
980 Field::new("table_catalog", DataType::Utf8, true),
981 Field::new("table_schema", DataType::Utf8, true),
982 Field::new("table_name", DataType::Utf8, true),
983 Field::new("column_name", DataType::Utf8, true),
984 Field::new("ordinal_position", DataType::Int32, true),
985 Field::new("position_in_unique_constraint", DataType::Int32, true),
986 ]))
987}
988
989fn referential_constraints_schema() -> Arc<Schema> {
992 Arc::new(Schema::new(vec![
993 Field::new("constraint_catalog", DataType::Utf8, true),
994 Field::new("constraint_schema", DataType::Utf8, true),
995 Field::new("constraint_name", DataType::Utf8, true),
996 Field::new("unique_constraint_catalog", DataType::Utf8, true),
997 Field::new("unique_constraint_schema", DataType::Utf8, true),
998 Field::new("unique_constraint_name", DataType::Utf8, true),
999 Field::new("match_option", DataType::Utf8, true),
1000 Field::new("update_rule", DataType::Utf8, true),
1001 Field::new("delete_rule", DataType::Utf8, true),
1002 ]))
1003}
1004
/// Scalar UDF `current_schema()` that returns a fixed schema name.
#[derive(Debug, PartialEq, Eq, Hash)]
struct CurrentSchemaUdf {
    /// Function signature (nullary, `Volatility::Stable`).
    signature: Signature,
    /// Schema name returned by every invocation.
    schema: String,
}
1012
1013impl CurrentSchemaUdf {
1014 fn new(schema: String) -> Self {
1015 Self {
1016 signature: Signature::nullary(Volatility::Stable),
1017 schema,
1018 }
1019 }
1020}
1021
1022impl ScalarUDFImpl for CurrentSchemaUdf {
1023 fn as_any(&self) -> &dyn Any {
1024 self
1025 }
1026
1027 fn name(&self) -> &str {
1028 "current_schema"
1029 }
1030
1031 fn signature(&self) -> &Signature {
1032 &self.signature
1033 }
1034
1035 fn return_type(&self, _arg_types: &[DataType]) -> DfResult<DataType> {
1036 Ok(DataType::Utf8)
1037 }
1038
1039 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> DfResult<ColumnarValue> {
1040 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
1041 self.schema.clone(),
1042 ))))
1043 }
1044}
1045
1046async fn build_dataset(
1047 d: &DatasetConfig,
1048 ctx: &SessionContext,
1049) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1050 if d.lazy {
1057 match (d.source.kind, d.source.is_s3()) {
1058 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
1059 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
1060 (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
1061 }
1062 }
1063
1064 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
1069 (SourceKind::Parquet, false) => read_local_parquet(d)?,
1070 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
1071 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
1072 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
1073 };
1074 if raw_batches.is_empty() {
1075 return Err(AppError::EmptyDataset(format!(
1076 "dataset '{}': source produced no batches",
1077 d.name
1078 )));
1079 }
1080 if raw_batches.iter().all(|b| b.num_rows() == 0) {
1085 return Err(AppError::EmptyDataset(format!(
1086 "dataset '{}': source has a schema but no rows",
1087 d.name
1088 )));
1089 }
1090
1091 let chunks = raw_batches;
1092 let arrow_sch = chunks[0].schema();
1093
1094 let columns: Vec<ColumnInfo> = arrow_sch
1096 .fields()
1097 .iter()
1098 .map(|f| {
1099 let dt = f.data_type();
1100 ColumnInfo {
1101 name: f.name().clone(),
1102 logical: arrow_to_logical(dt),
1103 sql_type: format!("{dt:?}"),
1104 nullable: f.is_nullable(),
1105 }
1106 })
1107 .collect();
1108 let schema = DatasetSchema::new(&d.name, columns)
1109 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1110
1111 let index = build_eq_index_with_policy(&chunks, &d.index);
1116
1117 let n_parts = std::thread::available_parallelism()
1122 .map(|n| n.get())
1123 .unwrap_or(4);
1124 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
1125 for (i, b) in chunks.iter().enumerate() {
1126 if b.num_rows() == 0 {
1127 continue;
1128 }
1129 parts[i % n_parts].push(b.clone());
1130 }
1131 parts.retain(|p| !p.is_empty());
1132 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
1133
1134 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
1135 let mem_mb: usize = chunks
1136 .iter()
1137 .flat_map(|b| b.columns().iter())
1138 .map(|c| c.get_buffer_memory_size())
1139 .sum::<usize>()
1140 / 1_048_576;
1141 log::info!(
1142 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
1143 d.name,
1144 d.source.kind.as_str(),
1145 total_rows,
1146 schema.columns.len(),
1147 mem_mb,
1148 chunks.len(),
1149 index.len()
1150 );
1151
1152 Ok((
1153 DatasetState {
1154 schema,
1155 data: chunks,
1156 arrow_schema: arrow_sch,
1157 index,
1158 lazy: false,
1159 },
1160 provider,
1161 ))
1162}
1163
1164async fn build_lazy_local_parquet(
1169 d: &DatasetConfig,
1170 ctx: &SessionContext,
1171) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1172 let (url, part_keys) = lazy_local_listing(d)?;
1173
1174 let mut opts =
1175 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1176 if !part_keys.is_empty() {
1177 opts = opts.with_table_partition_cols(
1178 part_keys
1179 .iter()
1180 .map(|k| (k.clone(), DataType::Utf8))
1181 .collect(),
1182 );
1183 }
1184
1185 let session_state = ctx.state();
1186 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1189 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
1190 })?;
1191
1192 if file_schema.fields().is_empty() {
1196 return Err(AppError::EmptyDataset(format!(
1197 "dataset '{}': no .parquet files at {}",
1198 d.name, d.source.location
1199 )));
1200 }
1201
1202 let cfg = ListingTableConfig::new(url)
1203 .with_listing_options(opts)
1204 .with_schema(file_schema.clone());
1205 let table = ListingTable::try_new(cfg).map_err(|e| {
1206 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
1207 })?;
1208 let provider: Arc<dyn TableProvider> = Arc::new(table);
1209
1210 let mut fields: Vec<Field> = file_schema
1212 .fields()
1213 .iter()
1214 .map(|f| f.as_ref().clone())
1215 .collect();
1216 for k in &part_keys {
1217 if !fields.iter().any(|f| f.name() == k) {
1218 fields.push(Field::new(k, DataType::Utf8, false));
1219 }
1220 }
1221 let arrow_sch = Arc::new(Schema::new(fields));
1222
1223 let columns: Vec<ColumnInfo> = arrow_sch
1224 .fields()
1225 .iter()
1226 .map(|f| {
1227 let dt = f.data_type();
1228 ColumnInfo {
1229 name: f.name().clone(),
1230 logical: arrow_to_logical(dt),
1231 sql_type: format!("{dt:?}"),
1232 nullable: f.is_nullable(),
1233 }
1234 })
1235 .collect();
1236 let schema = DatasetSchema::new(&d.name, columns)
1237 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1238
1239 log::info!(
1240 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
1241 d.name,
1242 d.source.kind.as_str(),
1243 schema.columns.len(),
1244 part_keys.len()
1245 );
1246
1247 Ok((
1248 DatasetState {
1249 schema,
1250 data: Vec::new(),
1251 arrow_schema: arrow_sch,
1252 index: EqIndex::default(),
1253 lazy: true,
1254 },
1255 provider,
1256 ))
1257}
1258
1259fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1264 let loc = &d.source.location;
1265
1266 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1267 let parts: Vec<&str> = loc.split('/').collect();
1268 let first_wild = parts
1269 .iter()
1270 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
1271 .unwrap_or(parts.len());
1272 let base = parts[..first_wild].join("/");
1273 let base = if base.is_empty() {
1274 "/".to_string()
1275 } else {
1276 base
1277 };
1278 let upper = parts.len().saturating_sub(1);
1281 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
1282 .iter()
1283 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
1284 .filter(|k| !k.is_empty())
1285 .collect();
1286 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
1287 }
1288
1289 let path = std::path::Path::new(loc);
1290 if path.is_dir() {
1291 let keys = discover_hive_keys(path);
1292 return Ok((dir_url(path, d)?, keys));
1293 }
1294
1295 let url = ListingTableUrl::parse(loc)
1296 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
1297 Ok((url, Vec::new()))
1298}
1299
1300fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
1303 let s = path.to_str().ok_or_else(|| {
1304 AppError::Internal(format!(
1305 "dataset '{}': non-utf8 path {}",
1306 d.name,
1307 path.display()
1308 ))
1309 })?;
1310 let s = if s.ends_with('/') {
1311 s.to_string()
1312 } else {
1313 format!("{s}/")
1314 };
1315 ListingTableUrl::parse(&s)
1316 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
1317}
1318
/// Walks down from `base` through hive-style `key=value` directories and
/// returns the keys in nesting order.
///
/// At each level the first sub-directory whose name splits into a non-empty
/// key and a non-empty value is followed. The walk stops when a level has no
/// such directory or cannot be read.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut cur = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&cur) {
        let partition_dir = entries.flatten().find_map(|entry| {
            let p = entry.path();
            if !p.is_dir() {
                return None;
            }
            let name = p.file_name()?.to_str()?;
            let (k, v) = name.split_once('=')?;
            if k.is_empty() || v.is_empty() {
                return None;
            }
            Some((k.to_string(), p))
        });

        let Some((key, dir)) = partition_dir else {
            break;
        };
        keys.push(key);
        cur = dir;
    }

    keys
}
1356
1357async fn build_lazy_s3_parquet(
1363 d: &DatasetConfig,
1364 ctx: &SessionContext,
1365) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1366 register_s3_object_store(d, ctx)?;
1367
1368 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1369
1370 if file_schema.fields().is_empty() {
1373 return Err(AppError::EmptyDataset(format!(
1374 "dataset '{}': no .parquet files at {}",
1375 d.name, d.source.location
1376 )));
1377 }
1378
1379 let mut fields: Vec<Field> = file_schema
1381 .fields()
1382 .iter()
1383 .map(|f| f.as_ref().clone())
1384 .collect();
1385 for k in &part_keys {
1386 if !fields.iter().any(|f| f.name() == k) {
1387 fields.push(Field::new(k, DataType::Utf8, false));
1388 }
1389 }
1390 let arrow_sch = Arc::new(Schema::new(fields));
1391
1392 let columns: Vec<ColumnInfo> = arrow_sch
1393 .fields()
1394 .iter()
1395 .map(|f| {
1396 let dt = f.data_type();
1397 ColumnInfo {
1398 name: f.name().clone(),
1399 logical: arrow_to_logical(dt),
1400 sql_type: format!("{dt:?}"),
1401 nullable: f.is_nullable(),
1402 }
1403 })
1404 .collect();
1405 let schema = DatasetSchema::new(&d.name, columns)
1406 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1407
1408 log::info!(
1409 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1410 d.name,
1411 d.source.kind.as_str(),
1412 schema.columns.len(),
1413 part_keys.len()
1414 );
1415
1416 Ok((
1417 DatasetState {
1418 schema,
1419 data: Vec::new(),
1420 arrow_schema: arrow_sch,
1421 index: EqIndex::default(),
1422 lazy: true,
1423 },
1424 provider,
1425 ))
1426}
1427
1428async fn build_s3_listing_table(
1434 d: &DatasetConfig,
1435 ctx: &SessionContext,
1436) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1437 let (url, part_keys) = s3_listing(d, ctx).await?;
1438
1439 let mut opts =
1440 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1441 if !part_keys.is_empty() {
1442 opts = opts.with_table_partition_cols(
1443 part_keys
1444 .iter()
1445 .map(|k| (k.clone(), DataType::Utf8))
1446 .collect(),
1447 );
1448 }
1449
1450 let session_state = ctx.state();
1451 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1452 AppError::Internal(format!(
1453 "dataset '{}': infer parquet schema on s3: {e}",
1454 d.name
1455 ))
1456 })?;
1457
1458 let cfg = ListingTableConfig::new(url)
1459 .with_listing_options(opts)
1460 .with_schema(file_schema.clone());
1461 let table = ListingTable::try_new(cfg).map_err(|e| {
1462 AppError::Internal(format!(
1463 "dataset '{}': ListingTable::try_new (s3): {e}",
1464 d.name
1465 ))
1466 })?;
1467 Ok((Arc::new(table), file_schema, part_keys))
1468}
1469
1470async fn s3_listing(
1476 d: &DatasetConfig,
1477 ctx: &SessionContext,
1478) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1479 let s3 = d.s3.clone().unwrap_or_default();
1480 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1481 let loc = &d.source.location;
1482
1483 if d.source.has_glob() {
1484 let (base, keys) = split_glob_base_keys(loc);
1485 let base = format!("{}/", base.trim_end_matches('/'));
1486 let url = ListingTableUrl::parse(&base).map_err(|e| {
1487 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1488 })?;
1489 let keys = if want_partitions { keys } else { Vec::new() };
1490 return Ok((url, keys));
1491 }
1492
1493 let base = if loc.ends_with('/') {
1494 loc.clone()
1495 } else {
1496 format!("{loc}/")
1497 };
1498 let url = ListingTableUrl::parse(&base).map_err(|e| {
1499 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1500 })?;
1501 let keys = if want_partitions {
1502 discover_s3_hive_keys(ctx, &url).await
1503 } else {
1504 Vec::new()
1505 };
1506 Ok((url, keys))
1507}
1508
/// Splits a glob location into its wildcard-free base and its partition keys.
///
/// The location is split on `/`. The base is everything before the first
/// segment containing `*`, `?` or `[` (or `"/"` when that prefix is empty).
/// Keys are taken from the segments between the first wildcard segment and
/// the last segment (exclusive): every `key=...` segment with a non-empty key
/// contributes its key.
///
/// A location without wildcards yields the whole location as base and no keys.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    const WILDCARDS: [char; 3] = ['*', '?', '['];

    let parts: Vec<&str> = loc.split('/').collect();
    let first_wild = parts
        .iter()
        .position(|seg| seg.contains(&WILDCARDS[..]))
        .unwrap_or(parts.len());

    let mut base = parts[..first_wild].join("/");
    if base.is_empty() {
        base.push('/');
    }

    // The final segment is the file pattern and never a partition directory.
    let last = parts.len().saturating_sub(1);
    let keys: Vec<String> = parts
        .iter()
        .take(last)
        .skip(first_wild)
        .filter_map(|seg| match seg.split_once('=') {
            Some((k, _)) if !k.is_empty() => Some(k.to_string()),
            _ => None,
        })
        .collect();

    (base, keys)
}
1532
1533async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1538 let store = match ctx.runtime_env().object_store(url.object_store()) {
1539 Ok(s) => s,
1540 Err(_) => return Vec::new(),
1541 };
1542 let mut keys = Vec::new();
1543 let mut prefix = url.prefix().clone();
1544 loop {
1545 let listing = match store.list_with_delimiter(Some(&prefix)).await {
1546 Ok(l) => l,
1547 Err(_) => break,
1548 };
1549 let mut next: Option<object_store::path::Path> = None;
1550 for cp in &listing.common_prefixes {
1551 if let Some(seg) = cp.parts().next_back() {
1552 let seg = seg.as_ref().to_string();
1553 if let Some((k, v)) = seg.split_once('=')
1554 && !k.is_empty()
1555 && !v.is_empty()
1556 {
1557 keys.push(k.to_string());
1558 next = Some(cp.clone());
1559 break;
1560 }
1561 }
1562 }
1563 match next {
1564 Some(p) => prefix = p,
1565 None => break,
1566 }
1567 }
1568 keys
1569}
1570
/// Eagerly reads every local Parquet file of the dataset into record batches.
///
/// For each file resolved by `resolve_local_parquet_files`:
/// 1. the file is opened and probed once for its Parquet schema, Arrow schema
///    and metadata;
/// 2. if `d.columns` is non-empty, only top-level fields whose lowercased name
///    is listed are projected;
/// 3. if `d.dict_encode` is set, string fields whose column chunk in the first
///    row group has a dictionary page are read as `Dictionary(Int32, Utf8)`;
/// 4. batches are read with a batch size of 65 536 rows and, when the file
///    path contains `key=value` components, extended with partition columns.
///
/// # Errors
/// `AppError::Internal` when a file cannot be opened or duplicated, when none
/// of the requested columns exist in a file, when a batch fails to decode, or
/// when no batch at all was produced. Parquet builder errors are propagated
/// via `?`.
fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
    let files = d.resolve_local_parquet_files()?;
    let mut all = Vec::new();
    // Lowercased set of requested column names; `None` means "all columns".
    let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
        None
    } else {
        Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
    };

    for f in &files {
        let file = std::fs::File::open(f)
            .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;

        // Probe on a duplicated handle so `file` itself stays available for
        // the real reader built further down.
        let probe = ParquetRecordBatchReaderBuilder::try_new(
            file.try_clone()
                .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
        )?;
        let parquet_schema = probe.parquet_schema().clone();
        let arrow_schema = probe.schema().clone();
        let metadata = probe.metadata().clone();
        drop(probe);

        // Case-insensitive column selection by top-level field index.
        let projection = if let Some(w) = &wanted {
            let indices: Vec<usize> = arrow_schema
                .fields()
                .iter()
                .enumerate()
                .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
                .map(|(i, _)| i)
                .collect();
            if indices.is_empty() {
                return Err(AppError::Internal(format!(
                    "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
                    d.name,
                    d.columns,
                    f.display()
                )));
            }
            ProjectionMask::roots(&parquet_schema, indices)
        } else {
            ProjectionMask::all()
        };

        // Start from the file's own Arrow fields; selected string fields are
        // swapped for dictionary-typed fields below.
        let mut new_fields: Vec<Field> = arrow_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        if d.dict_encode
            && let Some(rg0) = metadata.row_groups().first()
        {
            for (i, fld) in arrow_schema.fields().iter().enumerate() {
                if !matches!(
                    fld.data_type(),
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
                ) {
                    continue;
                }
                // NOTE(review): `i` is the Arrow top-level field index but is
                // used to index the row group's column chunks; presumably the
                // two only line up when no earlier field is nested — confirm.
                if let Some(col) = rg0.columns().get(i)
                    && col.dictionary_page_offset().is_some()
                {
                    new_fields[i] = Field::new(
                        fld.name(),
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        fld.is_nullable(),
                    );
                }
            }
        }
        let forced_schema = Arc::new(Schema::new(new_fields));

        let opts = ArrowReaderOptions::new().with_schema(forced_schema);
        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
            .with_batch_size(65_536)
            .with_projection(projection)
            .build()?;
        // `key=value` components of the file path, appended as constant columns.
        let pairs = hive_pairs(f);
        for batch in reader {
            let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
            all.push(if pairs.is_empty() {
                batch
            } else {
                append_partition_cols(&batch, &pairs)?
            });
        }
    }
    // No batches at all (no files, or only empty files) is reported as an error.
    if all.is_empty() {
        return Err(AppError::Internal(format!(
            "dataset '{}': parquet source is empty",
            d.name
        )));
    }
    Ok(all)
}
1696
/// Extracts `(key, value)` pairs from the `key=value` components of `path`.
///
/// Every path component (including the last one) is considered. A component
/// qualifies when it is valid UTF-8, contains `=`, has a non-empty key and a
/// non-empty value, and the value itself contains no further `=`. Pairs are
/// returned in path order.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(seg) = component.as_os_str().to_str() else {
            continue;
        };
        match seg.split_once('=') {
            Some((k, v)) if !k.is_empty() && !v.is_empty() && !v.contains('=') => {
                pairs.push((k.to_string(), v.to_string()));
            }
            _ => {}
        }
    }
    pairs
}
1711
1712fn append_partition_cols(
1715 batch: &RecordBatch,
1716 pairs: &[(String, String)],
1717) -> Result<RecordBatch, AppError> {
1718 let n = batch.num_rows();
1719 let mut fields: Vec<Field> = batch
1720 .schema()
1721 .fields()
1722 .iter()
1723 .map(|f| f.as_ref().clone())
1724 .collect();
1725 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1726 for (k, v) in pairs {
1727 if fields.iter().any(|f| f.name() == k) {
1728 continue;
1729 }
1730 fields.push(Field::new(k, DataType::Utf8, false));
1731 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1732 }
1733 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1734 .map_err(|e| AppError::Internal(e.to_string()))
1735}
1736
1737async fn read_s3_parquet(
1743 d: &DatasetConfig,
1744 ctx: &SessionContext,
1745) -> Result<Vec<RecordBatch>, AppError> {
1746 register_s3_object_store(d, ctx)?;
1747 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1748 let df = ctx
1749 .read_table(provider)
1750 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1751 Ok(df.collect().await?)
1752}
1753
1754async fn open_delta_table(
1761 d: &DatasetConfig,
1762 opts: HashMap<String, String>,
1763) -> Result<deltalake::DeltaTable, AppError> {
1764 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1765 AppError::Internal(format!(
1766 "dataset '{}': bad delta location '{}': {e}",
1767 d.name, d.source.location
1768 ))
1769 })?;
1770 deltalake::open_table_with_storage_options(url, opts)
1771 .await
1772 .map_err(|e| {
1773 let msg = e.to_string();
1783 let low = msg.to_lowercase();
1784 if low.contains("no files in log segment") || low.contains("not a delta table") {
1785 AppError::EmptyDataset(format!(
1786 "delta location '{}' has no committed files ({msg})",
1787 d.source.location
1788 ))
1789 } else {
1790 AppError::Internal(format!(
1791 "dataset '{}': delta open '{}': {msg}",
1792 d.name, d.source.location
1793 ))
1794 }
1795 })
1796}
1797
1798async fn open_delta_provider(
1804 d: &DatasetConfig,
1805 opts: HashMap<String, String>,
1806) -> Result<Arc<dyn TableProvider>, AppError> {
1807 let table = open_delta_table(d, opts).await?;
1808 table.table_provider().await.map_err(|e| {
1809 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1810 })
1811}
1812
1813fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1816 if d.source.is_s3() {
1817 delta_s3_options(d)
1818 } else {
1819 Ok(HashMap::new())
1820 }
1821}
1822
1823async fn read_delta(
1827 d: &DatasetConfig,
1828 opts: HashMap<String, String>,
1829) -> Result<Vec<RecordBatch>, AppError> {
1830 let provider = open_delta_provider(d, opts).await?;
1831 let scan_ctx = SessionContext::new();
1841 let df = scan_ctx.read_table(provider).map_err(|e| {
1842 AppError::EmptyDataset(format!(
1843 "delta location '{}' could not be scanned, skipping ({e})",
1844 d.source.location
1845 ))
1846 })?;
1847 df.collect().await.map_err(|e| {
1848 AppError::EmptyDataset(format!(
1849 "delta location '{}' could not be scanned, skipping ({e})",
1850 d.source.location
1851 ))
1852 })
1853}
1854
1855async fn build_lazy_delta(
1861 d: &DatasetConfig,
1862 _ctx: &SessionContext,
1863) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1864 let table = open_delta_table(d, delta_storage_options(d)?).await?;
1865
1866 let file_count = table
1873 .get_file_uris()
1874 .map(|it| it.count())
1875 .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1876 if file_count == 0 {
1877 return Err(AppError::EmptyDataset(format!(
1878 "delta location '{}' has a schema but no data files",
1879 d.source.location
1880 )));
1881 }
1882
1883 let provider = table.table_provider().await.map_err(|e| {
1884 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1885 })?;
1886
1887 {
1900 let probe_ctx = SessionContext::new();
1901 let probe = probe_ctx
1902 .read_table(provider.clone())
1903 .and_then(|df| df.limit(0, Some(1)));
1904 match probe {
1905 Ok(df) => match df.collect().await {
1906 Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1907 return Err(AppError::EmptyDataset(format!(
1908 "delta location '{}' resolves to no rows",
1909 d.source.location
1910 )));
1911 }
1912 Ok(_) => {}
1913 Err(e) => {
1914 return Err(AppError::EmptyDataset(format!(
1915 "delta location '{}' could not be scanned, skipping ({e})",
1916 d.source.location
1917 )));
1918 }
1919 },
1920 Err(e) => {
1921 return Err(AppError::EmptyDataset(format!(
1922 "delta location '{}' could not be scanned, skipping ({e})",
1923 d.source.location
1924 )));
1925 }
1926 }
1927 }
1928
1929 let arrow_sch = provider.schema();
1932 let columns: Vec<ColumnInfo> = arrow_sch
1933 .fields()
1934 .iter()
1935 .map(|f| {
1936 let dt = f.data_type();
1937 ColumnInfo {
1938 name: f.name().clone(),
1939 logical: arrow_to_logical(dt),
1940 sql_type: format!("{dt:?}"),
1941 nullable: f.is_nullable(),
1942 }
1943 })
1944 .collect();
1945 let schema = DatasetSchema::new(&d.name, columns)
1946 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1947
1948 log::info!(
1949 "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1950 d.name,
1951 d.source.kind.as_str(),
1952 schema.columns.len()
1953 );
1954
1955 Ok((
1956 DatasetState {
1957 schema,
1958 data: Vec::new(),
1959 arrow_schema: arrow_sch,
1960 index: EqIndex::default(),
1961 lazy: true,
1962 },
1963 provider,
1964 ))
1965}
1966
1967fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1971 let creds = d.resolved_creds();
1972 let region = d.resolved_region();
1973 let s3 = d.s3.clone().unwrap_or_default();
1974 let (bucket, _) = d.source.s3_bucket()?;
1975
1976 let mut opts = HashMap::new();
1977 opts.insert("AWS_REGION".into(), region);
1978 if let Some(ep) = s3.effective_endpoint(bucket) {
1979 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1980 }
1981 if s3.allow_http {
1982 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1983 }
1984 opts.insert(
1985 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1986 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1987 );
1988 if let Some(k) = creds.access_key_id {
1989 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1990 }
1991 if let Some(s) = creds.secret_access_key {
1992 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1993 }
1994 if let Some(t) = creds.session_token {
1995 opts.insert("AWS_SESSION_TOKEN".into(), t);
1996 }
1997 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1999 Ok(opts)
2000}
2001
2002fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
2006 let (bucket, _key) = d.source.s3_bucket()?;
2007 let creds = d.resolved_creds();
2008 let region = d.resolved_region();
2009 let s3 = d.s3.clone().unwrap_or_default();
2010
2011 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
2012 AppError::Internal(format!(
2013 "dataset '{}': build S3 store for '{bucket}': {e}",
2014 d.name
2015 ))
2016 })?;
2017
2018 let url = Url::parse(&format!("s3://{bucket}"))
2019 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
2020 ctx.register_object_store(&url, Arc::new(store));
2021 Ok(())
2022}
2023
/// Heuristically decides whether an error message describes an S3
/// permission failure.
///
/// The check is a case-insensitive substring search for "access denied",
/// "accessdenied", "forbidden" or "403".
fn is_s3_access_denied(msg: &str) -> bool {
    const MARKERS: [&str; 4] = ["access denied", "accessdenied", "forbidden", "403"];
    let low = msg.to_lowercase();
    MARKERS.iter().any(|marker| low.contains(*marker))
}
2037
2038
2039async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
2045 if d.lazy || server.force_lazy_above_mb == 0 {
2046 return None;
2047 }
2048 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
2049
2050 let bytes = if d.source.is_s3() {
2051 match estimate_s3_bytes(d).await {
2052 Ok(b) => b,
2053 Err(e) => {
2054 log::warn!(
2055 "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
2056 d.name
2057 );
2058 return None;
2059 }
2060 }
2061 } else {
2062 d.estimate_local_bytes()?
2063 };
2064
2065 (bytes > threshold).then_some(bytes)
2066}
2067
2068async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
2077 use futures_util::StreamExt;
2078 use object_store::ObjectStore;
2079
2080 let (bucket, _key) = d.source.s3_bucket()?;
2081 let creds = d.resolved_creds();
2082 let region = d.resolved_region();
2083 let s3 = d.s3.clone().unwrap_or_default();
2084 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
2085 AppError::Internal(format!(
2086 "dataset '{}': build S3 store for '{bucket}': {e}",
2087 d.name
2088 ))
2089 })?;
2090
2091 let (base, _keys) = split_glob_base_keys(&d.source.location);
2094 let prefix_key = base
2095 .strip_prefix("s3://")
2096 .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
2097 .unwrap_or("")
2098 .trim_end_matches('/');
2099 let prefix =
2100 (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
2101
2102 let mut total: u64 = 0;
2103 let mut stream = store.list(prefix.as_ref());
2104 while let Some(meta) = stream.next().await {
2105 let meta = meta.map_err(|e| {
2106 AppError::Internal(format!(
2107 "dataset '{}': s3 list under '{prefix_key}': {e}",
2108 d.name
2109 ))
2110 })?;
2111 if meta.location.as_ref().ends_with(".parquet") {
2112 total = total.saturating_add(meta.size);
2113 }
2114 }
2115 Ok(total)
2116}
2117
2118fn build_s3(
2119 bucket: &str,
2120 region: &str,
2121 s3: &S3Config,
2122 creds: &ResolvedCreds,
2123) -> Result<object_store::aws::AmazonS3, object_store::Error> {
2124 let mut b = AmazonS3Builder::new()
2125 .with_bucket_name(bucket)
2126 .with_region(region)
2127 .with_allow_http(s3.allow_http)
2128 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
2129 if let Some(ep) = s3.effective_endpoint(bucket) {
2130 b = b.with_endpoint(ep);
2131 }
2132 if let Some(k) = creds.access_key_id.as_deref() {
2133 b = b.with_access_key_id(k);
2134 }
2135 if let Some(s) = creds.secret_access_key.as_deref() {
2136 b = b.with_secret_access_key(s);
2137 }
2138 if let Some(t) = creds.session_token.as_deref() {
2139 b = b.with_token(t);
2140 }
2141 b.build()
2142}
2143
2144fn arrow_to_logical(dt: &DataType) -> LogicalType {
2145 match dt {
2146 DataType::Boolean => LogicalType::Bool,
2147 DataType::Int8
2148 | DataType::Int16
2149 | DataType::Int32
2150 | DataType::Int64
2151 | DataType::UInt8
2152 | DataType::UInt16
2153 | DataType::UInt32
2154 | DataType::UInt64 => LogicalType::Int,
2155 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
2156 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
2157 DataType::Dictionary(_, v)
2161 if matches!(
2162 v.as_ref(),
2163 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
2164 ) =>
2165 {
2166 LogicalType::Utf8
2167 }
2168 DataType::Date32
2169 | DataType::Date64
2170 | DataType::Time32(_)
2171 | DataType::Time64(_)
2172 | DataType::Timestamp(_, _)
2173 | DataType::Duration(_)
2174 | DataType::Interval(_) => LogicalType::Temporal,
2175 _ => LogicalType::Other,
2176 }
2177}
2178
2179fn project(
2184 schema: &DatasetSchema,
2185 batch: RecordBatch,
2186 columns: &[String],
2187) -> Result<RecordBatch, AppError> {
2188 if columns.is_empty() {
2189 return Ok(batch);
2190 }
2191 let indices: Vec<usize> = columns
2192 .iter()
2193 .map(|c| {
2194 schema
2195 .find(c)
2196 .map(|info| schema.by_name[&info.name.to_lowercase()])
2197 })
2198 .collect::<Result<_, _>>()?;
2199 let fields: Vec<Field> = indices
2200 .iter()
2201 .map(|&i| batch.schema().field(i).clone())
2202 .collect();
2203 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
2204 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
2205}
2206
2207#[derive(Default)]
2220struct Params {
2221 values: Vec<ScalarValue>,
2222}
2223
2224impl Params {
2225 fn new() -> Self {
2226 Self::default()
2227 }
2228
2229 fn bind(&mut self, v: ScalarValue) -> String {
2231 self.values.push(v);
2232 format!("${}", self.values.len())
2233 }
2234
2235 fn into_values(self) -> Vec<ScalarValue> {
2236 self.values
2237 }
2238}
2239
2240fn build_query_sql(
2241 schema: &DatasetSchema,
2242 req: &QueryRequest,
2243 max_page_size: u64,
2244) -> Result<(String, Vec<ScalarValue>), AppError> {
2245 let (limit, offset) = req.effective_limit_offset(max_page_size);
2246 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
2247}
2248
2249fn build_query_stream_sql(
2250 schema: &DatasetSchema,
2251 req: &QueryRequest,
2252) -> Result<(String, Vec<ScalarValue>), AppError> {
2253 let suffix = req
2254 .limit
2255 .map(|limit| format!(" LIMIT {limit}"))
2256 .unwrap_or_default();
2257 build_query_sql_with_suffix(schema, req, &suffix)
2258}
2259
2260fn build_query_sql_with_suffix(
2261 schema: &DatasetSchema,
2262 req: &QueryRequest,
2263 suffix: &str,
2264) -> Result<(String, Vec<ScalarValue>), AppError> {
2265 let agg_plan = req.agg_plan(schema)?;
2266
2267 let cols = if let Some(plan) = &agg_plan {
2268 let mut parts: Vec<String> = plan
2270 .group_cols
2271 .iter()
2272 .map(|c| DatasetSchema::quote_ident(c))
2273 .collect();
2274 for a in &plan.aggs {
2275 let expr = a.sql_expr()?;
2276 parts.push(format!(
2277 "{expr} AS {}",
2278 DatasetSchema::quote_ident(&a.alias)
2279 ));
2280 }
2281 parts.join(", ")
2282 } else if req.columns.is_empty() {
2283 if req.distinct {
2284 "DISTINCT *".to_string()
2285 } else {
2286 "*".to_string()
2287 }
2288 } else {
2289 let list = req
2290 .columns
2291 .iter()
2292 .map(|c| {
2293 schema
2294 .find(c)
2295 .map(|info| DatasetSchema::quote_ident(&info.name))
2296 })
2297 .collect::<Result<Vec<_>, _>>()?
2298 .join(", ");
2299 if req.distinct {
2300 format!("DISTINCT {list}")
2301 } else {
2302 list
2303 }
2304 };
2305
2306 let mut params = Params::new();
2307 let clauses: Vec<String> = req
2308 .predicates
2309 .iter()
2310 .map(|p| pred_to_sql(schema, p, &mut params))
2311 .collect::<Result<_, _>>()?;
2312
2313 let table = DatasetSchema::quote_ident(&schema.name);
2314 let where_clause = if clauses.is_empty() {
2315 String::new()
2316 } else {
2317 format!(" WHERE {}", clauses.join(" AND "))
2318 };
2319 let group_clause = match &agg_plan {
2320 Some(p) => format!(
2321 " GROUP BY {}",
2322 p.group_cols
2323 .iter()
2324 .map(|c| DatasetSchema::quote_ident(c))
2325 .collect::<Vec<_>>()
2326 .join(", "),
2327 ),
2328 None => String::new(),
2329 };
2330 let having_clause = {
2331 let resolved = req.having_plan(agg_plan.as_ref())?;
2332 if resolved.is_empty() {
2333 String::new()
2334 } else {
2335 let clauses: Vec<String> = resolved
2336 .iter()
2337 .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2338 .collect::<Result<_, _>>()?;
2339 format!(" HAVING {}", clauses.join(" AND "))
2340 }
2341 };
2342 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2343 Some(s) => format!(" ORDER BY {s}"),
2344 None => String::new(),
2345 };
2346 let sql =
2347 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2348 Ok((sql, params.into_values()))
2349}
2350
2351fn build_count_sql(
2352 schema: &DatasetSchema,
2353 predicates: &[Predicate],
2354) -> Result<(String, Vec<ScalarValue>), AppError> {
2355 let mut params = Params::new();
2356 let clauses: Vec<String> = predicates
2357 .iter()
2358 .map(|p| pred_to_sql(schema, p, &mut params))
2359 .collect::<Result<_, _>>()?;
2360 let table = DatasetSchema::quote_ident(&schema.name);
2361 let where_clause = if clauses.is_empty() {
2362 String::new()
2363 } else {
2364 format!(" WHERE {}", clauses.join(" AND "))
2365 };
2366 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2367 Ok((sql, params.into_values()))
2368}
2369
2370fn pred_to_sql(
2371 schema: &DatasetSchema,
2372 pred: &Predicate,
2373 params: &mut Params,
2374) -> Result<String, AppError> {
2375 let info = schema.find(&pred.col)?;
2376 let col = DatasetSchema::quote_ident(&info.name);
2377 pred_to_sql_with_lhs(&col, pred, params)
2378}
2379
2380fn pred_to_sql_with_lhs(
2386 col: &str,
2387 pred: &Predicate,
2388 params: &mut Params,
2389) -> Result<String, AppError> {
2390 match pred.op.as_str() {
2391 "is_null" => return Ok(format!("{col} IS NULL")),
2392 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2393 _ => {}
2394 }
2395
2396 let val = pred
2397 .val
2398 .as_ref()
2399 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2400
2401 if pred.op == "in" {
2402 let items = val
2403 .as_array()
2404 .filter(|a| !a.is_empty())
2405 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2406 let placeholders: Vec<String> = items
2407 .iter()
2408 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2409 .collect::<Result<_, AppError>>()?;
2410 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2411 }
2412
2413 let sql_op = match pred.op.as_str() {
2414 "eq" => "=",
2415 "neq" => "!=",
2416 "gt" => ">",
2417 "gte" => ">=",
2418 "lt" => "<",
2419 "lte" => "<=",
2420 "like" => "LIKE",
2421 "ilike" => "ILIKE",
2422 other => return Err(AppError::UnknownOperator(other.into())),
2423 };
2424 let placeholder = params.bind(json_to_scalar(val)?);
2425 Ok(format!("{col} {sql_op} {placeholder}"))
2426}
2427
2428fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2432 match val {
2433 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2434 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2435 JsonValue::Null => Ok(ScalarValue::Null),
2436 JsonValue::Number(n) => {
2437 if let Some(i) = n.as_i64() {
2438 Ok(ScalarValue::Int64(Some(i)))
2439 } else if let Some(u) = n.as_u64() {
2440 Ok(ScalarValue::UInt64(Some(u)))
2441 } else if let Some(f) = n.as_f64() {
2442 Ok(ScalarValue::Float64(Some(f)))
2443 } else {
2444 Err(AppError::InvalidValue(
2445 "unsupported numeric literal in predicate".into(),
2446 ))
2447 }
2448 }
2449 _ => Err(AppError::InvalidValue(
2450 "unsupported literal type in predicate".into(),
2451 )),
2452 }
2453}
2454
2455fn json_index_key(val: &JsonValue) -> Option<String> {
2460 match val {
2461 JsonValue::String(s) => Some(s.clone()),
2462 JsonValue::Number(n) => Some(n.to_string()),
2463 JsonValue::Bool(b) => Some(b.to_string()),
2464 _ => None,
2465 }
2466}
2467
/// Intersects two ascending slices with a two-cursor merge.
///
/// Both inputs are expected to be sorted ascending; the result contains, in
/// ascending order, every value on which the two cursors meet.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let mut left = a.iter().peekable();
    let mut right = b.iter().peekable();

    while let (Some(&&x), Some(&&y)) = (left.peek(), right.peek()) {
        if x < y {
            left.next();
        } else if x > y {
            right.next();
        } else {
            common.push(x);
            left.next();
            right.next();
        }
    }
    common
}
2484
/// Merges two ascending slices into one ascending vector.
///
/// While both inputs have elements, the smaller head is emitted; when the
/// heads are equal it is emitted once and both inputs advance. Whatever
/// remains of either input is appended unchanged.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut rest_a, mut rest_b) = (a, b);

    while let (Some((&x, tail_a)), Some((&y, tail_b))) =
        (rest_a.split_first(), rest_b.split_first())
    {
        if x <= y {
            merged.push(x);
            rest_a = tail_a;
            // Equal heads are emitted once; consume the duplicate from `b`.
            if x == y {
                rest_b = tail_b;
            }
        } else {
            merged.push(y);
            rest_b = tail_b;
        }
    }

    merged.extend_from_slice(rest_a);
    merged.extend_from_slice(rest_b);
    merged
}
2509
2510fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2511 if predicates.is_empty() || index.is_empty() {
2512 return None;
2513 }
2514
2515 if let [pred] = predicates
2518 && pred.op.as_str() == "eq"
2519 {
2520 let col_lower = pred.col.to_lowercase();
2521 let col_map = index.get(&col_lower)?;
2522 let key = json_index_key(pred.val.as_ref()?)?;
2523 return Some(match col_map.get(&key) {
2524 Some(rows) => Cow::Borrowed(rows.as_slice()),
2525 None => Cow::Owned(Vec::new()),
2526 });
2527 }
2528
2529 let mut result: Option<Vec<u32>> = None;
2530 for pred in predicates {
2531 let col_lower = pred.col.to_lowercase();
2532 let col_map = index.get(&col_lower)?;
2533
2534 let rows: Vec<u32> = match pred.op.as_str() {
2535 "eq" => {
2536 let key = json_index_key(pred.val.as_ref()?)?;
2537 col_map.get(&key).cloned().unwrap_or_default()
2538 }
2539 "in" => {
2540 let items = pred.val.as_ref()?.as_array()?;
2541 let mut merged: Vec<u32> = Vec::new();
2542 for item in items {
2543 if let Some(r) = col_map.get(&json_index_key(item)?) {
2544 merged = union_sorted(&merged, r);
2545 }
2546 }
2547 merged
2548 }
2549 _ => return None,
2550 };
2551
2552 result = Some(match result {
2553 None => rows,
2554 Some(r) => intersect_sorted(&r, &rows),
2555 });
2556 }
2557 result.map(Cow::Owned)
2558}
2559
2560#[doc(hidden)]
2564pub mod bench {
2565 use super::{EqIndex, FastMap, json_index_key, try_index};
2566 use datapress_core::models::Predicate;
2567 use serde_json::Value as JsonValue;
2568 use std::borrow::Cow;
2569
2570 pub struct BenchIndex(EqIndex);
2572
2573 pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
2577 let key = json_index_key(val).expect("benchable index key");
2578 let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
2579 col_map.insert(key, rows);
2580 let mut index: EqIndex = EqIndex::default();
2581 index.insert(col.to_string(), col_map);
2582 BenchIndex(index)
2583 }
2584
2585 pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2587 try_index(&idx.0, predicates)
2588 }
2589
2590 pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
2595 let [pred] = predicates else { return None };
2596 if pred.op.as_str() != "eq" {
2597 return None;
2598 }
2599 let col_lower = pred.col.to_lowercase();
2600 let col_map = idx.0.get(&col_lower)?;
2601 let key = json_index_key(pred.val.as_ref()?)?;
2602 Some(col_map.get(&key).cloned().unwrap_or_default())
2603 }
2604}
2605
2606fn slice_global(
2609 chunks: &[RecordBatch],
2610 schema: &Arc<Schema>,
2611 offset: usize,
2612 limit: usize,
2613) -> Result<RecordBatch, AppError> {
2614 if limit == 0 || chunks.is_empty() {
2615 return Ok(RecordBatch::new_empty(schema.clone()));
2616 }
2617 let mut out = Vec::new();
2618 let mut to_skip = offset;
2619 let mut remaining = limit;
2620 for b in chunks {
2621 if remaining == 0 {
2622 break;
2623 }
2624 let n = b.num_rows();
2625 if to_skip >= n {
2626 to_skip -= n;
2627 continue;
2628 }
2629 let take = remaining.min(n - to_skip);
2630 out.push(b.slice(to_skip, take));
2631 to_skip = 0;
2632 remaining -= take;
2633 }
2634 if out.is_empty() {
2635 return Ok(RecordBatch::new_empty(schema.clone()));
2636 }
2637 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2638}
2639
2640fn take_page(
2645 chunks: &[RecordBatch],
2646 schema: &Arc<Schema>,
2647 rows: &[u32],
2648 offset: usize,
2649 limit: usize,
2650) -> Result<RecordBatch, AppError> {
2651 let start = offset.min(rows.len());
2652 let len = limit.min(rows.len() - start);
2653 if len == 0 || chunks.is_empty() {
2654 return Ok(RecordBatch::new_empty(schema.clone()));
2655 }
2656
2657 let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
2660 let mut acc: u32 = 0;
2661 offsets.push(0);
2662 for b in chunks {
2663 acc = acc
2664 .checked_add(b.num_rows() as u32)
2665 .expect("row count exceeds u32::MAX");
2666 offsets.push(acc);
2667 }
2668
2669 let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
2672 for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
2673 let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
2674 let local = gid - offsets[bi];
2675 buckets[bi].push((out_pos as u32, local));
2676 }
2677
2678 let mut takens: Vec<RecordBatch> = Vec::new();
2680 let mut dest: Vec<u32> = Vec::with_capacity(len);
2681 for (bi, bucket) in buckets.iter().enumerate() {
2682 if bucket.is_empty() {
2683 continue;
2684 }
2685 let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
2686 let cols: Vec<ArrayRef> = chunks[bi]
2687 .columns()
2688 .iter()
2689 .map(|c| {
2690 arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
2691 .map_err(AppError::from)
2692 })
2693 .collect::<Result<_, _>>()?;
2694 takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
2695 dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
2696 }
2697
2698 let stitched = compute::concat_batches(schema, takens.iter())?;
2700 let mut inv = vec![0u32; len];
2701 for (i, &d) in dest.iter().enumerate() {
2702 inv[d as usize] = i as u32;
2703 }
2704 let perm = UInt32Array::from(inv);
2705 let cols: Vec<ArrayRef> = stitched
2706 .columns()
2707 .iter()
2708 .map(|c| {
2709 arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
2710 .map_err(AppError::from)
2711 })
2712 .collect::<Result<_, _>>()?;
2713 RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
2714}
2715
2716fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
2720 use rayon::prelude::*;
2721
2722 if cfg.mode == IndexMode::None || chunks.is_empty() {
2723 return EqIndex::default();
2724 }
2725
2726 let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
2727 Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
2728 } else {
2729 None
2730 };
2731
2732 let max_card = if cfg.mode == IndexMode::Auto {
2733 Some(cfg.max_cardinality)
2734 } else {
2735 None
2736 };
2737
2738 let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
2740 let mut acc: u32 = 0;
2741 for b in chunks {
2742 batch_offsets.push(acc);
2743 acc = acc
2744 .checked_add(b.num_rows() as u32)
2745 .expect("row count exceeds u32::MAX");
2746 }
2747
2748 let schema = chunks[0].schema();
2749
2750 schema
2751 .fields()
2752 .par_iter()
2753 .enumerate()
2754 .filter_map(|(ci, field)| {
2755 let col_lower = field.name().to_lowercase();
2756 if let Some(a) = &allow
2757 && !a.contains_key(&col_lower)
2758 {
2759 return None;
2760 }
2761
2762 let dtype = field.data_type();
2765 let dict_utf8 = matches!(dtype,
2766 DataType::Dictionary(k, v)
2767 if matches!(k.as_ref(), DataType::Int32)
2768 && matches!(v.as_ref(), DataType::Utf8));
2769 match dtype {
2770 DataType::Utf8
2771 | DataType::Utf8View
2772 | DataType::Boolean
2773 | DataType::Int8
2774 | DataType::Int16
2775 | DataType::Int32
2776 | DataType::Int64 => {}
2777 _ if dict_utf8 => {}
2778 _ => return None,
2779 }
2780
2781 let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2782
2783 for (bi, batch) in chunks.iter().enumerate() {
2784 let base = batch_offsets[bi];
2785 let col = batch.column(ci);
2786
2787 macro_rules! index_col {
2788 ($arr_ty:ty) => {{
2789 let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2790 for row in 0..arr.len() {
2791 if arr.is_null(row) {
2792 continue;
2793 }
2794 let key = arr.value(row).to_string();
2795 let gid = base + row as u32;
2796 if let Some(v) = map.get_mut(&key) {
2797 v.push(gid);
2798 } else {
2799 if let Some(mc) = max_card {
2800 if map.len() >= mc {
2801 return None;
2802 }
2803 }
2804 map.insert(key, vec![gid]);
2805 }
2806 }
2807 }};
2808 }
2809
2810 if dict_utf8 {
2811 let arr = col
2818 .as_any()
2819 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2820 )?;
2821 let keys = arr.keys();
2822 let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2823 for row in 0..arr.len() {
2824 if arr.is_null(row) {
2825 continue;
2826 }
2827 let k = keys.value(row) as usize;
2828 let s = values.value(k);
2829 let gid = base + row as u32;
2830 if let Some(v) = map.get_mut(s) {
2831 v.push(gid);
2832 } else {
2833 if let Some(mc) = max_card
2834 && map.len() >= mc
2835 {
2836 return None;
2837 }
2838 map.insert(s.to_string(), vec![gid]);
2839 }
2840 }
2841 } else {
2842 match dtype {
2843 DataType::Utf8 => index_col!(StringArray),
2844 DataType::Utf8View => index_col!(StringViewArray),
2845 DataType::Boolean => index_col!(BooleanArray),
2846 DataType::Int8 => index_col!(Int8Array),
2847 DataType::Int16 => index_col!(Int16Array),
2848 DataType::Int32 => index_col!(Int32Array),
2849 DataType::Int64 => index_col!(Int64Array),
2850 _ => unreachable!(),
2851 }
2852 }
2853 }
2854
2855 Some((col_lower, map))
2856 })
2857 .collect()
2858}
2859
2860fn writable_inline(dt: &DataType) -> bool {
2873 match dt {
2874 DataType::Utf8
2875 | DataType::LargeUtf8
2876 | DataType::Utf8View
2877 | DataType::Boolean
2878 | DataType::Int8
2879 | DataType::Int16
2880 | DataType::Int32
2881 | DataType::Int64
2882 | DataType::UInt8
2883 | DataType::UInt16
2884 | DataType::UInt32
2885 | DataType::UInt64
2886 | DataType::Float32
2887 | DataType::Float64
2888 | DataType::Decimal128(_, _)
2889 | DataType::Decimal256(_, _) => true,
2890 DataType::Dictionary(k, v)
2891 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2892 {
2893 true
2894 }
2895 _ => false,
2896 }
2897}
2898
2899fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2905 let schema = batch.schema();
2906 let to_cast: Vec<usize> = schema
2907 .fields()
2908 .iter()
2909 .enumerate()
2910 .filter_map(|(i, f)| {
2911 if writable_inline(f.data_type()) {
2912 None
2913 } else {
2914 Some(i)
2915 }
2916 })
2917 .collect();
2918 if to_cast.is_empty() {
2919 return Ok(batch.clone());
2920 }
2921 let new_fields: Vec<Field> = schema
2922 .fields()
2923 .iter()
2924 .enumerate()
2925 .map(|(i, f)| {
2926 if to_cast.contains(&i) {
2927 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2928 } else {
2929 f.as_ref().clone()
2930 }
2931 })
2932 .collect();
2933 let new_schema = Arc::new(Schema::new(new_fields));
2934 let cols: Vec<ArrayRef> = batch
2935 .columns()
2936 .iter()
2937 .enumerate()
2938 .map(|(i, c)| {
2939 if to_cast.contains(&i) {
2940 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2941 } else {
2942 Ok(c.clone())
2943 }
2944 })
2945 .collect::<Result<_, _>>()?;
2946 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2947}
2948
/// Comparison operator applied by `cmp_scalar` between a column and a scalar.
// `dead_code` is allowed because `cmp_scalar`, the consumer visible next to
// this enum, is itself marked as unused.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CmpOp {
    /// Equal to the scalar.
    Eq,
    /// Not equal to the scalar.
    Neq,
    /// Strictly greater than the scalar.
    Gt,
    /// Greater than or equal to the scalar.
    Gte,
    /// Strictly less than the scalar.
    Lt,
    /// Less than or equal to the scalar.
    Lte,
    /// Pattern match via Arrow's `like` kernel; string columns only.
    Like,
    /// Pattern match via Arrow's `ilike` kernel; string columns only.
    ILike,
}
2966
2967#[allow(dead_code)]
2968fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2969 let arr = col
2970 .as_any()
2971 .downcast_ref::<StringArray>()
2972 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2973 let s = Scalar::new(StringArray::from(vec![val]));
2974 Ok(eq(arr, &s)?)
2975}
2976
2977#[allow(dead_code)]
2978fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2979 macro_rules! num_cmp {
2980 ($arr_type:ty, $cast:ty) => {{
2981 let n = val
2982 .as_f64()
2983 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2984 as $cast;
2985 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2986 let s = Scalar::new(<$arr_type>::from(vec![n]));
2987 Ok(match op {
2988 CmpOp::Eq => eq(arr, &s)?,
2989 CmpOp::Neq => neq(arr, &s)?,
2990 CmpOp::Gt => gt(arr, &s)?,
2991 CmpOp::Gte => gt_eq(arr, &s)?,
2992 CmpOp::Lt => lt(arr, &s)?,
2993 CmpOp::Lte => lt_eq(arr, &s)?,
2994 CmpOp::Like | CmpOp::ILike => {
2995 return Err(AppError::InvalidValue(
2996 "LIKE requires a string column".into(),
2997 ));
2998 }
2999 })
3000 }};
3001 }
3002 match col.data_type() {
3003 DataType::Utf8 => {
3004 let s = val
3005 .as_str()
3006 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
3007 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
3008 let sc = Scalar::new(StringArray::from(vec![s]));
3009 Ok(match op {
3010 CmpOp::Eq => eq(arr, &sc)?,
3011 CmpOp::Neq => neq(arr, &sc)?,
3012 CmpOp::Gt => gt(arr, &sc)?,
3013 CmpOp::Gte => gt_eq(arr, &sc)?,
3014 CmpOp::Lt => lt(arr, &sc)?,
3015 CmpOp::Lte => lt_eq(arr, &sc)?,
3016 CmpOp::Like => compute::like(arr, &sc)?,
3017 CmpOp::ILike => compute::ilike(arr, &sc)?,
3018 })
3019 }
3020 DataType::Int8 => num_cmp!(Int8Array, i8),
3021 DataType::Int16 => num_cmp!(Int16Array, i16),
3022 DataType::Int32 => num_cmp!(Int32Array, i32),
3023 DataType::Int64 => num_cmp!(Int64Array, i64),
3024 DataType::Float32 => num_cmp!(Float32Array, f32),
3025 DataType::Float64 => num_cmp!(Float64Array, f64),
3026 dt => Err(AppError::InvalidValue(format!(
3027 "unsupported type for comparison: {dt:?}"
3028 ))),
3029 }
3030}
3031
3032pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
3037 let batch = cast_for_serialize(batch)?;
3042 let schema = batch.schema();
3043 let n_rows = batch.num_rows();
3044
3045 let keys: Vec<Vec<u8>> = schema
3046 .fields()
3047 .iter()
3048 .map(|f| {
3049 let mut k = Vec::with_capacity(f.name().len() + 3);
3050 k.push(b'"');
3051 k.extend_from_slice(f.name().as_bytes());
3052 k.extend_from_slice(b"\":");
3053 k
3054 })
3055 .collect();
3056
3057 let encoders: Vec<ColEnc> = batch
3062 .columns()
3063 .iter()
3064 .map(|c| ColEnc::new(c.as_ref()))
3065 .collect();
3066
3067 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
3068 let mut itoa_buf = itoa::Buffer::new();
3069 let mut ryu_buf = ryu::Buffer::new();
3070 buf.push(b'[');
3071
3072 for row in 0..n_rows {
3073 if row > 0 {
3074 buf.push(b',');
3075 }
3076 buf.push(b'{');
3077 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
3078 if i > 0 {
3079 buf.push(b',');
3080 }
3081 buf.extend_from_slice(key);
3082 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
3083 }
3084 buf.push(b'}');
3085 }
3086
3087 buf.push(b']');
3088 Ok(unsafe { String::from_utf8_unchecked(buf) })
3089}
3090
3091enum ColEnc<'a> {
3096 Utf8(&'a StringArray),
3097 LargeUtf8(&'a LargeStringArray),
3098 Utf8View(&'a StringViewArray),
3099 DictI32Utf8(
3102 &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
3103 &'a StringArray,
3104 ),
3105 Bool(&'a BooleanArray),
3106 I8(&'a Int8Array),
3107 I16(&'a Int16Array),
3108 I32(&'a Int32Array),
3109 I64(&'a Int64Array),
3110 U8(&'a UInt8Array),
3111 U16(&'a UInt16Array),
3112 U32(&'a UInt32Array),
3113 U64(&'a UInt64Array),
3114 Dec128(&'a Decimal128Array),
3115 Dec256(&'a Decimal256Array),
3116 F32(&'a Float32Array),
3117 F64(&'a Float64Array),
3118 Other(&'a dyn Array),
3120}
3121
3122impl<'a> ColEnc<'a> {
3123 fn new(col: &'a dyn Array) -> ColEnc<'a> {
3124 macro_rules! dc {
3125 ($t:ty) => {
3126 col.as_any().downcast_ref::<$t>().unwrap()
3127 };
3128 }
3129 match col.data_type() {
3130 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
3131 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
3132 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
3133 DataType::Dictionary(key, value)
3134 if matches!(key.as_ref(), DataType::Int32)
3135 && matches!(value.as_ref(), DataType::Utf8) =>
3136 {
3137 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
3138 let values = dict
3139 .values()
3140 .as_any()
3141 .downcast_ref::<StringArray>()
3142 .unwrap();
3143 ColEnc::DictI32Utf8(dict, values)
3144 }
3145 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
3146 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
3147 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
3148 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
3149 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
3150 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
3151 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
3152 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
3153 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
3154 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
3155 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
3156 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
3157 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
3158 _ => ColEnc::Other(col),
3159 }
3160 }
3161
3162 #[inline]
3163 fn write(
3164 &self,
3165 buf: &mut Vec<u8>,
3166 row: usize,
3167 itoa_buf: &mut itoa::Buffer,
3168 ryu_buf: &mut ryu::Buffer,
3169 ) {
3170 macro_rules! int {
3171 ($arr:expr) => {{
3172 if $arr.is_null(row) {
3173 buf.extend_from_slice(b"null");
3174 } else {
3175 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
3176 }
3177 }};
3178 }
3179 match self {
3180 ColEnc::Utf8(a) => {
3181 if a.is_null(row) {
3182 buf.extend_from_slice(b"null");
3183 } else {
3184 write_str(buf, a.value(row));
3185 }
3186 }
3187 ColEnc::LargeUtf8(a) => {
3188 if a.is_null(row) {
3189 buf.extend_from_slice(b"null");
3190 } else {
3191 write_str(buf, a.value(row));
3192 }
3193 }
3194 ColEnc::Utf8View(a) => {
3195 if a.is_null(row) {
3196 buf.extend_from_slice(b"null");
3197 } else {
3198 write_str(buf, a.value(row));
3199 }
3200 }
3201 ColEnc::DictI32Utf8(keys, values) => {
3202 if keys.is_null(row) {
3203 buf.extend_from_slice(b"null");
3204 } else {
3205 let k = keys.keys().value(row) as usize;
3206 write_str(buf, values.value(k));
3207 }
3208 }
3209 ColEnc::Bool(a) => {
3210 if a.is_null(row) {
3211 buf.extend_from_slice(b"null");
3212 } else {
3213 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
3214 }
3215 }
3216 ColEnc::I8(a) => int!(a),
3217 ColEnc::I16(a) => int!(a),
3218 ColEnc::I32(a) => int!(a),
3219 ColEnc::I64(a) => int!(a),
3220 ColEnc::U8(a) => int!(a),
3221 ColEnc::U16(a) => int!(a),
3222 ColEnc::U32(a) => int!(a),
3223 ColEnc::U64(a) => int!(a),
3224 ColEnc::Dec128(a) => {
3225 if a.is_null(row) {
3226 buf.extend_from_slice(b"null");
3227 } else {
3228 write_str(buf, &a.value_as_string(row));
3229 }
3230 }
3231 ColEnc::Dec256(a) => {
3232 if a.is_null(row) {
3233 buf.extend_from_slice(b"null");
3234 } else {
3235 write_str(buf, &a.value_as_string(row));
3236 }
3237 }
3238 ColEnc::F32(a) => {
3239 if a.is_null(row) {
3240 buf.extend_from_slice(b"null");
3241 } else {
3242 let v = a.value(row);
3243 if v.is_finite() {
3244 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3245 } else {
3246 buf.extend_from_slice(b"null");
3247 }
3248 }
3249 }
3250 ColEnc::F64(a) => {
3251 if a.is_null(row) {
3252 buf.extend_from_slice(b"null");
3253 } else {
3254 let v = a.value(row);
3255 if v.is_finite() {
3256 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3257 } else {
3258 buf.extend_from_slice(b"null");
3259 }
3260 }
3261 }
3262 ColEnc::Other(col) => {
3263 if col.is_null(row) {
3264 buf.extend_from_slice(b"null");
3265 } else {
3266 write_value(buf, *col, row);
3267 }
3268 }
3269 }
3270 }
3271}
3272
3273#[inline]
3274fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
3275 match col.data_type() {
3276 DataType::Utf8 => write_str(
3277 buf,
3278 col.as_any()
3279 .downcast_ref::<StringArray>()
3280 .unwrap()
3281 .value(row),
3282 ),
3283 DataType::LargeUtf8 => write_str(
3284 buf,
3285 col.as_any()
3286 .downcast_ref::<LargeStringArray>()
3287 .unwrap()
3288 .value(row),
3289 ),
3290 DataType::Utf8View => write_str(
3291 buf,
3292 col.as_any()
3293 .downcast_ref::<StringViewArray>()
3294 .unwrap()
3295 .value(row),
3296 ),
3297 DataType::Dictionary(key, value)
3298 if matches!(key.as_ref(), DataType::Int32)
3299 && matches!(value.as_ref(), DataType::Utf8) =>
3300 {
3301 let dict = col
3302 .as_any()
3303 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
3304 .unwrap();
3305 let keys = dict.keys();
3306 let values = dict
3307 .values()
3308 .as_any()
3309 .downcast_ref::<StringArray>()
3310 .unwrap();
3311 let k = keys.value(row) as usize;
3312 write_str(buf, values.value(k));
3313 }
3314 DataType::Boolean => {
3315 let v = col
3316 .as_any()
3317 .downcast_ref::<BooleanArray>()
3318 .unwrap()
3319 .value(row);
3320 buf.extend_from_slice(if v { b"true" } else { b"false" });
3321 }
3322 DataType::Int8 => {
3323 let mut b = itoa::Buffer::new();
3324 buf.extend_from_slice(
3325 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3326 .as_bytes(),
3327 );
3328 }
3329 DataType::Int16 => {
3330 let mut b = itoa::Buffer::new();
3331 buf.extend_from_slice(
3332 b.format(
3333 col.as_any()
3334 .downcast_ref::<Int16Array>()
3335 .unwrap()
3336 .value(row),
3337 )
3338 .as_bytes(),
3339 );
3340 }
3341 DataType::Int32 => {
3342 let mut b = itoa::Buffer::new();
3343 buf.extend_from_slice(
3344 b.format(
3345 col.as_any()
3346 .downcast_ref::<Int32Array>()
3347 .unwrap()
3348 .value(row),
3349 )
3350 .as_bytes(),
3351 );
3352 }
3353 DataType::Int64 => {
3354 let mut b = itoa::Buffer::new();
3355 buf.extend_from_slice(
3356 b.format(
3357 col.as_any()
3358 .downcast_ref::<Int64Array>()
3359 .unwrap()
3360 .value(row),
3361 )
3362 .as_bytes(),
3363 );
3364 }
3365 DataType::UInt8 => {
3366 let mut b = itoa::Buffer::new();
3367 buf.extend_from_slice(
3368 b.format(
3369 col.as_any()
3370 .downcast_ref::<UInt8Array>()
3371 .unwrap()
3372 .value(row),
3373 )
3374 .as_bytes(),
3375 );
3376 }
3377 DataType::UInt16 => {
3378 let mut b = itoa::Buffer::new();
3379 buf.extend_from_slice(
3380 b.format(
3381 col.as_any()
3382 .downcast_ref::<UInt16Array>()
3383 .unwrap()
3384 .value(row),
3385 )
3386 .as_bytes(),
3387 );
3388 }
3389 DataType::UInt32 => {
3390 let mut b = itoa::Buffer::new();
3391 buf.extend_from_slice(
3392 b.format(
3393 col.as_any()
3394 .downcast_ref::<UInt32Array>()
3395 .unwrap()
3396 .value(row),
3397 )
3398 .as_bytes(),
3399 );
3400 }
3401 DataType::UInt64 => {
3402 let mut b = itoa::Buffer::new();
3403 buf.extend_from_slice(
3404 b.format(
3405 col.as_any()
3406 .downcast_ref::<UInt64Array>()
3407 .unwrap()
3408 .value(row),
3409 )
3410 .as_bytes(),
3411 );
3412 }
3413 DataType::Decimal128(_, _) => {
3414 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3415 write_str(buf, &arr.value_as_string(row));
3416 }
3417 DataType::Decimal256(_, _) => {
3418 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3419 write_str(buf, &arr.value_as_string(row));
3420 }
3421 DataType::Float32 => {
3422 let v = col
3423 .as_any()
3424 .downcast_ref::<Float32Array>()
3425 .unwrap()
3426 .value(row);
3427 if v.is_finite() {
3428 let mut b = ryu::Buffer::new();
3429 buf.extend_from_slice(b.format_finite(v).as_bytes());
3430 } else {
3431 buf.extend_from_slice(b"null");
3432 }
3433 }
3434 DataType::Float64 => {
3435 let v = col
3436 .as_any()
3437 .downcast_ref::<Float64Array>()
3438 .unwrap()
3439 .value(row);
3440 if v.is_finite() {
3441 let mut b = ryu::Buffer::new();
3442 buf.extend_from_slice(b.format_finite(v).as_bytes());
3443 } else {
3444 buf.extend_from_slice(b"null");
3445 }
3446 }
3447 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3452 }
3453}
3454
/// Appends `s` to `buf` as a double-quoted JSON string.
///
/// `"` and `\` are backslash-escaped, newline, carriage return and tab use
/// their short escapes, and every other byte below `0x20` is written as
/// `\u00XX`. All remaining bytes, including multi-byte UTF-8 sequences, are
/// copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    buf.push(b'"');

    // Start of the current run of bytes that need no escaping; runs are
    // flushed in one copy whenever an escape interrupts them.
    let mut run_start = 0;
    for (pos, &byte) in bytes.iter().enumerate() {
        let short: &[u8] = match byte {
            b'"' => b"\\\"",
            b'\\' => b"\\\\",
            b'\n' => b"\\n",
            b'\r' => b"\\r",
            b'\t' => b"\\t",
            0x00..=0x1f => {
                buf.extend_from_slice(&bytes[run_start..pos]);
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX[usize::from(byte >> 4)]);
                buf.push(HEX[usize::from(byte & 0x0f)]);
                run_start = pos + 1;
                continue;
            }
            _ => continue,
        };
        buf.extend_from_slice(&bytes[run_start..pos]);
        buf.extend_from_slice(short);
        run_start = pos + 1;
    }

    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
3476
3477#[async_trait]
3482impl Backend for Store {
3483 fn names(&self) -> Vec<String> {
3484 Store::names(self)
3485 }
3486
3487 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3488 let st = self.dataset(name)?;
3489 Ok(DatasetSummary {
3490 name: st.schema.name.clone(),
3491 columns: st.schema.columns.len(),
3492 rows: st.num_rows(),
3493 })
3494 }
3495
3496 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3497 let st = self.dataset(name)?;
3498 Ok(Arc::new(st.schema.clone()))
3499 }
3500
3501 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3502 let st = self.dataset(name)?;
3503 let mut cols: Vec<String> = st
3506 .schema
3507 .columns
3508 .iter()
3509 .map(|c| c.name.clone())
3510 .filter(|n| st.index.contains_key(n))
3511 .collect();
3512 let mut extras: Vec<String> = st
3515 .index
3516 .keys()
3517 .filter(|n| !cols.iter().any(|c| c == *n))
3518 .cloned()
3519 .collect();
3520 extras.sort();
3521 cols.extend(extras);
3522 Ok(cols)
3523 }
3524
3525 async fn sample(&self, name: &str) -> Result<String, AppError> {
3526 Store::sample(self, name).await
3527 }
3528
3529 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3530 Store::query(self, name, req).await
3531 }
3532
3533 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3534 Store::query_arrow(self, name, req).await
3535 }
3536
3537 async fn query_arrow_stream(
3538 &self,
3539 name: &str,
3540 req: &QueryRequest,
3541 ) -> Result<ArrowIpcStream, AppError> {
3542 Store::query_arrow_stream(self, name, req).await
3543 }
3544
3545 async fn query_arrow_stream_all(
3546 &self,
3547 name: &str,
3548 req: &QueryRequest,
3549 ) -> Result<ArrowIpcStream, AppError> {
3550 Store::query_arrow_stream_all(self, name, req).await
3551 }
3552
3553 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3554 Store::count(self, name, req).await
3555 }
3556
3557 async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3558 Store::query_sql(self, sql, max_rows).await
3559 }
3560
3561 async fn query_sql_arrow_stream(
3562 &self,
3563 sql: &str,
3564 max_rows: u64,
3565 ) -> Result<ArrowIpcStream, AppError> {
3566 Store::query_sql_arrow_stream(self, sql, max_rows).await
3567 }
3568
3569 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3570 Store::parquet(self, name).await
3571 }
3572
3573 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3574 Store::reload(self, name).await
3575 }
3576
3577 async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
3578 Store::register(self, cfg).await
3579 }
3580}
3581
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Error messages that must be classified as S3 access-denied failures.
    const DENIED: [&str; 4] = [
        "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
        "Client error with status 403 Forbidden",
        "S3 error: Access Denied",
        "request failed: 403 Forbidden",
    ];

    /// Error messages that must not be classified as access-denied.
    const UNRELATED: [&str; 3] = [
        "Not a Delta table: Generic delta kernel error: No files in log segment",
        "object at location data/part.parquet not found",
        "failed to infer parquet schema: invalid magic bytes",
    ];

    #[test]
    fn detects_s3_access_denied_variants() {
        for msg in DENIED {
            assert!(is_s3_access_denied(msg), "should flag: {msg}");
        }
    }

    #[test]
    fn ignores_unrelated_errors() {
        for msg in UNRELATED {
            assert!(!is_s3_access_denied(msg), "should not flag: {msg}");
        }
    }
}
3610