1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex, RwLock};
6use std::time::Duration;
7
8use arc_swap::ArcSwap;
9use arrow::array::{
10 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
11 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
12 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
13};
14use arrow::compute;
15use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
16use arrow::datatypes::{DataType, Field, Schema};
17use async_trait::async_trait;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
20use serde_json::Value as JsonValue;
21
22use datafusion::datasource::file_format::parquet::ParquetFormat;
23use datafusion::datasource::listing::{
24 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
25};
26use datafusion::datasource::{MemTable, TableProvider};
27use datafusion::error::Result as DfResult;
28use datafusion::execution::cache::DefaultListFilesCache;
29use datafusion::execution::cache::cache_manager::CacheManagerConfig;
30use datafusion::execution::runtime_env::RuntimeEnvBuilder;
31use datafusion::logical_expr::{
32 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
33};
34use datafusion::prelude::{SessionConfig, SessionContext};
35use datafusion::scalar::ScalarValue;
36
37use object_store::aws::AmazonS3Builder;
38use url::Url;
39
40use datapress_core::backend::{
41 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
42};
43use datapress_core::config::{
44 AddressingStyle, AppConfig, DataFusionConfig, DatasetConfig, IndexConfig, IndexMode,
45 Partitioning, ResolvedCreds, S3Config, ServerConfig, SourceKind,
46};
47use datapress_core::errors::AppError;
48use datapress_core::models::{CountRequest, Predicate, QueryRequest};
49use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
50
51type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
60
61type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
63
64pub struct DatasetState {
79 pub schema: DatasetSchema,
80 pub data: Vec<RecordBatch>,
81 pub arrow_schema: Arc<Schema>,
82 pub index: EqIndex,
83 pub lazy: bool,
84}
85
86impl DatasetState {
87 pub fn num_rows(&self) -> usize {
89 self.data.iter().map(|b| b.num_rows()).sum()
90 }
91}
92
93pub struct Store {
98 ctx: SessionContext,
99 max_page_size: u64,
100 configs: RwLock<HashMap<String, DatasetConfig>>,
105 datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
107 reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
110}
111
112impl Store {
113 pub fn session_context(&self) -> &SessionContext {
117 &self.ctx
118 }
119
120 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
122 if cfg
125 .datasets
126 .iter()
127 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
128 {
129 deltalake::aws::register_handlers(None);
130 }
131
132 let ctx = build_tuned_context(&cfg.datafusion);
138 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
139 let mut configs = HashMap::with_capacity(cfg.datasets.len());
140
141 for d in &cfg.datasets {
142 log::info!(
143 "Loading dataset '{}' ({} @ {})",
144 d.name,
145 d.source.kind.as_str(),
146 d.source.location
147 );
148 let d: std::borrow::Cow<'_, DatasetConfig> = match should_force_lazy(d, &cfg.server)
152 .await
153 {
154 Some(bytes) => {
155 log::info!(
156 "dataset '{}': {:.1} MiB exceeds force_lazy_above_mb = {} → forcing lazy",
157 d.name,
158 bytes as f64 / (1024.0 * 1024.0),
159 cfg.server.force_lazy_above_mb
160 );
161 let mut forced = d.clone();
162 forced.lazy = true;
163 std::borrow::Cow::Owned(forced)
164 }
165 None => std::borrow::Cow::Borrowed(d),
166 };
167 let d = d.as_ref();
168 let (state, provider) = match build_dataset(d, &ctx).await {
169 Ok(built) => built,
170 Err(AppError::EmptyDataset(msg)) => {
171 log::warn!("skipping empty dataset '{}': {msg}", d.name);
172 continue;
173 }
174 Err(e) if d.source.is_s3() && is_s3_access_denied(&e.to_string()) => {
180 log::warn!(
181 "skipping dataset '{}': S3 access denied — check credentials \
182 and bucket policy ({e})",
183 d.name
184 );
185 continue;
186 }
187 Err(e) => return Err(e),
188 };
189 ctx.register_table(d.name.as_str(), provider)?;
190 datasets.insert(d.name.clone(), Arc::new(state));
191 configs.insert(d.name.clone(), d.clone());
192 }
193 Ok(Self {
194 ctx,
195 max_page_size: cfg.server.max_page_size.max(1),
196 configs: RwLock::new(configs),
197 datasets: ArcSwap::from_pointee(datasets),
198 reload_locks: Mutex::new(HashMap::new()),
199 })
200 }
201
202 pub fn names(&self) -> Vec<String> {
204 let snap = self.datasets.load();
205 let mut v: Vec<String> = snap.keys().cloned().collect();
206 v.sort();
207 v
208 }
209
210 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
211 self.datasets
212 .load()
213 .get(name)
214 .cloned()
215 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
216 }
217
218 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
221 let st = self.dataset(name)?;
222
223 if st.lazy {
225 let table = DatasetSchema::quote_ident(&st.schema.name);
226 let sql = format!("SELECT * FROM {table} LIMIT 1");
227 let df = self.ctx.sql(&sql).await?;
228 let batches = df.collect().await?;
229 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
230 return Ok("null".into());
231 }
232 let arr = serialize(&batches[0].slice(0, 1))?;
233 let trimmed = arr.trim();
234 let inner = trimmed
235 .strip_prefix('[')
236 .and_then(|s| s.strip_suffix(']'))
237 .unwrap_or(trimmed);
238 return Ok(inner.to_string());
239 }
240
241 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
242 Some(b) => b,
243 None => return Ok("null".into()),
244 };
245 let arr = serialize(&first.slice(0, 1))?;
246 let trimmed = arr.trim();
248 let inner = trimmed
249 .strip_prefix('[')
250 .and_then(|s| s.strip_suffix(']'))
251 .unwrap_or(trimmed);
252 Ok(inner.to_string())
253 }
254
255 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
260 let cfg = self
262 .configs
263 .read()
264 .unwrap()
265 .get(name)
266 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
267 .clone();
268
269 let lock = {
271 let mut locks = self.reload_locks.lock().unwrap();
272 locks
273 .entry(name.to_string())
274 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
275 .clone()
276 };
277 let _guard = lock.lock().await;
278
279 let started = std::time::Instant::now();
280
281 if let Some(cache) = self.ctx.runtime_env().cache_manager.get_list_files_cache() {
297 cache.clear();
298 }
299
300 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
303 let rows = state.num_rows();
304
305 let _ = self.ctx.deregister_table(name)?;
311 self.ctx.register_table(name, provider)?;
312
313 let mut new_map = (**self.datasets.load()).clone();
314 new_map.insert(name.to_string(), Arc::new(state));
315 self.datasets.store(Arc::new(new_map));
316
317 let elapsed_ms = started.elapsed().as_millis();
318 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
319 Ok(ReloadStats { rows, elapsed_ms })
320 }
321
322 pub async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
326 cfg.validate_for_register()?;
327
328 if self.datasets.load().contains_key(&cfg.name) {
330 return Err(AppError::InvalidValue(format!(
331 "dataset '{}' already exists",
332 cfg.name
333 )));
334 }
335
336 let lock = {
338 let mut locks = self.reload_locks.lock().unwrap();
339 locks
340 .entry(cfg.name.clone())
341 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
342 .clone()
343 };
344 let _guard = lock.lock().await;
345
346 if self.datasets.load().contains_key(&cfg.name) {
348 return Err(AppError::InvalidValue(format!(
349 "dataset '{}' already exists",
350 cfg.name
351 )));
352 }
353
354 if cfg.source.kind == SourceKind::Delta && cfg.source.is_s3() {
357 deltalake::aws::register_handlers(None);
358 }
359
360 let started = std::time::Instant::now();
361 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
362 let rows = state.num_rows();
363 let columns = state.schema.columns.len();
364
365 self.ctx.register_table(cfg.name.as_str(), provider)?;
366
367 let mut new_map = (**self.datasets.load()).clone();
368 new_map.insert(cfg.name.clone(), Arc::new(state));
369 self.datasets.store(Arc::new(new_map));
370 self.configs
371 .write()
372 .unwrap()
373 .insert(cfg.name.clone(), cfg.clone());
374
375 let elapsed_ms = started.elapsed().as_millis();
376 log::info!(
377 "registered dataset '{}' ({} @ {}): {rows} rows in {elapsed_ms} ms",
378 cfg.name,
379 cfg.source.kind.as_str(),
380 cfg.source.location
381 );
382 Ok(DatasetSummary {
383 name: cfg.name,
384 columns,
385 rows,
386 })
387 }
388
389 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
393 let batch = self.query_batch(name, req).await?;
394 if batch.num_rows() == 0 {
395 return Ok("[]".to_string());
396 }
397 serialize(&batch)
398 }
399
400 fn canonicalize_sql(&self, sql: &str) -> String {
406 let snap = self.datasets.load();
407 let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
408 let mut columns: HashMap<String, String> = HashMap::new();
409 for (name, state) in snap.iter() {
410 tables.insert(name.to_lowercase(), name.clone());
411 for col in &state.schema.columns {
412 columns
413 .entry(col.name.to_lowercase())
414 .or_insert_with(|| col.name.clone());
415 }
416 }
417 datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
418 }
419
420 pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
427 let cap = max_rows.max(1);
428 let sql = self.canonicalize_sql(sql);
429 let wrapped = if datapress_core::sql::is_describe(&sql) {
434 sql
435 } else {
436 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
437 };
438 let df = self.ctx.sql(&wrapped).await?;
439 let batches = df.collect().await?;
440 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
441 return Ok("[]".to_string());
442 }
443 let batch = if batches.len() == 1 {
444 batches.into_iter().next().expect("checked len")
445 } else {
446 compute::concat_batches(&batches[0].schema(), batches.iter())?
447 };
448 let batch = if batch.num_rows() as u64 > cap {
451 batch.slice(0, cap as usize)
452 } else {
453 batch
454 };
455 serialize(&batch)
456 }
457
458 pub async fn query_sql_arrow_stream(
462 &self,
463 sql: &str,
464 max_rows: u64,
465 ) -> Result<ArrowIpcStream, AppError> {
466 let cap = max_rows.max(1);
467 let sql = self.canonicalize_sql(sql);
468 let wrapped = if datapress_core::sql::is_describe(&sql) {
472 sql
473 } else {
474 format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}")
475 };
476 let df = self.ctx.sql(&wrapped).await?;
477 let batches = df.collect().await?;
478 Ok(stream_arrow_batches(batches))
479 }
480
481 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
485 let batch = self.query_batch(name, req).await?;
486 let schema = batch.schema();
487 let mut buf = Vec::with_capacity(8 * 1024);
488 {
489 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
490 if batch.num_rows() > 0 {
491 w.write(&batch)?;
492 }
493 w.finish()?;
494 }
495 Ok(buf)
496 }
497
498 pub async fn query_arrow_stream(
499 &self,
500 name: &str,
501 req: &QueryRequest,
502 ) -> Result<ArrowIpcStream, AppError> {
503 let batches = self.query_batches(name, req).await?;
504 Ok(stream_arrow_batches(batches))
505 } pub async fn query_arrow_stream_all(
506 &self,
507 name: &str,
508 req: &QueryRequest,
509 ) -> Result<ArrowIpcStream, AppError> {
510 let batches = self.query_batches_all(name, req).await?;
511 Ok(stream_arrow_batches(batches))
512 }
513
514 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
522 let req = QueryRequest {
524 columns: Vec::new(),
525 predicates: Vec::new(),
526 group_by: Vec::new(),
527 aggregations: Vec::new(),
528 having: Vec::new(),
529 distinct: false,
530 order_by: Vec::new(),
531 limit: None,
532 page: 1,
533 page_size: 1,
534 };
535 let st = self.dataset(name)?;
536 let batches = self.query_batches_all(name, &req).await?;
537 let schema = batches
541 .first()
542 .map(|b| b.schema())
543 .unwrap_or_else(|| st.arrow_schema.clone());
544
545 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
546 {
547 let props = parquet::file::properties::WriterProperties::builder()
548 .set_compression(parquet::basic::Compression::SNAPPY)
549 .build();
550 let mut writer =
551 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
552 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
553 for batch in &batches {
554 if batch.num_rows() > 0 {
555 writer
556 .write(batch)
557 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
558 }
559 }
560 writer
561 .close()
562 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
563 }
564 Ok(bytes::Bytes::from(buf))
565 }
566
567 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
570 let batches = self.query_batches(name, req).await?;
571 if batches.is_empty() {
572 return Ok(RecordBatch::new_empty(Arc::new(
573 arrow::datatypes::Schema::empty(),
574 )));
575 }
576 if batches.len() == 1 {
577 return Ok(batches.into_iter().next().expect("checked len"));
578 }
579 if batches.iter().all(|b| b.num_rows() == 0) {
580 return Ok(RecordBatch::new_empty(batches[0].schema()));
581 }
582 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
583 Ok(batch)
584 }
585
586 async fn query_batches(
590 &self,
591 name: &str,
592 req: &QueryRequest,
593 ) -> Result<Vec<RecordBatch>, AppError> {
594 let st = self.dataset(name)?;
595
596 let page = req.page.max(1);
597 let page_size = req.page_size.clamp(1, self.max_page_size);
598 let offset = ((page - 1) * page_size) as usize;
599 let limit = page_size as usize;
600
601 self.query_batches_inner(st, req, Some((offset, limit)))
602 .await
603 }
604
605 async fn query_batches_all(
609 &self,
610 name: &str,
611 req: &QueryRequest,
612 ) -> Result<Vec<RecordBatch>, AppError> {
613 let st = self.dataset(name)?;
614 self.query_batches_inner(st, req, None).await
615 }
616
617 async fn query_batches_inner(
618 &self,
619 st: Arc<DatasetState>,
620 req: &QueryRequest,
621 page_window: Option<(usize, usize)>,
622 ) -> Result<Vec<RecordBatch>, AppError> {
623 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
624
625 let can_fast_path = !st.lazy
632 && req.order_by.is_empty()
633 && (page_window.is_none() || req.limit.is_none())
634 && req.group_by.is_empty()
635 && !req.distinct;
636
637 if can_fast_path {
638 let total = st.num_rows();
639
640 if req.predicates.is_empty() {
643 if page_window.is_none() && req.limit.is_none() {
644 return st
645 .data
646 .iter()
647 .cloned()
648 .map(|batch| project(&st.schema, batch, &req.columns))
649 .collect();
650 }
651 let start = offset.min(total);
652 let len = limit.min(total - start);
653 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
654 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
655 }
656
657 if let Some(rows) = try_index(&st.index, &req.predicates) {
660 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
661 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
662 }
663 }
664
665 let (sql, params) = match page_window {
667 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
668 None => build_query_stream_sql(&st.schema, req)?,
669 };
670 let mut df = self.ctx.sql(&sql).await?;
671 if !params.is_empty() {
672 df = df.with_param_values(params)?;
673 }
674 let batches = df.collect().await?;
675 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
676 let schema = batches
677 .first()
678 .map(|b| b.schema())
679 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
680 return Ok(vec![RecordBatch::new_empty(schema)]);
681 }
682 Ok(batches)
683 }
684}
685
686fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
687 let schema = batches
688 .first()
689 .map(|batch| batch.schema())
690 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
691 let (mut writer, stream) = arrow_ipc_stream_channel(8);
692
693 tokio::task::spawn_blocking(move || {
694 let result = (|| -> Result<(), AppError> {
695 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
696 for batch in batches {
697 if batch.num_rows() > 0 {
698 w.write(&batch)?;
699 }
700 }
701 w.finish()?;
702 Ok(())
703 })();
704 if let Err(err) = result {
705 log::error!("datafusion arrow stream failed: {err}");
706 writer.send_error(err);
707 }
708 });
709
710 stream
711}
712
713impl Store {
714 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
718 let st = self.dataset(name)?;
719
720 if !st.lazy {
721 if req.predicates.is_empty() {
723 return Ok(st.num_rows() as i64);
724 }
725 if let Some(rows) = try_index(&st.index, &req.predicates) {
727 return Ok(rows.len() as i64);
728 }
729 }
730
731 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
734 let mut df = self.ctx.sql(&sql).await?;
735 if !params.is_empty() {
736 df = df.with_param_values(params)?;
737 }
738 let batches = df.collect().await?;
739 let n = batches
740 .first()
741 .and_then(|b| {
742 b.column(0)
743 .as_any()
744 .downcast_ref::<arrow::array::Int64Array>()
745 })
746 .filter(|a| !a.is_empty())
747 .map(|a| a.value(0))
748 .unwrap_or(0);
749 Ok(n)
750 }
751}
752
753fn build_tuned_context(cfg: &DataFusionConfig) -> SessionContext {
780 let mut config = SessionConfig::new();
781 {
782 let opts = config.options_mut();
783 opts.execution.parquet.pushdown_filters = cfg.pushdown_filters;
784 opts.execution.parquet.reorder_filters = cfg.reorder_filters;
785 opts.catalog.information_schema = true;
792 }
793
794 let default_schema = config.options().catalog.default_schema.clone();
797
798 if !cfg.list_files_cache {
799 let ctx = SessionContext::new_with_config(config);
800 register_compat_udfs(&ctx, default_schema);
801 return ctx;
802 }
803
804 let ttl = (cfg.list_files_cache_ttl_secs > 0)
807 .then(|| Duration::from_secs(cfg.list_files_cache_ttl_secs));
808 let list_cache = Arc::new(DefaultListFilesCache::new(
809 cfg.list_files_cache_mb.saturating_mul(1024 * 1024),
810 ttl,
811 ));
812 let cache_manager = CacheManagerConfig::default().with_list_files_cache(Some(list_cache));
813
814 let runtime = RuntimeEnvBuilder::new()
815 .with_cache_manager(cache_manager)
816 .build_arc()
817 .expect("failed to build DataFusion runtime env");
818
819 let ctx = SessionContext::new_with_config_rt(config, runtime);
820 register_compat_udfs(&ctx, default_schema);
821 ctx
822}
823
824fn register_compat_udfs(ctx: &SessionContext, default_schema: String) {
839 ctx.register_udf(ScalarUDF::from(CurrentSchemaUdf::new(default_schema)));
840}
841
842#[derive(Debug, PartialEq, Eq, Hash)]
845struct CurrentSchemaUdf {
846 signature: Signature,
847 schema: String,
848}
849
850impl CurrentSchemaUdf {
851 fn new(schema: String) -> Self {
852 Self {
853 signature: Signature::nullary(Volatility::Stable),
854 schema,
855 }
856 }
857}
858
859impl ScalarUDFImpl for CurrentSchemaUdf {
860 fn as_any(&self) -> &dyn Any {
861 self
862 }
863
864 fn name(&self) -> &str {
865 "current_schema"
866 }
867
868 fn signature(&self) -> &Signature {
869 &self.signature
870 }
871
872 fn return_type(&self, _arg_types: &[DataType]) -> DfResult<DataType> {
873 Ok(DataType::Utf8)
874 }
875
876 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> DfResult<ColumnarValue> {
877 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
878 self.schema.clone(),
879 ))))
880 }
881}
882
883async fn build_dataset(
884 d: &DatasetConfig,
885 ctx: &SessionContext,
886) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
887 if d.lazy {
894 match (d.source.kind, d.source.is_s3()) {
895 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
896 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
897 (SourceKind::Delta, _) => return build_lazy_delta(d, ctx).await,
898 }
899 }
900
901 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
906 (SourceKind::Parquet, false) => read_local_parquet(d)?,
907 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
908 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
909 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
910 };
911 if raw_batches.is_empty() {
912 return Err(AppError::EmptyDataset(format!(
913 "dataset '{}': source produced no batches",
914 d.name
915 )));
916 }
917 if raw_batches.iter().all(|b| b.num_rows() == 0) {
922 return Err(AppError::EmptyDataset(format!(
923 "dataset '{}': source has a schema but no rows",
924 d.name
925 )));
926 }
927
928 let chunks = raw_batches;
929 let arrow_sch = chunks[0].schema();
930
931 let columns: Vec<ColumnInfo> = arrow_sch
933 .fields()
934 .iter()
935 .map(|f| {
936 let dt = f.data_type();
937 ColumnInfo {
938 name: f.name().clone(),
939 logical: arrow_to_logical(dt),
940 sql_type: format!("{dt:?}"),
941 nullable: f.is_nullable(),
942 }
943 })
944 .collect();
945 let schema = DatasetSchema::new(&d.name, columns)
946 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
947
948 let index = build_eq_index_with_policy(&chunks, &d.index);
953
954 let n_parts = std::thread::available_parallelism()
959 .map(|n| n.get())
960 .unwrap_or(4);
961 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
962 for (i, b) in chunks.iter().enumerate() {
963 if b.num_rows() == 0 {
964 continue;
965 }
966 parts[i % n_parts].push(b.clone());
967 }
968 parts.retain(|p| !p.is_empty());
969 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
970
971 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
972 let mem_mb: usize = chunks
973 .iter()
974 .flat_map(|b| b.columns().iter())
975 .map(|c| c.get_buffer_memory_size())
976 .sum::<usize>()
977 / 1_048_576;
978 log::info!(
979 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
980 d.name,
981 d.source.kind.as_str(),
982 total_rows,
983 schema.columns.len(),
984 mem_mb,
985 chunks.len(),
986 index.len()
987 );
988
989 Ok((
990 DatasetState {
991 schema,
992 data: chunks,
993 arrow_schema: arrow_sch,
994 index,
995 lazy: false,
996 },
997 provider,
998 ))
999}
1000
1001async fn build_lazy_local_parquet(
1006 d: &DatasetConfig,
1007 ctx: &SessionContext,
1008) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1009 let (url, part_keys) = lazy_local_listing(d)?;
1010
1011 let mut opts =
1012 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1013 if !part_keys.is_empty() {
1014 opts = opts.with_table_partition_cols(
1015 part_keys
1016 .iter()
1017 .map(|k| (k.clone(), DataType::Utf8))
1018 .collect(),
1019 );
1020 }
1021
1022 let session_state = ctx.state();
1023 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1026 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
1027 })?;
1028
1029 if file_schema.fields().is_empty() {
1033 return Err(AppError::EmptyDataset(format!(
1034 "dataset '{}': no .parquet files at {}",
1035 d.name, d.source.location
1036 )));
1037 }
1038
1039 let cfg = ListingTableConfig::new(url)
1040 .with_listing_options(opts)
1041 .with_schema(file_schema.clone());
1042 let table = ListingTable::try_new(cfg).map_err(|e| {
1043 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
1044 })?;
1045 let provider: Arc<dyn TableProvider> = Arc::new(table);
1046
1047 let mut fields: Vec<Field> = file_schema
1049 .fields()
1050 .iter()
1051 .map(|f| f.as_ref().clone())
1052 .collect();
1053 for k in &part_keys {
1054 if !fields.iter().any(|f| f.name() == k) {
1055 fields.push(Field::new(k, DataType::Utf8, false));
1056 }
1057 }
1058 let arrow_sch = Arc::new(Schema::new(fields));
1059
1060 let columns: Vec<ColumnInfo> = arrow_sch
1061 .fields()
1062 .iter()
1063 .map(|f| {
1064 let dt = f.data_type();
1065 ColumnInfo {
1066 name: f.name().clone(),
1067 logical: arrow_to_logical(dt),
1068 sql_type: format!("{dt:?}"),
1069 nullable: f.is_nullable(),
1070 }
1071 })
1072 .collect();
1073 let schema = DatasetSchema::new(&d.name, columns)
1074 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1075
1076 log::info!(
1077 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
1078 d.name,
1079 d.source.kind.as_str(),
1080 schema.columns.len(),
1081 part_keys.len()
1082 );
1083
1084 Ok((
1085 DatasetState {
1086 schema,
1087 data: Vec::new(),
1088 arrow_schema: arrow_sch,
1089 index: EqIndex::default(),
1090 lazy: true,
1091 },
1092 provider,
1093 ))
1094}
1095
1096fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1101 let loc = &d.source.location;
1102
1103 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
1104 let parts: Vec<&str> = loc.split('/').collect();
1105 let first_wild = parts
1106 .iter()
1107 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
1108 .unwrap_or(parts.len());
1109 let base = parts[..first_wild].join("/");
1110 let base = if base.is_empty() {
1111 "/".to_string()
1112 } else {
1113 base
1114 };
1115 let upper = parts.len().saturating_sub(1);
1118 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
1119 .iter()
1120 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
1121 .filter(|k| !k.is_empty())
1122 .collect();
1123 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
1124 }
1125
1126 let path = std::path::Path::new(loc);
1127 if path.is_dir() {
1128 let keys = discover_hive_keys(path);
1129 return Ok((dir_url(path, d)?, keys));
1130 }
1131
1132 let url = ListingTableUrl::parse(loc)
1133 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
1134 Ok((url, Vec::new()))
1135}
1136
1137fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
1140 let s = path.to_str().ok_or_else(|| {
1141 AppError::Internal(format!(
1142 "dataset '{}': non-utf8 path {}",
1143 d.name,
1144 path.display()
1145 ))
1146 })?;
1147 let s = if s.ends_with('/') {
1148 s.to_string()
1149 } else {
1150 format!("{s}/")
1151 };
1152 ListingTableUrl::parse(&s)
1153 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
1154}
1155
/// Walks down from `base` collecting hive partition keys.
///
/// At each level the walk descends into the first subdirectory (in `read_dir` order) whose name
/// is `key=value` with a non-empty key and value, and records `key`. It stops when a level has
/// no such subdirectory or cannot be read. Files and non-UTF-8 names are ignored.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut dir = base.to_path_buf();
    loop {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            break;
        };
        let hive_child = entries.flatten().find_map(|entry| {
            let path = entry.path();
            if !path.is_dir() {
                return None;
            }
            let name = path.file_name()?.to_str()?;
            let (key, value) = name.split_once('=')?;
            if key.is_empty() || value.is_empty() {
                return None;
            }
            let key = key.to_string();
            Some((key, path))
        });
        match hive_child {
            Some((key, child)) => {
                keys.push(key);
                dir = child;
            }
            None => break,
        }
    }
    keys
}
1193
1194async fn build_lazy_s3_parquet(
1200 d: &DatasetConfig,
1201 ctx: &SessionContext,
1202) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1203 register_s3_object_store(d, ctx)?;
1204
1205 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
1206
1207 if file_schema.fields().is_empty() {
1210 return Err(AppError::EmptyDataset(format!(
1211 "dataset '{}': no .parquet files at {}",
1212 d.name, d.source.location
1213 )));
1214 }
1215
1216 let mut fields: Vec<Field> = file_schema
1218 .fields()
1219 .iter()
1220 .map(|f| f.as_ref().clone())
1221 .collect();
1222 for k in &part_keys {
1223 if !fields.iter().any(|f| f.name() == k) {
1224 fields.push(Field::new(k, DataType::Utf8, false));
1225 }
1226 }
1227 let arrow_sch = Arc::new(Schema::new(fields));
1228
1229 let columns: Vec<ColumnInfo> = arrow_sch
1230 .fields()
1231 .iter()
1232 .map(|f| {
1233 let dt = f.data_type();
1234 ColumnInfo {
1235 name: f.name().clone(),
1236 logical: arrow_to_logical(dt),
1237 sql_type: format!("{dt:?}"),
1238 nullable: f.is_nullable(),
1239 }
1240 })
1241 .collect();
1242 let schema = DatasetSchema::new(&d.name, columns)
1243 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1244
1245 log::info!(
1246 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
1247 d.name,
1248 d.source.kind.as_str(),
1249 schema.columns.len(),
1250 part_keys.len()
1251 );
1252
1253 Ok((
1254 DatasetState {
1255 schema,
1256 data: Vec::new(),
1257 arrow_schema: arrow_sch,
1258 index: EqIndex::default(),
1259 lazy: true,
1260 },
1261 provider,
1262 ))
1263}
1264
1265async fn build_s3_listing_table(
1271 d: &DatasetConfig,
1272 ctx: &SessionContext,
1273) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
1274 let (url, part_keys) = s3_listing(d, ctx).await?;
1275
1276 let mut opts =
1277 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
1278 if !part_keys.is_empty() {
1279 opts = opts.with_table_partition_cols(
1280 part_keys
1281 .iter()
1282 .map(|k| (k.clone(), DataType::Utf8))
1283 .collect(),
1284 );
1285 }
1286
1287 let session_state = ctx.state();
1288 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
1289 AppError::Internal(format!(
1290 "dataset '{}': infer parquet schema on s3: {e}",
1291 d.name
1292 ))
1293 })?;
1294
1295 let cfg = ListingTableConfig::new(url)
1296 .with_listing_options(opts)
1297 .with_schema(file_schema.clone());
1298 let table = ListingTable::try_new(cfg).map_err(|e| {
1299 AppError::Internal(format!(
1300 "dataset '{}': ListingTable::try_new (s3): {e}",
1301 d.name
1302 ))
1303 })?;
1304 Ok((Arc::new(table), file_schema, part_keys))
1305}
1306
1307async fn s3_listing(
1313 d: &DatasetConfig,
1314 ctx: &SessionContext,
1315) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1316 let s3 = d.s3.clone().unwrap_or_default();
1317 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1318 let loc = &d.source.location;
1319
1320 if d.source.has_glob() {
1321 let (base, keys) = split_glob_base_keys(loc);
1322 let base = format!("{}/", base.trim_end_matches('/'));
1323 let url = ListingTableUrl::parse(&base).map_err(|e| {
1324 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1325 })?;
1326 let keys = if want_partitions { keys } else { Vec::new() };
1327 return Ok((url, keys));
1328 }
1329
1330 let base = if loc.ends_with('/') {
1331 loc.clone()
1332 } else {
1333 format!("{loc}/")
1334 };
1335 let url = ListingTableUrl::parse(&base).map_err(|e| {
1336 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1337 })?;
1338 let keys = if want_partitions {
1339 discover_s3_hive_keys(ctx, &url).await
1340 } else {
1341 Vec::new()
1342 };
1343 Ok((url, keys))
1344}
1345
/// Splits a `/`-separated glob location into its listing base and hive partition keys.
///
/// The base is every segment before the first one containing `*`, `?` or `[`, joined with `/`
/// (or `"/"` if that is empty). The keys are the non-empty `key` parts of `key=…` segments from
/// that first wildcard segment up to, but excluding, the final (file) segment.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_wild = |seg: &str| seg.contains('*') || seg.contains('?') || seg.contains('[');
    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|seg| is_wild(seg))
        .unwrap_or(segments.len());

    let joined = segments[..wild_at].join("/");
    let base = if joined.is_empty() {
        String::from("/")
    } else {
        joined
    };

    let last = segments.len().saturating_sub(1);
    let keys = segments[wild_at.min(last)..last]
        .iter()
        .filter_map(|seg| seg.split_once('='))
        .map(|(key, _)| key)
        .filter(|key| !key.is_empty())
        .map(str::to_string)
        .collect();
    (base, keys)
}
1369
1370async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1375 let store = match ctx.runtime_env().object_store(url.object_store()) {
1376 Ok(s) => s,
1377 Err(_) => return Vec::new(),
1378 };
1379 let mut keys = Vec::new();
1380 let mut prefix = url.prefix().clone();
1381 loop {
1382 let listing = match store.list_with_delimiter(Some(&prefix)).await {
1383 Ok(l) => l,
1384 Err(_) => break,
1385 };
1386 let mut next: Option<object_store::path::Path> = None;
1387 for cp in &listing.common_prefixes {
1388 if let Some(seg) = cp.parts().next_back() {
1389 let seg = seg.as_ref().to_string();
1390 if let Some((k, v)) = seg.split_once('=')
1391 && !k.is_empty()
1392 && !v.is_empty()
1393 {
1394 keys.push(k.to_string());
1395 next = Some(cp.clone());
1396 break;
1397 }
1398 }
1399 }
1400 match next {
1401 Some(p) => prefix = p,
1402 None => break,
1403 }
1404 }
1405 keys
1406}
1407
1408fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1425 let files = d.resolve_local_parquet_files()?;
1426 let mut all = Vec::new();
1427 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1428 None
1429 } else {
1430 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1431 };
1432
1433 for f in &files {
1434 let file = std::fs::File::open(f)
1435 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1436
1437 let probe = ParquetRecordBatchReaderBuilder::try_new(
1442 file.try_clone()
1443 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1444 )?;
1445 let parquet_schema = probe.parquet_schema().clone();
1446 let arrow_schema = probe.schema().clone();
1447 let metadata = probe.metadata().clone();
1448 drop(probe);
1449
1450 let projection = if let Some(w) = &wanted {
1452 let indices: Vec<usize> = arrow_schema
1453 .fields()
1454 .iter()
1455 .enumerate()
1456 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1457 .map(|(i, _)| i)
1458 .collect();
1459 if indices.is_empty() {
1460 return Err(AppError::Internal(format!(
1461 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1462 d.name,
1463 d.columns,
1464 f.display()
1465 )));
1466 }
1467 ProjectionMask::roots(&parquet_schema, indices)
1468 } else {
1469 ProjectionMask::all()
1470 };
1471
1472 let mut new_fields: Vec<Field> = arrow_schema
1480 .fields()
1481 .iter()
1482 .map(|f| f.as_ref().clone())
1483 .collect();
1484 if d.dict_encode
1485 && let Some(rg0) = metadata.row_groups().first()
1486 {
1487 for (i, fld) in arrow_schema.fields().iter().enumerate() {
1488 if !matches!(
1489 fld.data_type(),
1490 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1491 ) {
1492 continue;
1493 }
1494 if let Some(col) = rg0.columns().get(i)
1495 && col.dictionary_page_offset().is_some()
1496 {
1497 new_fields[i] = Field::new(
1498 fld.name(),
1499 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1500 fld.is_nullable(),
1501 );
1502 }
1503 }
1504 }
1505 let forced_schema = Arc::new(Schema::new(new_fields));
1506
1507 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1508 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1509 .with_batch_size(65_536)
1510 .with_projection(projection)
1511 .build()?;
1512 let pairs = hive_pairs(f);
1516 for batch in reader {
1517 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1518 all.push(if pairs.is_empty() {
1519 batch
1520 } else {
1521 append_partition_cols(&batch, &pairs)?
1522 });
1523 }
1524 }
1525 if all.is_empty() {
1526 return Err(AppError::Internal(format!(
1527 "dataset '{}': parquet source is empty",
1528 d.name
1529 )));
1530 }
1531 Ok(all)
1532}
1533
/// Extract hive-style `key=value` pairs from the components of `path`.
///
/// A component becomes a `(key, value)` pair when it is valid UTF-8, contains
/// exactly one `=`, and both halves are non-empty. Pairs are returned in path
/// order; all other components are ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        let malformed = key.is_empty() || value.is_empty() || value.contains('=');
        if !malformed {
            pairs.push((key.to_owned(), value.to_owned()));
        }
    }
    pairs
}
1548
1549fn append_partition_cols(
1552 batch: &RecordBatch,
1553 pairs: &[(String, String)],
1554) -> Result<RecordBatch, AppError> {
1555 let n = batch.num_rows();
1556 let mut fields: Vec<Field> = batch
1557 .schema()
1558 .fields()
1559 .iter()
1560 .map(|f| f.as_ref().clone())
1561 .collect();
1562 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1563 for (k, v) in pairs {
1564 if fields.iter().any(|f| f.name() == k) {
1565 continue;
1566 }
1567 fields.push(Field::new(k, DataType::Utf8, false));
1568 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1569 }
1570 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1571 .map_err(|e| AppError::Internal(e.to_string()))
1572}
1573
1574async fn read_s3_parquet(
1580 d: &DatasetConfig,
1581 ctx: &SessionContext,
1582) -> Result<Vec<RecordBatch>, AppError> {
1583 register_s3_object_store(d, ctx)?;
1584 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1585 let df = ctx
1586 .read_table(provider)
1587 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1588 Ok(df.collect().await?)
1589}
1590
1591async fn open_delta_table(
1598 d: &DatasetConfig,
1599 opts: HashMap<String, String>,
1600) -> Result<deltalake::DeltaTable, AppError> {
1601 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1602 AppError::Internal(format!(
1603 "dataset '{}': bad delta location '{}': {e}",
1604 d.name, d.source.location
1605 ))
1606 })?;
1607 deltalake::open_table_with_storage_options(url, opts)
1608 .await
1609 .map_err(|e| {
1610 let msg = e.to_string();
1620 let low = msg.to_lowercase();
1621 if low.contains("no files in log segment") || low.contains("not a delta table") {
1622 AppError::EmptyDataset(format!(
1623 "delta location '{}' has no committed files ({msg})",
1624 d.source.location
1625 ))
1626 } else {
1627 AppError::Internal(format!(
1628 "dataset '{}': delta open '{}': {msg}",
1629 d.name, d.source.location
1630 ))
1631 }
1632 })
1633}
1634
1635async fn open_delta_provider(
1641 d: &DatasetConfig,
1642 opts: HashMap<String, String>,
1643) -> Result<Arc<dyn TableProvider>, AppError> {
1644 let table = open_delta_table(d, opts).await?;
1645 table.table_provider().await.map_err(|e| {
1646 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1647 })
1648}
1649
1650fn delta_storage_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1653 if d.source.is_s3() {
1654 delta_s3_options(d)
1655 } else {
1656 Ok(HashMap::new())
1657 }
1658}
1659
1660async fn read_delta(
1664 d: &DatasetConfig,
1665 opts: HashMap<String, String>,
1666) -> Result<Vec<RecordBatch>, AppError> {
1667 let provider = open_delta_provider(d, opts).await?;
1668 let scan_ctx = SessionContext::new();
1678 let df = scan_ctx.read_table(provider).map_err(|e| {
1679 AppError::EmptyDataset(format!(
1680 "delta location '{}' could not be scanned, skipping ({e})",
1681 d.source.location
1682 ))
1683 })?;
1684 df.collect().await.map_err(|e| {
1685 AppError::EmptyDataset(format!(
1686 "delta location '{}' could not be scanned, skipping ({e})",
1687 d.source.location
1688 ))
1689 })
1690}
1691
1692async fn build_lazy_delta(
1698 d: &DatasetConfig,
1699 _ctx: &SessionContext,
1700) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
1701 let table = open_delta_table(d, delta_storage_options(d)?).await?;
1702
1703 let file_count = table
1710 .get_file_uris()
1711 .map(|it| it.count())
1712 .map_err(|e| AppError::Internal(format!("dataset '{}': delta file list: {e}", d.name)))?;
1713 if file_count == 0 {
1714 return Err(AppError::EmptyDataset(format!(
1715 "delta location '{}' has a schema but no data files",
1716 d.source.location
1717 )));
1718 }
1719
1720 let provider = table.table_provider().await.map_err(|e| {
1721 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1722 })?;
1723
1724 {
1737 let probe_ctx = SessionContext::new();
1738 let probe = probe_ctx
1739 .read_table(provider.clone())
1740 .and_then(|df| df.limit(0, Some(1)));
1741 match probe {
1742 Ok(df) => match df.collect().await {
1743 Ok(batches) if batches.iter().all(|b| b.num_rows() == 0) => {
1744 return Err(AppError::EmptyDataset(format!(
1745 "delta location '{}' resolves to no rows",
1746 d.source.location
1747 )));
1748 }
1749 Ok(_) => {}
1750 Err(e) => {
1751 return Err(AppError::EmptyDataset(format!(
1752 "delta location '{}' could not be scanned, skipping ({e})",
1753 d.source.location
1754 )));
1755 }
1756 },
1757 Err(e) => {
1758 return Err(AppError::EmptyDataset(format!(
1759 "delta location '{}' could not be scanned, skipping ({e})",
1760 d.source.location
1761 )));
1762 }
1763 }
1764 }
1765
1766 let arrow_sch = provider.schema();
1769 let columns: Vec<ColumnInfo> = arrow_sch
1770 .fields()
1771 .iter()
1772 .map(|f| {
1773 let dt = f.data_type();
1774 ColumnInfo {
1775 name: f.name().clone(),
1776 logical: arrow_to_logical(dt),
1777 sql_type: format!("{dt:?}"),
1778 nullable: f.is_nullable(),
1779 }
1780 })
1781 .collect();
1782 let schema = DatasetSchema::new(&d.name, columns)
1783 .with_filters(d.predicate_filter.clone(), d.projection_filter.clone())?;
1784
1785 log::info!(
1786 "dataset '{}' [{}, lazy]: {} cols, no materialise, no index",
1787 d.name,
1788 d.source.kind.as_str(),
1789 schema.columns.len()
1790 );
1791
1792 Ok((
1793 DatasetState {
1794 schema,
1795 data: Vec::new(),
1796 arrow_schema: arrow_sch,
1797 index: EqIndex::default(),
1798 lazy: true,
1799 },
1800 provider,
1801 ))
1802}
1803
1804fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1808 let creds = d.resolved_creds();
1809 let region = d.resolved_region();
1810 let s3 = d.s3.clone().unwrap_or_default();
1811 let (bucket, _) = d.source.s3_bucket()?;
1812
1813 let mut opts = HashMap::new();
1814 opts.insert("AWS_REGION".into(), region);
1815 if let Some(ep) = s3.effective_endpoint(bucket) {
1816 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1817 }
1818 if s3.allow_http {
1819 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1820 }
1821 opts.insert(
1822 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1823 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1824 );
1825 if let Some(k) = creds.access_key_id {
1826 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1827 }
1828 if let Some(s) = creds.secret_access_key {
1829 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1830 }
1831 if let Some(t) = creds.session_token {
1832 opts.insert("AWS_SESSION_TOKEN".into(), t);
1833 }
1834 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1836 Ok(opts)
1837}
1838
1839fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1843 let (bucket, _key) = d.source.s3_bucket()?;
1844 let creds = d.resolved_creds();
1845 let region = d.resolved_region();
1846 let s3 = d.s3.clone().unwrap_or_default();
1847
1848 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1849 AppError::Internal(format!(
1850 "dataset '{}': build S3 store for '{bucket}': {e}",
1851 d.name
1852 ))
1853 })?;
1854
1855 let url = Url::parse(&format!("s3://{bucket}"))
1856 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1857 ctx.register_object_store(&url, Arc::new(store));
1858 Ok(())
1859}
1860
/// Heuristically classify an S3 error message as an access-denied failure.
///
/// The check is case-insensitive and matches any of:
/// * "access denied";
/// * "accessdenied";
/// * "forbidden";
/// * the HTTP status `403` as a standalone number.
///
/// The status code must not be part of a longer digit run, so unrelated
/// numbers such as byte counts (`14032`) or request ids (`4403`) do not count.
fn is_s3_access_denied(msg: &str) -> bool {
    let low = msg.to_lowercase();
    low.contains("access denied")
        || low.contains("accessdenied")
        || low.contains("forbidden")
        || contains_standalone_number(&low, "403")
}

/// True when `digits` occurs in `haystack` with no ASCII digit directly before or after it.
fn contains_standalone_number(haystack: &str, digits: &str) -> bool {
    let is_digit = |c: Option<char>| c.is_some_and(|c| c.is_ascii_digit());
    haystack.match_indices(digits).any(|(start, _)| {
        // `match_indices` yields char-boundary offsets, so these slices are valid.
        let before = haystack[..start].chars().next_back();
        let after = haystack[start + digits.len()..].chars().next();
        !is_digit(before) && !is_digit(after)
    })
}
1874
1875
1876async fn should_force_lazy(d: &DatasetConfig, server: &ServerConfig) -> Option<u64> {
1882 if d.lazy || server.force_lazy_above_mb == 0 {
1883 return None;
1884 }
1885 let threshold = server.force_lazy_above_mb.saturating_mul(1024 * 1024);
1886
1887 let bytes = if d.source.is_s3() {
1888 match estimate_s3_bytes(d).await {
1889 Ok(b) => b,
1890 Err(e) => {
1891 log::warn!(
1892 "dataset '{}': could not measure S3 size for force_lazy_above_mb: {e}",
1893 d.name
1894 );
1895 return None;
1896 }
1897 }
1898 } else {
1899 d.estimate_local_bytes()?
1900 };
1901
1902 (bytes > threshold).then_some(bytes)
1903}
1904
1905async fn estimate_s3_bytes(d: &DatasetConfig) -> Result<u64, AppError> {
1914 use futures_util::StreamExt;
1915 use object_store::ObjectStore;
1916
1917 let (bucket, _key) = d.source.s3_bucket()?;
1918 let creds = d.resolved_creds();
1919 let region = d.resolved_region();
1920 let s3 = d.s3.clone().unwrap_or_default();
1921 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1922 AppError::Internal(format!(
1923 "dataset '{}': build S3 store for '{bucket}': {e}",
1924 d.name
1925 ))
1926 })?;
1927
1928 let (base, _keys) = split_glob_base_keys(&d.source.location);
1931 let prefix_key = base
1932 .strip_prefix("s3://")
1933 .and_then(|rest| rest.split_once('/').map(|(_bucket, key)| key))
1934 .unwrap_or("")
1935 .trim_end_matches('/');
1936 let prefix =
1937 (!prefix_key.is_empty()).then(|| object_store::path::Path::from(prefix_key));
1938
1939 let mut total: u64 = 0;
1940 let mut stream = store.list(prefix.as_ref());
1941 while let Some(meta) = stream.next().await {
1942 let meta = meta.map_err(|e| {
1943 AppError::Internal(format!(
1944 "dataset '{}': s3 list under '{prefix_key}': {e}",
1945 d.name
1946 ))
1947 })?;
1948 if meta.location.as_ref().ends_with(".parquet") {
1949 total = total.saturating_add(meta.size);
1950 }
1951 }
1952 Ok(total)
1953}
1954
1955fn build_s3(
1956 bucket: &str,
1957 region: &str,
1958 s3: &S3Config,
1959 creds: &ResolvedCreds,
1960) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1961 let mut b = AmazonS3Builder::new()
1962 .with_bucket_name(bucket)
1963 .with_region(region)
1964 .with_allow_http(s3.allow_http)
1965 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1966 if let Some(ep) = s3.effective_endpoint(bucket) {
1967 b = b.with_endpoint(ep);
1968 }
1969 if let Some(k) = creds.access_key_id.as_deref() {
1970 b = b.with_access_key_id(k);
1971 }
1972 if let Some(s) = creds.secret_access_key.as_deref() {
1973 b = b.with_secret_access_key(s);
1974 }
1975 if let Some(t) = creds.session_token.as_deref() {
1976 b = b.with_token(t);
1977 }
1978 b.build()
1979}
1980
1981fn arrow_to_logical(dt: &DataType) -> LogicalType {
1982 match dt {
1983 DataType::Boolean => LogicalType::Bool,
1984 DataType::Int8
1985 | DataType::Int16
1986 | DataType::Int32
1987 | DataType::Int64
1988 | DataType::UInt8
1989 | DataType::UInt16
1990 | DataType::UInt32
1991 | DataType::UInt64 => LogicalType::Int,
1992 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1993 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1994 DataType::Dictionary(_, v)
1998 if matches!(
1999 v.as_ref(),
2000 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
2001 ) =>
2002 {
2003 LogicalType::Utf8
2004 }
2005 DataType::Date32
2006 | DataType::Date64
2007 | DataType::Time32(_)
2008 | DataType::Time64(_)
2009 | DataType::Timestamp(_, _)
2010 | DataType::Duration(_)
2011 | DataType::Interval(_) => LogicalType::Temporal,
2012 _ => LogicalType::Other,
2013 }
2014}
2015
2016fn project(
2021 schema: &DatasetSchema,
2022 batch: RecordBatch,
2023 columns: &[String],
2024) -> Result<RecordBatch, AppError> {
2025 if columns.is_empty() {
2026 return Ok(batch);
2027 }
2028 let indices: Vec<usize> = columns
2029 .iter()
2030 .map(|c| {
2031 schema
2032 .find(c)
2033 .map(|info| schema.by_name[&info.name.to_lowercase()])
2034 })
2035 .collect::<Result<_, _>>()?;
2036 let fields: Vec<Field> = indices
2037 .iter()
2038 .map(|&i| batch.schema().field(i).clone())
2039 .collect();
2040 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
2041 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
2042}
2043
2044#[derive(Default)]
2057struct Params {
2058 values: Vec<ScalarValue>,
2059}
2060
2061impl Params {
2062 fn new() -> Self {
2063 Self::default()
2064 }
2065
2066 fn bind(&mut self, v: ScalarValue) -> String {
2068 self.values.push(v);
2069 format!("${}", self.values.len())
2070 }
2071
2072 fn into_values(self) -> Vec<ScalarValue> {
2073 self.values
2074 }
2075}
2076
2077fn build_query_sql(
2078 schema: &DatasetSchema,
2079 req: &QueryRequest,
2080 max_page_size: u64,
2081) -> Result<(String, Vec<ScalarValue>), AppError> {
2082 let (limit, offset) = req.effective_limit_offset(max_page_size);
2083 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
2084}
2085
2086fn build_query_stream_sql(
2087 schema: &DatasetSchema,
2088 req: &QueryRequest,
2089) -> Result<(String, Vec<ScalarValue>), AppError> {
2090 let suffix = req
2091 .limit
2092 .map(|limit| format!(" LIMIT {limit}"))
2093 .unwrap_or_default();
2094 build_query_sql_with_suffix(schema, req, &suffix)
2095}
2096
2097fn build_query_sql_with_suffix(
2098 schema: &DatasetSchema,
2099 req: &QueryRequest,
2100 suffix: &str,
2101) -> Result<(String, Vec<ScalarValue>), AppError> {
2102 let agg_plan = req.agg_plan(schema)?;
2103
2104 let cols = if let Some(plan) = &agg_plan {
2105 let mut parts: Vec<String> = plan
2107 .group_cols
2108 .iter()
2109 .map(|c| DatasetSchema::quote_ident(c))
2110 .collect();
2111 for a in &plan.aggs {
2112 let expr = a.sql_expr()?;
2113 parts.push(format!(
2114 "{expr} AS {}",
2115 DatasetSchema::quote_ident(&a.alias)
2116 ));
2117 }
2118 parts.join(", ")
2119 } else if req.columns.is_empty() {
2120 if req.distinct {
2121 "DISTINCT *".to_string()
2122 } else {
2123 "*".to_string()
2124 }
2125 } else {
2126 let list = req
2127 .columns
2128 .iter()
2129 .map(|c| {
2130 schema
2131 .find(c)
2132 .map(|info| DatasetSchema::quote_ident(&info.name))
2133 })
2134 .collect::<Result<Vec<_>, _>>()?
2135 .join(", ");
2136 if req.distinct {
2137 format!("DISTINCT {list}")
2138 } else {
2139 list
2140 }
2141 };
2142
2143 let mut params = Params::new();
2144 let clauses: Vec<String> = req
2145 .predicates
2146 .iter()
2147 .map(|p| pred_to_sql(schema, p, &mut params))
2148 .collect::<Result<_, _>>()?;
2149
2150 let table = DatasetSchema::quote_ident(&schema.name);
2151 let where_clause = if clauses.is_empty() {
2152 String::new()
2153 } else {
2154 format!(" WHERE {}", clauses.join(" AND "))
2155 };
2156 let group_clause = match &agg_plan {
2157 Some(p) => format!(
2158 " GROUP BY {}",
2159 p.group_cols
2160 .iter()
2161 .map(|c| DatasetSchema::quote_ident(c))
2162 .collect::<Vec<_>>()
2163 .join(", "),
2164 ),
2165 None => String::new(),
2166 };
2167 let having_clause = {
2168 let resolved = req.having_plan(agg_plan.as_ref())?;
2169 if resolved.is_empty() {
2170 String::new()
2171 } else {
2172 let clauses: Vec<String> = resolved
2173 .iter()
2174 .map(|(lhs, p)| pred_to_sql_with_lhs(lhs, p, &mut params))
2175 .collect::<Result<_, _>>()?;
2176 format!(" HAVING {}", clauses.join(" AND "))
2177 }
2178 };
2179 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
2180 Some(s) => format!(" ORDER BY {s}"),
2181 None => String::new(),
2182 };
2183 let sql =
2184 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{having_clause}{order_clause}{suffix}");
2185 Ok((sql, params.into_values()))
2186}
2187
2188fn build_count_sql(
2189 schema: &DatasetSchema,
2190 predicates: &[Predicate],
2191) -> Result<(String, Vec<ScalarValue>), AppError> {
2192 let mut params = Params::new();
2193 let clauses: Vec<String> = predicates
2194 .iter()
2195 .map(|p| pred_to_sql(schema, p, &mut params))
2196 .collect::<Result<_, _>>()?;
2197 let table = DatasetSchema::quote_ident(&schema.name);
2198 let where_clause = if clauses.is_empty() {
2199 String::new()
2200 } else {
2201 format!(" WHERE {}", clauses.join(" AND "))
2202 };
2203 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
2204 Ok((sql, params.into_values()))
2205}
2206
2207fn pred_to_sql(
2208 schema: &DatasetSchema,
2209 pred: &Predicate,
2210 params: &mut Params,
2211) -> Result<String, AppError> {
2212 let info = schema.find(&pred.col)?;
2213 let col = DatasetSchema::quote_ident(&info.name);
2214 pred_to_sql_with_lhs(&col, pred, params)
2215}
2216
2217fn pred_to_sql_with_lhs(
2223 col: &str,
2224 pred: &Predicate,
2225 params: &mut Params,
2226) -> Result<String, AppError> {
2227 match pred.op.as_str() {
2228 "is_null" => return Ok(format!("{col} IS NULL")),
2229 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
2230 _ => {}
2231 }
2232
2233 let val = pred
2234 .val
2235 .as_ref()
2236 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
2237
2238 if pred.op == "in" {
2239 let items = val
2240 .as_array()
2241 .filter(|a| !a.is_empty())
2242 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
2243 let placeholders: Vec<String> = items
2244 .iter()
2245 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
2246 .collect::<Result<_, AppError>>()?;
2247 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
2248 }
2249
2250 let sql_op = match pred.op.as_str() {
2251 "eq" => "=",
2252 "neq" => "!=",
2253 "gt" => ">",
2254 "gte" => ">=",
2255 "lt" => "<",
2256 "lte" => "<=",
2257 "like" => "LIKE",
2258 "ilike" => "ILIKE",
2259 other => return Err(AppError::UnknownOperator(other.into())),
2260 };
2261 let placeholder = params.bind(json_to_scalar(val)?);
2262 Ok(format!("{col} {sql_op} {placeholder}"))
2263}
2264
2265fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
2269 match val {
2270 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
2271 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
2272 JsonValue::Null => Ok(ScalarValue::Null),
2273 JsonValue::Number(n) => {
2274 if let Some(i) = n.as_i64() {
2275 Ok(ScalarValue::Int64(Some(i)))
2276 } else if let Some(u) = n.as_u64() {
2277 Ok(ScalarValue::UInt64(Some(u)))
2278 } else if let Some(f) = n.as_f64() {
2279 Ok(ScalarValue::Float64(Some(f)))
2280 } else {
2281 Err(AppError::InvalidValue(
2282 "unsupported numeric literal in predicate".into(),
2283 ))
2284 }
2285 }
2286 _ => Err(AppError::InvalidValue(
2287 "unsupported literal type in predicate".into(),
2288 )),
2289 }
2290}
2291
2292fn json_index_key(val: &JsonValue) -> Option<String> {
2297 match val {
2298 JsonValue::String(s) => Some(s.clone()),
2299 JsonValue::Number(n) => Some(n.to_string()),
2300 JsonValue::Bool(b) => Some(b.to_string()),
2301 _ => None,
2302 }
2303}
2304
/// Intersection of two ascending `u32` slices, returned in ascending order.
///
/// Two-cursor merge in O(len(a) + len(b)). Each equal pair is emitted once,
/// so a repeated value survives only as many times as it appears in both inputs.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let mut left = a.iter().peekable();
    let mut right = b.iter().peekable();
    while let (Some(&&x), Some(&&y)) = (left.peek(), right.peek()) {
        if x < y {
            left.next();
        } else if x > y {
            right.next();
        } else {
            common.push(x);
            left.next();
            right.next();
        }
    }
    common
}
2321
/// Union of two ascending `u32` slices, returned in ascending order.
///
/// Two-cursor merge in O(len(a) + len(b)). A value present in both inputs at
/// the same merge step is emitted once. Whatever remains of either slice after
/// the other runs out is appended unchanged.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut ia, mut ib) = (0usize, 0usize);
    while let (Some(&x), Some(&y)) = (a.get(ia), b.get(ib)) {
        if x <= y {
            merged.push(x);
            ia += 1;
            if x == y {
                ib += 1;
            }
        } else {
            merged.push(y);
            ib += 1;
        }
    }
    merged.extend_from_slice(&a[ia..]);
    merged.extend_from_slice(&b[ib..]);
    merged
}
2346
2347fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
2348 if predicates.is_empty() || index.is_empty() {
2349 return None;
2350 }
2351
2352 if let [pred] = predicates
2355 && pred.op.as_str() == "eq"
2356 {
2357 let col_lower = pred.col.to_lowercase();
2358 let col_map = index.get(&col_lower)?;
2359 let key = json_index_key(pred.val.as_ref()?)?;
2360 return Some(match col_map.get(&key) {
2361 Some(rows) => Cow::Borrowed(rows.as_slice()),
2362 None => Cow::Owned(Vec::new()),
2363 });
2364 }
2365
2366 let mut result: Option<Vec<u32>> = None;
2367 for pred in predicates {
2368 let col_lower = pred.col.to_lowercase();
2369 let col_map = index.get(&col_lower)?;
2370
2371 let rows: Vec<u32> = match pred.op.as_str() {
2372 "eq" => {
2373 let key = json_index_key(pred.val.as_ref()?)?;
2374 col_map.get(&key).cloned().unwrap_or_default()
2375 }
2376 "in" => {
2377 let items = pred.val.as_ref()?.as_array()?;
2378 let mut merged: Vec<u32> = Vec::new();
2379 for item in items {
2380 if let Some(r) = col_map.get(&json_index_key(item)?) {
2381 merged = union_sorted(&merged, r);
2382 }
2383 }
2384 merged
2385 }
2386 _ => return None,
2387 };
2388
2389 result = Some(match result {
2390 None => rows,
2391 Some(r) => intersect_sorted(&r, &rows),
2392 });
2393 }
2394 result.map(Cow::Owned)
2395}
2396
/// Benchmark-only hooks into the private equality-index lookup.
///
/// Exposes `try_index` (which borrows a single `eq` bucket) next to a
/// deliberately cloning variant, so the two can be compared. Not part of the
/// supported API.
#[doc(hidden)]
pub mod bench {
    use super::{EqIndex, FastMap, json_index_key, try_index};
    use datapress_core::models::Predicate;
    use serde_json::Value as JsonValue;
    use std::borrow::Cow;

    /// Opaque wrapper so benchmarks can hold an `EqIndex` without naming the private type.
    pub struct BenchIndex(EqIndex);

    /// Build an index with one column `col` holding a single bucket `val -> rows`.
    ///
    /// NOTE(review): `col` is stored as given, while lookups lowercase the
    /// predicate column, so callers should pass a lowercase name.
    ///
    /// # Panics
    /// If `val` is not indexable (see `json_index_key`).
    pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
        let key = json_index_key(val).expect("benchable index key");
        let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
        col_map.insert(key, rows);
        let mut index: EqIndex = EqIndex::default();
        index.insert(col.to_string(), col_map);
        BenchIndex(index)
    }

    /// Production lookup path: delegates to `try_index`.
    pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
        try_index(&idx.0, predicates)
    }

    /// Baseline for comparison: a single-`eq` lookup that clones the matched
    /// bucket into a fresh `Vec` instead of borrowing it. Any other predicate
    /// shape returns `None`.
    pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
        let [pred] = predicates else { return None };
        if pred.op.as_str() != "eq" {
            return None;
        }
        let col_lower = pred.col.to_lowercase();
        let col_map = idx.0.get(&col_lower)?;
        let key = json_index_key(pred.val.as_ref()?)?;
        Some(col_map.get(&key).cloned().unwrap_or_default())
    }
}
2442
2443fn slice_global(
2446 chunks: &[RecordBatch],
2447 schema: &Arc<Schema>,
2448 offset: usize,
2449 limit: usize,
2450) -> Result<RecordBatch, AppError> {
2451 if limit == 0 || chunks.is_empty() {
2452 return Ok(RecordBatch::new_empty(schema.clone()));
2453 }
2454 let mut out = Vec::new();
2455 let mut to_skip = offset;
2456 let mut remaining = limit;
2457 for b in chunks {
2458 if remaining == 0 {
2459 break;
2460 }
2461 let n = b.num_rows();
2462 if to_skip >= n {
2463 to_skip -= n;
2464 continue;
2465 }
2466 let take = remaining.min(n - to_skip);
2467 out.push(b.slice(to_skip, take));
2468 to_skip = 0;
2469 remaining -= take;
2470 }
2471 if out.is_empty() {
2472 return Ok(RecordBatch::new_empty(schema.clone()));
2473 }
2474 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
2475}
2476
/// Gather one page of rows, addressed by global row id, from `chunks`.
///
/// `rows` holds global row ids, numbered across `chunks` in chunk order. The
/// page is `rows[offset .. offset + limit]`, clamped to `rows.len()`. The
/// output contains those rows in exactly that order, with the columns of
/// `schema`. An empty batch is returned when the page is empty or there are
/// no chunks.
///
/// Strategy: group the page's ids by owning chunk and run one `take` per
/// chunk. Then concatenate the per-chunk results and apply a final `take`
/// with an inverse permutation to restore the requested order.
///
/// # Panics
/// If the total row count exceeds `u32::MAX`. Note that each chunk's count is
/// cast with `as u32` before the checked add.
/// NOTE(review): an id at or beyond the total row count would index past
/// `buckets` and panic. Callers are presumably expected to pass ids produced
/// from these same chunks.
fn take_page(
    chunks: &[RecordBatch],
    schema: &Arc<Schema>,
    rows: &[u32],
    offset: usize,
    limit: usize,
) -> Result<RecordBatch, AppError> {
    let start = offset.min(rows.len());
    let len = limit.min(rows.len() - start);
    if len == 0 || chunks.is_empty() {
        return Ok(RecordBatch::new_empty(schema.clone()));
    }

    // offsets[i] = global id of the first row of chunk i. The trailing entry is
    // the total row count, so the entries form half-open ranges per chunk.
    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
    let mut acc: u32 = 0;
    offsets.push(0);
    for b in chunks {
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
        offsets.push(acc);
    }

    // For each chunk, collect (output position, chunk-local row). The chunk is
    // the last one whose start offset is <= gid. Taking the last such offset
    // also steps over empty chunks, whose offsets repeat.
    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
        let local = gid - offsets[bi];
        buckets[bi].push((out_pos as u32, local));
    }

    // One `take` per non-empty chunk. `dest` records, in stitched order, the
    // output position each gathered row belongs to.
    let mut takens: Vec<RecordBatch> = Vec::new();
    let mut dest: Vec<u32> = Vec::with_capacity(len);
    for (bi, bucket) in buckets.iter().enumerate() {
        if bucket.is_empty() {
            continue;
        }
        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
        let cols: Vec<ArrayRef> = chunks[bi]
            .columns()
            .iter()
            .map(|c| {
                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
                    .map_err(AppError::from)
            })
            .collect::<Result<_, _>>()?;
        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
    }

    // `stitched` is grouped by chunk. `inv[out_pos]` is the stitched row that
    // belongs at `out_pos`, so taking by `inv` restores the requested order.
    let stitched = compute::concat_batches(schema, takens.iter())?;
    let mut inv = vec![0u32; len];
    for (i, &d) in dest.iter().enumerate() {
        inv[d as usize] = i as u32;
    }
    let perm = UInt32Array::from(inv);
    let cols: Vec<ArrayRef> = stitched
        .columns()
        .iter()
        .map(|c| {
            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
                .map_err(AppError::from)
        })
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
}
2552
/// Build the in-memory equality index for `chunks` according to `cfg`.
///
/// The result maps a lower-cased column name to a map from each value's
/// string form to the ascending list of global row ids holding it. Row ids are
/// numbered across all chunks in chunk order. Nulls are not indexed.
///
/// Column selection:
/// * `IndexMode::None`, or no chunks: empty index.
/// * `IndexMode::List`: only columns named in `cfg.columns` (case-insensitive).
/// * `IndexMode::Auto`: any eligible column, but the column is dropped
///   entirely once it would exceed `cfg.max_cardinality` distinct values.
/// * Any other mode: every eligible column, with no cardinality cap.
///
/// Eligible types are `Utf8`, `Utf8View`, `Boolean`, `Int8`–`Int64` and
/// `Dictionary(Int32, Utf8)`. Columns are indexed in parallel via rayon. The
/// schema is taken from the first chunk. A column whose array in some chunk
/// does not downcast to the expected type is skipped (not indexed).
///
/// # Panics
/// If the total row count exceeds `u32::MAX`.
fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
    use rayon::prelude::*;

    if cfg.mode == IndexMode::None || chunks.is_empty() {
        return EqIndex::default();
    }

    // Allow-list of lower-cased column names (List mode only).
    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
    } else {
        None
    };

    // Distinct-value cap per column (Auto mode only).
    let max_card = if cfg.mode == IndexMode::Auto {
        Some(cfg.max_cardinality)
    } else {
        None
    };

    // Global row id of the first row of each chunk.
    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
    let mut acc: u32 = 0;
    for b in chunks {
        batch_offsets.push(acc);
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
    }

    let schema = chunks[0].schema();

    // Each column is indexed independently. Returning `None` from the closure
    // drops that column from the index (not eligible, over the cap, or a
    // failed downcast).
    schema
        .fields()
        .par_iter()
        .enumerate()
        .filter_map(|(ci, field)| {
            let col_lower = field.name().to_lowercase();
            if let Some(a) = &allow
                && !a.contains_key(&col_lower)
            {
                return None;
            }

            let dtype = field.data_type();
            let dict_utf8 = matches!(dtype,
                DataType::Dictionary(k, v)
                    if matches!(k.as_ref(), DataType::Int32)
                    && matches!(v.as_ref(), DataType::Utf8));
            match dtype {
                DataType::Utf8
                | DataType::Utf8View
                | DataType::Boolean
                | DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64 => {}
                _ if dict_utf8 => {}
                _ => return None,
            }

            let mut map: FastMap<String, Vec<u32>> = FastMap::default();

            for (bi, batch) in chunks.iter().enumerate() {
                let base = batch_offsets[bi];
                let col = batch.column(ci);

                // Index a plain array whose `value(row)` implements `ToString`.
                // The `?` and `return None` exit the whole filter_map closure,
                // dropping this column.
                macro_rules! index_col {
                    ($arr_ty:ty) => {{
                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
                        for row in 0..arr.len() {
                            if arr.is_null(row) {
                                continue;
                            }
                            let key = arr.value(row).to_string();
                            let gid = base + row as u32;
                            if let Some(v) = map.get_mut(&key) {
                                v.push(gid);
                            } else {
                                if let Some(mc) = max_card {
                                    if map.len() >= mc {
                                        return None;
                                    }
                                }
                                map.insert(key, vec![gid]);
                            }
                        }
                    }};
                }

                if dict_utf8 {
                    // Dictionary path: resolve each key to its string value, and
                    // allocate a `String` only when a new distinct value appears.
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
                        )?;
                    let keys = arr.keys();
                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
                    for row in 0..arr.len() {
                        // NOTE(review): this skips null keys; a non-null key that
                        // points at a null dictionary value is not filtered
                        // here. Confirm that cannot occur.
                        if arr.is_null(row) {
                            continue;
                        }
                        let k = keys.value(row) as usize;
                        let s = values.value(k);
                        let gid = base + row as u32;
                        if let Some(v) = map.get_mut(s) {
                            v.push(gid);
                        } else {
                            if let Some(mc) = max_card
                                && map.len() >= mc
                            {
                                return None;
                            }
                            map.insert(s.to_string(), vec![gid]);
                        }
                    }
                } else {
                    match dtype {
                        DataType::Utf8 => index_col!(StringArray),
                        DataType::Utf8View => index_col!(StringViewArray),
                        DataType::Boolean => index_col!(BooleanArray),
                        DataType::Int8 => index_col!(Int8Array),
                        DataType::Int16 => index_col!(Int16Array),
                        DataType::Int32 => index_col!(Int32Array),
                        DataType::Int64 => index_col!(Int64Array),
                        // Eligibility was checked above.
                        _ => unreachable!(),
                    }
                }
            }

            Some((col_lower, map))
        })
        .collect()
}
2696
2697fn writable_inline(dt: &DataType) -> bool {
2710 match dt {
2711 DataType::Utf8
2712 | DataType::LargeUtf8
2713 | DataType::Utf8View
2714 | DataType::Boolean
2715 | DataType::Int8
2716 | DataType::Int16
2717 | DataType::Int32
2718 | DataType::Int64
2719 | DataType::UInt8
2720 | DataType::UInt16
2721 | DataType::UInt32
2722 | DataType::UInt64
2723 | DataType::Float32
2724 | DataType::Float64
2725 | DataType::Decimal128(_, _)
2726 | DataType::Decimal256(_, _) => true,
2727 DataType::Dictionary(k, v)
2728 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2729 {
2730 true
2731 }
2732 _ => false,
2733 }
2734}
2735
2736fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2742 let schema = batch.schema();
2743 let to_cast: Vec<usize> = schema
2744 .fields()
2745 .iter()
2746 .enumerate()
2747 .filter_map(|(i, f)| {
2748 if writable_inline(f.data_type()) {
2749 None
2750 } else {
2751 Some(i)
2752 }
2753 })
2754 .collect();
2755 if to_cast.is_empty() {
2756 return Ok(batch.clone());
2757 }
2758 let new_fields: Vec<Field> = schema
2759 .fields()
2760 .iter()
2761 .enumerate()
2762 .map(|(i, f)| {
2763 if to_cast.contains(&i) {
2764 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2765 } else {
2766 f.as_ref().clone()
2767 }
2768 })
2769 .collect();
2770 let new_schema = Arc::new(Schema::new(new_fields));
2771 let cols: Vec<ArrayRef> = batch
2772 .columns()
2773 .iter()
2774 .enumerate()
2775 .map(|(i, c)| {
2776 if to_cast.contains(&i) {
2777 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2778 } else {
2779 Ok(c.clone())
2780 }
2781 })
2782 .collect::<Result<_, _>>()?;
2783 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2784}
2785
/// Comparison operator applied by `cmp_scalar` between a column and a scalar.
///
/// `Like`/`ILike` are only accepted for string columns by `cmp_scalar`.
#[derive(Clone, Copy)]
#[allow(dead_code)] // NOTE(review): possibly unused in this build; confirm before removing.
enum CmpOp {
    /// Equal (`=`).
    Eq,
    /// Not equal (`!=`).
    Neq,
    /// Strictly greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    Gte,
    /// Strictly less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    Lte,
    /// Case-sensitive SQL `LIKE` pattern match.
    Like,
    /// Case-insensitive SQL `ILIKE` pattern match.
    ILike,
}
2803
2804#[allow(dead_code)]
2805fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2806 let arr = col
2807 .as_any()
2808 .downcast_ref::<StringArray>()
2809 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2810 let s = Scalar::new(StringArray::from(vec![val]));
2811 Ok(eq(arr, &s)?)
2812}
2813
2814#[allow(dead_code)]
2815fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2816 macro_rules! num_cmp {
2817 ($arr_type:ty, $cast:ty) => {{
2818 let n = val
2819 .as_f64()
2820 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2821 as $cast;
2822 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2823 let s = Scalar::new(<$arr_type>::from(vec![n]));
2824 Ok(match op {
2825 CmpOp::Eq => eq(arr, &s)?,
2826 CmpOp::Neq => neq(arr, &s)?,
2827 CmpOp::Gt => gt(arr, &s)?,
2828 CmpOp::Gte => gt_eq(arr, &s)?,
2829 CmpOp::Lt => lt(arr, &s)?,
2830 CmpOp::Lte => lt_eq(arr, &s)?,
2831 CmpOp::Like | CmpOp::ILike => {
2832 return Err(AppError::InvalidValue(
2833 "LIKE requires a string column".into(),
2834 ));
2835 }
2836 })
2837 }};
2838 }
2839 match col.data_type() {
2840 DataType::Utf8 => {
2841 let s = val
2842 .as_str()
2843 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2844 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2845 let sc = Scalar::new(StringArray::from(vec![s]));
2846 Ok(match op {
2847 CmpOp::Eq => eq(arr, &sc)?,
2848 CmpOp::Neq => neq(arr, &sc)?,
2849 CmpOp::Gt => gt(arr, &sc)?,
2850 CmpOp::Gte => gt_eq(arr, &sc)?,
2851 CmpOp::Lt => lt(arr, &sc)?,
2852 CmpOp::Lte => lt_eq(arr, &sc)?,
2853 CmpOp::Like => compute::like(arr, &sc)?,
2854 CmpOp::ILike => compute::ilike(arr, &sc)?,
2855 })
2856 }
2857 DataType::Int8 => num_cmp!(Int8Array, i8),
2858 DataType::Int16 => num_cmp!(Int16Array, i16),
2859 DataType::Int32 => num_cmp!(Int32Array, i32),
2860 DataType::Int64 => num_cmp!(Int64Array, i64),
2861 DataType::Float32 => num_cmp!(Float32Array, f32),
2862 DataType::Float64 => num_cmp!(Float64Array, f64),
2863 dt => Err(AppError::InvalidValue(format!(
2864 "unsupported type for comparison: {dt:?}"
2865 ))),
2866 }
2867}
2868
2869pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2874 let batch = cast_for_serialize(batch)?;
2879 let schema = batch.schema();
2880 let n_rows = batch.num_rows();
2881
2882 let keys: Vec<Vec<u8>> = schema
2883 .fields()
2884 .iter()
2885 .map(|f| {
2886 let mut k = Vec::with_capacity(f.name().len() + 3);
2887 k.push(b'"');
2888 k.extend_from_slice(f.name().as_bytes());
2889 k.extend_from_slice(b"\":");
2890 k
2891 })
2892 .collect();
2893
2894 let encoders: Vec<ColEnc> = batch
2899 .columns()
2900 .iter()
2901 .map(|c| ColEnc::new(c.as_ref()))
2902 .collect();
2903
2904 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2905 let mut itoa_buf = itoa::Buffer::new();
2906 let mut ryu_buf = ryu::Buffer::new();
2907 buf.push(b'[');
2908
2909 for row in 0..n_rows {
2910 if row > 0 {
2911 buf.push(b',');
2912 }
2913 buf.push(b'{');
2914 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2915 if i > 0 {
2916 buf.push(b',');
2917 }
2918 buf.extend_from_slice(key);
2919 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2920 }
2921 buf.push(b'}');
2922 }
2923
2924 buf.push(b']');
2925 Ok(unsafe { String::from_utf8_unchecked(buf) })
2926}
2927
2928enum ColEnc<'a> {
2933 Utf8(&'a StringArray),
2934 LargeUtf8(&'a LargeStringArray),
2935 Utf8View(&'a StringViewArray),
2936 DictI32Utf8(
2939 &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2940 &'a StringArray,
2941 ),
2942 Bool(&'a BooleanArray),
2943 I8(&'a Int8Array),
2944 I16(&'a Int16Array),
2945 I32(&'a Int32Array),
2946 I64(&'a Int64Array),
2947 U8(&'a UInt8Array),
2948 U16(&'a UInt16Array),
2949 U32(&'a UInt32Array),
2950 U64(&'a UInt64Array),
2951 Dec128(&'a Decimal128Array),
2952 Dec256(&'a Decimal256Array),
2953 F32(&'a Float32Array),
2954 F64(&'a Float64Array),
2955 Other(&'a dyn Array),
2957}
2958
2959impl<'a> ColEnc<'a> {
2960 fn new(col: &'a dyn Array) -> ColEnc<'a> {
2961 macro_rules! dc {
2962 ($t:ty) => {
2963 col.as_any().downcast_ref::<$t>().unwrap()
2964 };
2965 }
2966 match col.data_type() {
2967 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2968 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2969 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2970 DataType::Dictionary(key, value)
2971 if matches!(key.as_ref(), DataType::Int32)
2972 && matches!(value.as_ref(), DataType::Utf8) =>
2973 {
2974 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2975 let values = dict
2976 .values()
2977 .as_any()
2978 .downcast_ref::<StringArray>()
2979 .unwrap();
2980 ColEnc::DictI32Utf8(dict, values)
2981 }
2982 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2983 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2984 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2985 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2986 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2987 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2988 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2989 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2990 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2991 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2992 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2993 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2994 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2995 _ => ColEnc::Other(col),
2996 }
2997 }
2998
2999 #[inline]
3000 fn write(
3001 &self,
3002 buf: &mut Vec<u8>,
3003 row: usize,
3004 itoa_buf: &mut itoa::Buffer,
3005 ryu_buf: &mut ryu::Buffer,
3006 ) {
3007 macro_rules! int {
3008 ($arr:expr) => {{
3009 if $arr.is_null(row) {
3010 buf.extend_from_slice(b"null");
3011 } else {
3012 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
3013 }
3014 }};
3015 }
3016 match self {
3017 ColEnc::Utf8(a) => {
3018 if a.is_null(row) {
3019 buf.extend_from_slice(b"null");
3020 } else {
3021 write_str(buf, a.value(row));
3022 }
3023 }
3024 ColEnc::LargeUtf8(a) => {
3025 if a.is_null(row) {
3026 buf.extend_from_slice(b"null");
3027 } else {
3028 write_str(buf, a.value(row));
3029 }
3030 }
3031 ColEnc::Utf8View(a) => {
3032 if a.is_null(row) {
3033 buf.extend_from_slice(b"null");
3034 } else {
3035 write_str(buf, a.value(row));
3036 }
3037 }
3038 ColEnc::DictI32Utf8(keys, values) => {
3039 if keys.is_null(row) {
3040 buf.extend_from_slice(b"null");
3041 } else {
3042 let k = keys.keys().value(row) as usize;
3043 write_str(buf, values.value(k));
3044 }
3045 }
3046 ColEnc::Bool(a) => {
3047 if a.is_null(row) {
3048 buf.extend_from_slice(b"null");
3049 } else {
3050 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
3051 }
3052 }
3053 ColEnc::I8(a) => int!(a),
3054 ColEnc::I16(a) => int!(a),
3055 ColEnc::I32(a) => int!(a),
3056 ColEnc::I64(a) => int!(a),
3057 ColEnc::U8(a) => int!(a),
3058 ColEnc::U16(a) => int!(a),
3059 ColEnc::U32(a) => int!(a),
3060 ColEnc::U64(a) => int!(a),
3061 ColEnc::Dec128(a) => {
3062 if a.is_null(row) {
3063 buf.extend_from_slice(b"null");
3064 } else {
3065 write_str(buf, &a.value_as_string(row));
3066 }
3067 }
3068 ColEnc::Dec256(a) => {
3069 if a.is_null(row) {
3070 buf.extend_from_slice(b"null");
3071 } else {
3072 write_str(buf, &a.value_as_string(row));
3073 }
3074 }
3075 ColEnc::F32(a) => {
3076 if a.is_null(row) {
3077 buf.extend_from_slice(b"null");
3078 } else {
3079 let v = a.value(row);
3080 if v.is_finite() {
3081 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3082 } else {
3083 buf.extend_from_slice(b"null");
3084 }
3085 }
3086 }
3087 ColEnc::F64(a) => {
3088 if a.is_null(row) {
3089 buf.extend_from_slice(b"null");
3090 } else {
3091 let v = a.value(row);
3092 if v.is_finite() {
3093 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
3094 } else {
3095 buf.extend_from_slice(b"null");
3096 }
3097 }
3098 }
3099 ColEnc::Other(col) => {
3100 if col.is_null(row) {
3101 buf.extend_from_slice(b"null");
3102 } else {
3103 write_value(buf, *col, row);
3104 }
3105 }
3106 }
3107 }
3108}
3109
3110#[inline]
3111fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
3112 match col.data_type() {
3113 DataType::Utf8 => write_str(
3114 buf,
3115 col.as_any()
3116 .downcast_ref::<StringArray>()
3117 .unwrap()
3118 .value(row),
3119 ),
3120 DataType::LargeUtf8 => write_str(
3121 buf,
3122 col.as_any()
3123 .downcast_ref::<LargeStringArray>()
3124 .unwrap()
3125 .value(row),
3126 ),
3127 DataType::Utf8View => write_str(
3128 buf,
3129 col.as_any()
3130 .downcast_ref::<StringViewArray>()
3131 .unwrap()
3132 .value(row),
3133 ),
3134 DataType::Dictionary(key, value)
3135 if matches!(key.as_ref(), DataType::Int32)
3136 && matches!(value.as_ref(), DataType::Utf8) =>
3137 {
3138 let dict = col
3139 .as_any()
3140 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
3141 .unwrap();
3142 let keys = dict.keys();
3143 let values = dict
3144 .values()
3145 .as_any()
3146 .downcast_ref::<StringArray>()
3147 .unwrap();
3148 let k = keys.value(row) as usize;
3149 write_str(buf, values.value(k));
3150 }
3151 DataType::Boolean => {
3152 let v = col
3153 .as_any()
3154 .downcast_ref::<BooleanArray>()
3155 .unwrap()
3156 .value(row);
3157 buf.extend_from_slice(if v { b"true" } else { b"false" });
3158 }
3159 DataType::Int8 => {
3160 let mut b = itoa::Buffer::new();
3161 buf.extend_from_slice(
3162 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
3163 .as_bytes(),
3164 );
3165 }
3166 DataType::Int16 => {
3167 let mut b = itoa::Buffer::new();
3168 buf.extend_from_slice(
3169 b.format(
3170 col.as_any()
3171 .downcast_ref::<Int16Array>()
3172 .unwrap()
3173 .value(row),
3174 )
3175 .as_bytes(),
3176 );
3177 }
3178 DataType::Int32 => {
3179 let mut b = itoa::Buffer::new();
3180 buf.extend_from_slice(
3181 b.format(
3182 col.as_any()
3183 .downcast_ref::<Int32Array>()
3184 .unwrap()
3185 .value(row),
3186 )
3187 .as_bytes(),
3188 );
3189 }
3190 DataType::Int64 => {
3191 let mut b = itoa::Buffer::new();
3192 buf.extend_from_slice(
3193 b.format(
3194 col.as_any()
3195 .downcast_ref::<Int64Array>()
3196 .unwrap()
3197 .value(row),
3198 )
3199 .as_bytes(),
3200 );
3201 }
3202 DataType::UInt8 => {
3203 let mut b = itoa::Buffer::new();
3204 buf.extend_from_slice(
3205 b.format(
3206 col.as_any()
3207 .downcast_ref::<UInt8Array>()
3208 .unwrap()
3209 .value(row),
3210 )
3211 .as_bytes(),
3212 );
3213 }
3214 DataType::UInt16 => {
3215 let mut b = itoa::Buffer::new();
3216 buf.extend_from_slice(
3217 b.format(
3218 col.as_any()
3219 .downcast_ref::<UInt16Array>()
3220 .unwrap()
3221 .value(row),
3222 )
3223 .as_bytes(),
3224 );
3225 }
3226 DataType::UInt32 => {
3227 let mut b = itoa::Buffer::new();
3228 buf.extend_from_slice(
3229 b.format(
3230 col.as_any()
3231 .downcast_ref::<UInt32Array>()
3232 .unwrap()
3233 .value(row),
3234 )
3235 .as_bytes(),
3236 );
3237 }
3238 DataType::UInt64 => {
3239 let mut b = itoa::Buffer::new();
3240 buf.extend_from_slice(
3241 b.format(
3242 col.as_any()
3243 .downcast_ref::<UInt64Array>()
3244 .unwrap()
3245 .value(row),
3246 )
3247 .as_bytes(),
3248 );
3249 }
3250 DataType::Decimal128(_, _) => {
3251 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
3252 write_str(buf, &arr.value_as_string(row));
3253 }
3254 DataType::Decimal256(_, _) => {
3255 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
3256 write_str(buf, &arr.value_as_string(row));
3257 }
3258 DataType::Float32 => {
3259 let v = col
3260 .as_any()
3261 .downcast_ref::<Float32Array>()
3262 .unwrap()
3263 .value(row);
3264 if v.is_finite() {
3265 let mut b = ryu::Buffer::new();
3266 buf.extend_from_slice(b.format_finite(v).as_bytes());
3267 } else {
3268 buf.extend_from_slice(b"null");
3269 }
3270 }
3271 DataType::Float64 => {
3272 let v = col
3273 .as_any()
3274 .downcast_ref::<Float64Array>()
3275 .unwrap()
3276 .value(row);
3277 if v.is_finite() {
3278 let mut b = ryu::Buffer::new();
3279 buf.extend_from_slice(b.format_finite(v).as_bytes());
3280 } else {
3281 buf.extend_from_slice(b"null");
3282 }
3283 }
3284 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
3289 }
3290}
3291
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
/// - `"` and `\` are backslash-escaped.
/// - `\n`, `\r` and `\t` use their short escapes.
/// - Every other byte below `0x20` becomes a lowercase `\u00XX` escape.
/// - All remaining bytes, including multi-byte UTF-8 sequences, are copied
///   unchanged, so valid UTF-8 input yields valid UTF-8 output.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    let bytes = s.as_bytes();
    buf.push(b'"');
    // Index where the current run of bytes needing no escape begins.
    let mut run_start = 0;
    for (idx, &byte) in bytes.iter().enumerate() {
        // `Some(seq)` = two-byte short escape, `None` = `\u00XX` escape.
        let escape: Option<&[u8; 2]> = match byte {
            b'"' => Some(b"\\\""),
            b'\\' => Some(b"\\\\"),
            b'\n' => Some(b"\\n"),
            b'\r' => Some(b"\\r"),
            b'\t' => Some(b"\\t"),
            0x00..=0x1f => None,
            _ => continue,
        };
        buf.extend_from_slice(&bytes[run_start..idx]);
        run_start = idx + 1;
        match escape {
            Some(seq) => buf.extend_from_slice(seq),
            None => buf.extend_from_slice(&[
                b'\\',
                b'u',
                b'0',
                b'0',
                HEX[usize::from(byte >> 4)],
                HEX[usize::from(byte & 0x0f)],
            ]),
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);
    buf.push(b'"');
}
3313
3314#[async_trait]
3319impl Backend for Store {
3320 fn names(&self) -> Vec<String> {
3321 Store::names(self)
3322 }
3323
3324 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
3325 let st = self.dataset(name)?;
3326 Ok(DatasetSummary {
3327 name: st.schema.name.clone(),
3328 columns: st.schema.columns.len(),
3329 rows: st.num_rows(),
3330 })
3331 }
3332
3333 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
3334 let st = self.dataset(name)?;
3335 Ok(Arc::new(st.schema.clone()))
3336 }
3337
3338 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
3339 let st = self.dataset(name)?;
3340 let mut cols: Vec<String> = st
3343 .schema
3344 .columns
3345 .iter()
3346 .map(|c| c.name.clone())
3347 .filter(|n| st.index.contains_key(n))
3348 .collect();
3349 let mut extras: Vec<String> = st
3352 .index
3353 .keys()
3354 .filter(|n| !cols.iter().any(|c| c == *n))
3355 .cloned()
3356 .collect();
3357 extras.sort();
3358 cols.extend(extras);
3359 Ok(cols)
3360 }
3361
3362 async fn sample(&self, name: &str) -> Result<String, AppError> {
3363 Store::sample(self, name).await
3364 }
3365
3366 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
3367 Store::query(self, name, req).await
3368 }
3369
3370 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
3371 Store::query_arrow(self, name, req).await
3372 }
3373
3374 async fn query_arrow_stream(
3375 &self,
3376 name: &str,
3377 req: &QueryRequest,
3378 ) -> Result<ArrowIpcStream, AppError> {
3379 Store::query_arrow_stream(self, name, req).await
3380 }
3381
3382 async fn query_arrow_stream_all(
3383 &self,
3384 name: &str,
3385 req: &QueryRequest,
3386 ) -> Result<ArrowIpcStream, AppError> {
3387 Store::query_arrow_stream_all(self, name, req).await
3388 }
3389
3390 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
3391 Store::count(self, name, req).await
3392 }
3393
3394 async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
3395 Store::query_sql(self, sql, max_rows).await
3396 }
3397
3398 async fn query_sql_arrow_stream(
3399 &self,
3400 sql: &str,
3401 max_rows: u64,
3402 ) -> Result<ArrowIpcStream, AppError> {
3403 Store::query_sql_arrow_stream(self, sql, max_rows).await
3404 }
3405
3406 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
3407 Store::parquet(self, name).await
3408 }
3409
3410 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
3411 Store::reload(self, name).await
3412 }
3413
3414 async fn register(&self, cfg: DatasetConfig) -> Result<DatasetSummary, AppError> {
3415 Store::register(self, cfg).await
3416 }
3417}
3418
#[cfg(test)]
mod tests {
    use super::is_s3_access_denied;

    /// Error strings that must be classified as S3 access-denied failures.
    const DENIED_MESSAGES: [&str; 4] = [
        "Generic S3 error: Error performing get request: response error \"<Error><Code>AccessDenied</Code></Error>\", status: 403",
        "Client error with status 403 Forbidden",
        "S3 error: Access Denied",
        "request failed: 403 Forbidden",
    ];

    /// Error strings that must NOT be classified as access-denied.
    const UNRELATED_MESSAGES: [&str; 3] = [
        "Not a Delta table: Generic delta kernel error: No files in log segment",
        "object at location data/part.parquet not found",
        "failed to infer parquet schema: invalid magic bytes",
    ];

    #[test]
    fn detects_s3_access_denied_variants() {
        DENIED_MESSAGES
            .into_iter()
            .for_each(|msg| assert!(is_s3_access_denied(msg), "should flag: {msg}"));
    }

    #[test]
    fn ignores_unrelated_errors() {
        for msg in UNRELATED_MESSAGES.into_iter() {
            assert!(!is_s3_access_denied(msg), "should not flag: {msg}");
        }
    }
}
3447