1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use arrow::array::{
8 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
9 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
10 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
11};
12use arrow::compute;
13use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
14use arrow::datatypes::{DataType, Field, Schema};
15use async_trait::async_trait;
16use parquet::arrow::ProjectionMask;
17use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
18use serde_json::Value as JsonValue;
19
20use datafusion::datasource::file_format::parquet::ParquetFormat;
21use datafusion::datasource::listing::{
22 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
23};
24use datafusion::datasource::{MemTable, TableProvider};
25use datafusion::prelude::SessionContext;
26use datafusion::scalar::ScalarValue;
27
28use object_store::aws::AmazonS3Builder;
29use url::Url;
30
31use datapress_core::backend::{
32 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
33};
34use datapress_core::config::{
35 AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
36 S3Config, SourceKind,
37};
38use datapress_core::errors::AppError;
39use datapress_core::models::{CountRequest, Predicate, QueryRequest};
40use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
41
/// `HashMap` using `ahash`'s hasher in place of std's default SipHash.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Equality index over a materialised dataset.
///
/// NOTE(review): presumably column name -> stringified cell value -> matching row ids;
/// it is built by `build_eq_index_with_policy` and read by `try_index` (neither shown
/// here) — confirm against those helpers.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
54
/// One loaded dataset: its schemas plus, for materialised datasets, the data itself.
pub struct DatasetState {
    /// Logical schema exposed to clients.
    pub schema: DatasetSchema,
    /// Materialised record batches; empty for lazy datasets.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the dataset (for materialised datasets, the first batch's schema).
    pub arrow_schema: Arc<Schema>,
    /// Equality index over `data`; empty for lazy datasets.
    pub index: EqIndex,
    /// `true` when the dataset is scanned on demand through DataFusion instead of
    /// being served from `data`.
    pub lazy: bool,
}
76
77impl DatasetState {
78 pub fn num_rows(&self) -> usize {
80 self.data.iter().map(|b| b.num_rows()).sum()
81 }
82}
83
/// Registry of loaded datasets, all registered in a single DataFusion session.
pub struct Store {
    /// Session in which every dataset is registered as a table under its name.
    ctx: SessionContext,
    /// Upper bound applied to `page_size` for paged queries (always at least 1).
    max_page_size: u64,
    /// Configuration of each dataset by name, kept so a dataset can be rebuilt on reload.
    configs: HashMap<String, DatasetConfig>,
    /// Current snapshot of loaded datasets; readers `load()` it, reloads swap in a new map.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-dataset async locks that serialise concurrent reloads of the same dataset.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
100
101impl Store {
102 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
104 if cfg
107 .datasets
108 .iter()
109 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
110 {
111 deltalake::aws::register_handlers(None);
112 }
113
114 let ctx = SessionContext::new();
120 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
121 let mut configs = HashMap::with_capacity(cfg.datasets.len());
122
123 for d in &cfg.datasets {
124 log::info!(
125 "Loading dataset '{}' ({} @ {})",
126 d.name,
127 d.source.kind.as_str(),
128 d.source.location
129 );
130 let (state, provider) = build_dataset(d, &ctx).await?;
131 ctx.register_table(d.name.as_str(), provider)?;
132 datasets.insert(d.name.clone(), Arc::new(state));
133 configs.insert(d.name.clone(), d.clone());
134 }
135 Ok(Self {
136 ctx,
137 max_page_size: cfg.server.max_page_size.max(1),
138 configs,
139 datasets: ArcSwap::from_pointee(datasets),
140 reload_locks: Mutex::new(HashMap::new()),
141 })
142 }
143
144 pub fn names(&self) -> Vec<String> {
146 let snap = self.datasets.load();
147 let mut v: Vec<String> = snap.keys().cloned().collect();
148 v.sort();
149 v
150 }
151
152 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
153 self.datasets
154 .load()
155 .get(name)
156 .cloned()
157 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
158 }
159
160 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
163 let st = self.dataset(name)?;
164
165 if st.lazy {
167 let table = DatasetSchema::quote_ident(&st.schema.name);
168 let sql = format!("SELECT * FROM {table} LIMIT 1");
169 let df = self.ctx.sql(&sql).await?;
170 let batches = df.collect().await?;
171 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
172 return Ok("null".into());
173 }
174 let arr = serialize(&batches[0].slice(0, 1))?;
175 let trimmed = arr.trim();
176 let inner = trimmed
177 .strip_prefix('[')
178 .and_then(|s| s.strip_suffix(']'))
179 .unwrap_or(trimmed);
180 return Ok(inner.to_string());
181 }
182
183 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
184 Some(b) => b,
185 None => return Ok("null".into()),
186 };
187 let arr = serialize(&first.slice(0, 1))?;
188 let trimmed = arr.trim();
190 let inner = trimmed
191 .strip_prefix('[')
192 .and_then(|s| s.strip_suffix(']'))
193 .unwrap_or(trimmed);
194 Ok(inner.to_string())
195 }
196
197 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
202 let cfg = self
204 .configs
205 .get(name)
206 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
207 .clone();
208
209 let lock = {
211 let mut locks = self.reload_locks.lock().unwrap();
212 locks
213 .entry(name.to_string())
214 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
215 .clone()
216 };
217 let _guard = lock.lock().await;
218
219 let started = std::time::Instant::now();
220
221 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
224 let rows = state.num_rows();
225
226 let _ = self.ctx.deregister_table(name)?;
232 self.ctx.register_table(name, provider)?;
233
234 let mut new_map = (**self.datasets.load()).clone();
235 new_map.insert(name.to_string(), Arc::new(state));
236 self.datasets.store(Arc::new(new_map));
237
238 let elapsed_ms = started.elapsed().as_millis();
239 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
240 Ok(ReloadStats { rows, elapsed_ms })
241 }
242
243 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
247 let batch = self.query_batch(name, req).await?;
248 if batch.num_rows() == 0 {
249 return Ok("[]".to_string());
250 }
251 serialize(&batch)
252 }
253
254 fn canonicalize_sql(&self, sql: &str) -> String {
260 let snap = self.datasets.load();
261 let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
262 let mut columns: HashMap<String, String> = HashMap::new();
263 for (name, state) in snap.iter() {
264 tables.insert(name.to_lowercase(), name.clone());
265 for col in &state.schema.columns {
266 columns
267 .entry(col.name.to_lowercase())
268 .or_insert_with(|| col.name.clone());
269 }
270 }
271 datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
272 }
273
274 pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
281 let cap = max_rows.max(1);
282 let sql = self.canonicalize_sql(sql);
283 let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
284 let df = self.ctx.sql(&wrapped).await?;
285 let batches = df.collect().await?;
286 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
287 return Ok("[]".to_string());
288 }
289 let batch = if batches.len() == 1 {
290 batches.into_iter().next().expect("checked len")
291 } else {
292 compute::concat_batches(&batches[0].schema(), batches.iter())?
293 };
294 let batch = if batch.num_rows() as u64 > cap {
297 batch.slice(0, cap as usize)
298 } else {
299 batch
300 };
301 serialize(&batch)
302 }
303
304 pub async fn query_sql_arrow_stream(
308 &self,
309 sql: &str,
310 max_rows: u64,
311 ) -> Result<ArrowIpcStream, AppError> {
312 let cap = max_rows.max(1);
313 let sql = self.canonicalize_sql(sql);
314 let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
315 let df = self.ctx.sql(&wrapped).await?;
316 let batches = df.collect().await?;
317 Ok(stream_arrow_batches(batches))
318 }
319
320 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
324 let batch = self.query_batch(name, req).await?;
325 let schema = batch.schema();
326 let mut buf = Vec::with_capacity(8 * 1024);
327 {
328 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
329 if batch.num_rows() > 0 {
330 w.write(&batch)?;
331 }
332 w.finish()?;
333 }
334 Ok(buf)
335 }
336
337 pub async fn query_arrow_stream(
338 &self,
339 name: &str,
340 req: &QueryRequest,
341 ) -> Result<ArrowIpcStream, AppError> {
342 let batches = self.query_batches(name, req).await?;
343 Ok(stream_arrow_batches(batches))
344 } pub async fn query_arrow_stream_all(
345 &self,
346 name: &str,
347 req: &QueryRequest,
348 ) -> Result<ArrowIpcStream, AppError> {
349 let batches = self.query_batches_all(name, req).await?;
350 Ok(stream_arrow_batches(batches))
351 }
352
353 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
361 let req = QueryRequest {
363 columns: Vec::new(),
364 predicates: Vec::new(),
365 group_by: Vec::new(),
366 aggregations: Vec::new(),
367 distinct: false,
368 order_by: Vec::new(),
369 limit: None,
370 page: 1,
371 page_size: 1,
372 };
373 let st = self.dataset(name)?;
374 let batches = self.query_batches_all(name, &req).await?;
375 let schema = batches
379 .first()
380 .map(|b| b.schema())
381 .unwrap_or_else(|| st.arrow_schema.clone());
382
383 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
384 {
385 let props = parquet::file::properties::WriterProperties::builder()
386 .set_compression(parquet::basic::Compression::SNAPPY)
387 .build();
388 let mut writer =
389 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
390 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
391 for batch in &batches {
392 if batch.num_rows() > 0 {
393 writer
394 .write(batch)
395 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
396 }
397 }
398 writer
399 .close()
400 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
401 }
402 Ok(bytes::Bytes::from(buf))
403 }
404
405 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
408 let batches = self.query_batches(name, req).await?;
409 if batches.is_empty() {
410 return Ok(RecordBatch::new_empty(Arc::new(
411 arrow::datatypes::Schema::empty(),
412 )));
413 }
414 if batches.len() == 1 {
415 return Ok(batches.into_iter().next().expect("checked len"));
416 }
417 if batches.iter().all(|b| b.num_rows() == 0) {
418 return Ok(RecordBatch::new_empty(batches[0].schema()));
419 }
420 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
421 Ok(batch)
422 }
423
424 async fn query_batches(
428 &self,
429 name: &str,
430 req: &QueryRequest,
431 ) -> Result<Vec<RecordBatch>, AppError> {
432 let st = self.dataset(name)?;
433
434 let page = req.page.max(1);
435 let page_size = req.page_size.clamp(1, self.max_page_size);
436 let offset = ((page - 1) * page_size) as usize;
437 let limit = page_size as usize;
438
439 self.query_batches_inner(st, req, Some((offset, limit)))
440 .await
441 }
442
443 async fn query_batches_all(
447 &self,
448 name: &str,
449 req: &QueryRequest,
450 ) -> Result<Vec<RecordBatch>, AppError> {
451 let st = self.dataset(name)?;
452 self.query_batches_inner(st, req, None).await
453 }
454
455 async fn query_batches_inner(
456 &self,
457 st: Arc<DatasetState>,
458 req: &QueryRequest,
459 page_window: Option<(usize, usize)>,
460 ) -> Result<Vec<RecordBatch>, AppError> {
461 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
462
463 let can_fast_path = !st.lazy
470 && req.order_by.is_empty()
471 && (page_window.is_none() || req.limit.is_none())
472 && req.group_by.is_empty()
473 && !req.distinct;
474
475 if can_fast_path {
476 let total = st.num_rows();
477
478 if req.predicates.is_empty() {
481 if page_window.is_none() && req.limit.is_none() {
482 return st
483 .data
484 .iter()
485 .cloned()
486 .map(|batch| project(&st.schema, batch, &req.columns))
487 .collect();
488 }
489 let start = offset.min(total);
490 let len = limit.min(total - start);
491 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
492 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
493 }
494
495 if let Some(rows) = try_index(&st.index, &req.predicates) {
498 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
499 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
500 }
501 }
502
503 let (sql, params) = match page_window {
505 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
506 None => build_query_stream_sql(&st.schema, req)?,
507 };
508 let mut df = self.ctx.sql(&sql).await?;
509 if !params.is_empty() {
510 df = df.with_param_values(params)?;
511 }
512 let batches = df.collect().await?;
513 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
514 let schema = batches
515 .first()
516 .map(|b| b.schema())
517 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
518 return Ok(vec![RecordBatch::new_empty(schema)]);
519 }
520 Ok(batches)
521 }
522}
523
524fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
525 let schema = batches
526 .first()
527 .map(|batch| batch.schema())
528 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
529 let (mut writer, stream) = arrow_ipc_stream_channel(8);
530
531 tokio::task::spawn_blocking(move || {
532 let result = (|| -> Result<(), AppError> {
533 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
534 for batch in batches {
535 if batch.num_rows() > 0 {
536 w.write(&batch)?;
537 }
538 }
539 w.finish()?;
540 Ok(())
541 })();
542 if let Err(err) = result {
543 log::error!("datafusion arrow stream failed: {err}");
544 writer.send_error(err);
545 }
546 });
547
548 stream
549}
550
551impl Store {
552 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
556 let st = self.dataset(name)?;
557
558 if !st.lazy {
559 if req.predicates.is_empty() {
561 return Ok(st.num_rows() as i64);
562 }
563 if let Some(rows) = try_index(&st.index, &req.predicates) {
565 return Ok(rows.len() as i64);
566 }
567 }
568
569 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
572 let mut df = self.ctx.sql(&sql).await?;
573 if !params.is_empty() {
574 df = df.with_param_values(params)?;
575 }
576 let batches = df.collect().await?;
577 let n = batches
578 .first()
579 .and_then(|b| {
580 b.column(0)
581 .as_any()
582 .downcast_ref::<arrow::array::Int64Array>()
583 })
584 .filter(|a| !a.is_empty())
585 .map(|a| a.value(0))
586 .unwrap_or(0);
587 Ok(n)
588 }
589}
590
591async fn build_dataset(
596 d: &DatasetConfig,
597 ctx: &SessionContext,
598) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
599 if d.lazy {
605 match (d.source.kind, d.source.is_s3()) {
606 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
607 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
608 (SourceKind::Delta, _) => {
609 return Err(AppError::Internal(format!(
610 "dataset '{}': lazy mode is not supported for delta sources",
611 d.name
612 )));
613 }
614 }
615 }
616
617 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
622 (SourceKind::Parquet, false) => read_local_parquet(d)?,
623 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
624 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
625 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
626 };
627 if raw_batches.is_empty() {
628 return Err(AppError::Internal(format!(
629 "dataset '{}': source produced no batches",
630 d.name
631 )));
632 }
633
634 let chunks = raw_batches;
635 let arrow_sch = chunks[0].schema();
636
637 let columns: Vec<ColumnInfo> = arrow_sch
639 .fields()
640 .iter()
641 .map(|f| {
642 let dt = f.data_type();
643 ColumnInfo {
644 name: f.name().clone(),
645 logical: arrow_to_logical(dt),
646 sql_type: format!("{dt:?}"),
647 nullable: f.is_nullable(),
648 }
649 })
650 .collect();
651 let schema = DatasetSchema::new(&d.name, columns);
652
653 let index = build_eq_index_with_policy(&chunks, &d.index);
658
659 let n_parts = std::thread::available_parallelism()
664 .map(|n| n.get())
665 .unwrap_or(4);
666 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
667 for (i, b) in chunks.iter().enumerate() {
668 if b.num_rows() == 0 {
669 continue;
670 }
671 parts[i % n_parts].push(b.clone());
672 }
673 parts.retain(|p| !p.is_empty());
674 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
675
676 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
677 let mem_mb: usize = chunks
678 .iter()
679 .flat_map(|b| b.columns().iter())
680 .map(|c| c.get_buffer_memory_size())
681 .sum::<usize>()
682 / 1_048_576;
683 log::info!(
684 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
685 d.name,
686 d.source.kind.as_str(),
687 total_rows,
688 schema.columns.len(),
689 mem_mb,
690 chunks.len(),
691 index.len()
692 );
693
694 Ok((
695 DatasetState {
696 schema,
697 data: chunks,
698 arrow_schema: arrow_sch,
699 index,
700 lazy: false,
701 },
702 provider,
703 ))
704}
705
706async fn build_lazy_local_parquet(
711 d: &DatasetConfig,
712 ctx: &SessionContext,
713) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
714 let (url, part_keys) = lazy_local_listing(d)?;
715
716 let mut opts =
717 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
718 if !part_keys.is_empty() {
719 opts = opts.with_table_partition_cols(
720 part_keys
721 .iter()
722 .map(|k| (k.clone(), DataType::Utf8))
723 .collect(),
724 );
725 }
726
727 let session_state = ctx.state();
728 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
731 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
732 })?;
733
734 let cfg = ListingTableConfig::new(url)
735 .with_listing_options(opts)
736 .with_schema(file_schema.clone());
737 let table = ListingTable::try_new(cfg).map_err(|e| {
738 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
739 })?;
740 let provider: Arc<dyn TableProvider> = Arc::new(table);
741
742 let mut fields: Vec<Field> = file_schema
744 .fields()
745 .iter()
746 .map(|f| f.as_ref().clone())
747 .collect();
748 for k in &part_keys {
749 if !fields.iter().any(|f| f.name() == k) {
750 fields.push(Field::new(k, DataType::Utf8, false));
751 }
752 }
753 let arrow_sch = Arc::new(Schema::new(fields));
754
755 let columns: Vec<ColumnInfo> = arrow_sch
756 .fields()
757 .iter()
758 .map(|f| {
759 let dt = f.data_type();
760 ColumnInfo {
761 name: f.name().clone(),
762 logical: arrow_to_logical(dt),
763 sql_type: format!("{dt:?}"),
764 nullable: f.is_nullable(),
765 }
766 })
767 .collect();
768 let schema = DatasetSchema::new(&d.name, columns);
769
770 log::info!(
771 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
772 d.name,
773 d.source.kind.as_str(),
774 schema.columns.len(),
775 part_keys.len()
776 );
777
778 Ok((
779 DatasetState {
780 schema,
781 data: Vec::new(),
782 arrow_schema: arrow_sch,
783 index: EqIndex::default(),
784 lazy: true,
785 },
786 provider,
787 ))
788}
789
790fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
795 let loc = &d.source.location;
796
797 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
798 let parts: Vec<&str> = loc.split('/').collect();
799 let first_wild = parts
800 .iter()
801 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
802 .unwrap_or(parts.len());
803 let base = parts[..first_wild].join("/");
804 let base = if base.is_empty() {
805 "/".to_string()
806 } else {
807 base
808 };
809 let upper = parts.len().saturating_sub(1);
812 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
813 .iter()
814 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
815 .filter(|k| !k.is_empty())
816 .collect();
817 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
818 }
819
820 let path = std::path::Path::new(loc);
821 if path.is_dir() {
822 let keys = discover_hive_keys(path);
823 return Ok((dir_url(path, d)?, keys));
824 }
825
826 let url = ListingTableUrl::parse(loc)
827 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
828 Ok((url, Vec::new()))
829}
830
831fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
834 let s = path.to_str().ok_or_else(|| {
835 AppError::Internal(format!(
836 "dataset '{}': non-utf8 path {}",
837 d.name,
838 path.display()
839 ))
840 })?;
841 let s = if s.ends_with('/') {
842 s.to_string()
843 } else {
844 format!("{s}/")
845 };
846 ListingTableUrl::parse(&s)
847 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
848}
849
/// Walks down from `base` into the first `key=value` subdirectory found at each level
/// and returns the keys in the order they were met (outermost first).
///
/// Non-directories, non-UTF-8 names, and entries with an empty key or value are
/// ignored. The walk stops at the first level that has no qualifying subdirectory or
/// cannot be listed.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    /// First `key=value` child directory of `dir`, as `(key, child_path)`.
    fn hive_child(dir: &std::path::Path) -> Option<(String, std::path::PathBuf)> {
        let entries = std::fs::read_dir(dir).ok()?;
        entries.flatten().find_map(|entry| {
            let child = entry.path();
            if !child.is_dir() {
                return None;
            }
            let name = child.file_name()?.to_str()?;
            let (key, value) = name.split_once('=')?;
            if key.is_empty() || value.is_empty() {
                return None;
            }
            let key = key.to_string();
            Some((key, child))
        })
    }

    let mut keys = Vec::new();
    let mut level = base.to_path_buf();
    while let Some((key, child)) = hive_child(&level) {
        keys.push(key);
        level = child;
    }
    keys
}
887
888async fn build_lazy_s3_parquet(
894 d: &DatasetConfig,
895 ctx: &SessionContext,
896) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
897 register_s3_object_store(d, ctx)?;
898
899 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
900
901 let mut fields: Vec<Field> = file_schema
903 .fields()
904 .iter()
905 .map(|f| f.as_ref().clone())
906 .collect();
907 for k in &part_keys {
908 if !fields.iter().any(|f| f.name() == k) {
909 fields.push(Field::new(k, DataType::Utf8, false));
910 }
911 }
912 let arrow_sch = Arc::new(Schema::new(fields));
913
914 let columns: Vec<ColumnInfo> = arrow_sch
915 .fields()
916 .iter()
917 .map(|f| {
918 let dt = f.data_type();
919 ColumnInfo {
920 name: f.name().clone(),
921 logical: arrow_to_logical(dt),
922 sql_type: format!("{dt:?}"),
923 nullable: f.is_nullable(),
924 }
925 })
926 .collect();
927 let schema = DatasetSchema::new(&d.name, columns);
928
929 log::info!(
930 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
931 d.name,
932 d.source.kind.as_str(),
933 schema.columns.len(),
934 part_keys.len()
935 );
936
937 Ok((
938 DatasetState {
939 schema,
940 data: Vec::new(),
941 arrow_schema: arrow_sch,
942 index: EqIndex::default(),
943 lazy: true,
944 },
945 provider,
946 ))
947}
948
949async fn build_s3_listing_table(
955 d: &DatasetConfig,
956 ctx: &SessionContext,
957) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
958 let (url, part_keys) = s3_listing(d, ctx).await?;
959
960 let mut opts =
961 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
962 if !part_keys.is_empty() {
963 opts = opts.with_table_partition_cols(
964 part_keys
965 .iter()
966 .map(|k| (k.clone(), DataType::Utf8))
967 .collect(),
968 );
969 }
970
971 let session_state = ctx.state();
972 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
973 AppError::Internal(format!(
974 "dataset '{}': infer parquet schema on s3: {e}",
975 d.name
976 ))
977 })?;
978
979 let cfg = ListingTableConfig::new(url)
980 .with_listing_options(opts)
981 .with_schema(file_schema.clone());
982 let table = ListingTable::try_new(cfg).map_err(|e| {
983 AppError::Internal(format!(
984 "dataset '{}': ListingTable::try_new (s3): {e}",
985 d.name
986 ))
987 })?;
988 Ok((Arc::new(table), file_schema, part_keys))
989}
990
991async fn s3_listing(
997 d: &DatasetConfig,
998 ctx: &SessionContext,
999) -> Result<(ListingTableUrl, Vec<String>), AppError> {
1000 let s3 = d.s3.clone().unwrap_or_default();
1001 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
1002 let loc = &d.source.location;
1003
1004 if d.source.has_glob() {
1005 let (base, keys) = split_glob_base_keys(loc);
1006 let base = format!("{}/", base.trim_end_matches('/'));
1007 let url = ListingTableUrl::parse(&base).map_err(|e| {
1008 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1009 })?;
1010 let keys = if want_partitions { keys } else { Vec::new() };
1011 return Ok((url, keys));
1012 }
1013
1014 let base = if loc.ends_with('/') {
1015 loc.clone()
1016 } else {
1017 format!("{loc}/")
1018 };
1019 let url = ListingTableUrl::parse(&base).map_err(|e| {
1020 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1021 })?;
1022 let keys = if want_partitions {
1023 discover_s3_hive_keys(ctx, &url).await
1024 } else {
1025 Vec::new()
1026 };
1027 Ok((url, keys))
1028}
1029
/// Splits a `/`-separated glob location into its non-glob base and hive keys.
///
/// The base is every segment before the first one containing `*`, `?` or `[`, joined
/// back with `/` (or `"/"` when that is empty). The keys are the non-empty `key` parts
/// of `key=...` segments from that wildcard segment up to, but excluding, the last
/// segment (the file-name pattern).
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_glob = |segment: &str| segment.contains(|c: char| matches!(c, '*' | '?' | '['));

    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|segment| is_glob(segment))
        .unwrap_or(segments.len());

    let joined = segments[..wild_at].join("/");
    let base = if joined.is_empty() {
        String::from("/")
    } else {
        joined
    };

    let last = segments.len().saturating_sub(1);
    let keys = segments[wild_at.min(last)..last]
        .iter()
        .filter_map(|segment| segment.split_once('='))
        .map(|(key, _)| key)
        .filter(|key| !key.is_empty())
        .map(str::to_string)
        .collect();

    (base, keys)
}
1053
1054async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1059 let store = match ctx.runtime_env().object_store(url.object_store()) {
1060 Ok(s) => s,
1061 Err(_) => return Vec::new(),
1062 };
1063 let mut keys = Vec::new();
1064 let mut prefix = url.prefix().clone();
1065 loop {
1066 let listing = match store.list_with_delimiter(Some(&prefix)).await {
1067 Ok(l) => l,
1068 Err(_) => break,
1069 };
1070 let mut next: Option<object_store::path::Path> = None;
1071 for cp in &listing.common_prefixes {
1072 if let Some(seg) = cp.parts().next_back() {
1073 let seg = seg.as_ref().to_string();
1074 if let Some((k, v)) = seg.split_once('=')
1075 && !k.is_empty()
1076 && !v.is_empty()
1077 {
1078 keys.push(k.to_string());
1079 next = Some(cp.clone());
1080 break;
1081 }
1082 }
1083 }
1084 match next {
1085 Some(p) => prefix = p,
1086 None => break,
1087 }
1088 }
1089 keys
1090}
1091
1092fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1109 let files = d.resolve_local_parquet_files()?;
1110 let mut all = Vec::new();
1111 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1112 None
1113 } else {
1114 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1115 };
1116
1117 for f in &files {
1118 let file = std::fs::File::open(f)
1119 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1120
1121 let probe = ParquetRecordBatchReaderBuilder::try_new(
1126 file.try_clone()
1127 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1128 )?;
1129 let parquet_schema = probe.parquet_schema().clone();
1130 let arrow_schema = probe.schema().clone();
1131 let metadata = probe.metadata().clone();
1132 drop(probe);
1133
1134 let projection = if let Some(w) = &wanted {
1136 let indices: Vec<usize> = arrow_schema
1137 .fields()
1138 .iter()
1139 .enumerate()
1140 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1141 .map(|(i, _)| i)
1142 .collect();
1143 if indices.is_empty() {
1144 return Err(AppError::Internal(format!(
1145 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1146 d.name,
1147 d.columns,
1148 f.display()
1149 )));
1150 }
1151 ProjectionMask::roots(&parquet_schema, indices)
1152 } else {
1153 ProjectionMask::all()
1154 };
1155
1156 let mut new_fields: Vec<Field> = arrow_schema
1164 .fields()
1165 .iter()
1166 .map(|f| f.as_ref().clone())
1167 .collect();
1168 if d.dict_encode
1169 && let Some(rg0) = metadata.row_groups().first()
1170 {
1171 for (i, fld) in arrow_schema.fields().iter().enumerate() {
1172 if !matches!(
1173 fld.data_type(),
1174 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1175 ) {
1176 continue;
1177 }
1178 if let Some(col) = rg0.columns().get(i)
1179 && col.dictionary_page_offset().is_some()
1180 {
1181 new_fields[i] = Field::new(
1182 fld.name(),
1183 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1184 fld.is_nullable(),
1185 );
1186 }
1187 }
1188 }
1189 let forced_schema = Arc::new(Schema::new(new_fields));
1190
1191 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1192 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1193 .with_batch_size(65_536)
1194 .with_projection(projection)
1195 .build()?;
1196 let pairs = hive_pairs(f);
1200 for batch in reader {
1201 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1202 all.push(if pairs.is_empty() {
1203 batch
1204 } else {
1205 append_partition_cols(&batch, &pairs)?
1206 });
1207 }
1208 }
1209 if all.is_empty() {
1210 return Err(AppError::Internal(format!(
1211 "dataset '{}': parquet source is empty",
1212 d.name
1213 )));
1214 }
1215 Ok(all)
1216}
1217
/// Collects `(key, value)` pairs from the `key=value` components of `path`, in path
/// order.
///
/// A component qualifies only if it is valid UTF-8, has a non-empty key and value,
/// and its value contains no further `=`.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for component in path.components() {
        let Some(segment) = component.as_os_str().to_str() else {
            continue;
        };
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        let malformed = key.is_empty() || value.is_empty() || value.contains('=');
        if malformed {
            continue;
        }
        pairs.push((key.to_owned(), value.to_owned()));
    }
    pairs
}
1232
1233fn append_partition_cols(
1236 batch: &RecordBatch,
1237 pairs: &[(String, String)],
1238) -> Result<RecordBatch, AppError> {
1239 let n = batch.num_rows();
1240 let mut fields: Vec<Field> = batch
1241 .schema()
1242 .fields()
1243 .iter()
1244 .map(|f| f.as_ref().clone())
1245 .collect();
1246 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1247 for (k, v) in pairs {
1248 if fields.iter().any(|f| f.name() == k) {
1249 continue;
1250 }
1251 fields.push(Field::new(k, DataType::Utf8, false));
1252 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1253 }
1254 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1255 .map_err(|e| AppError::Internal(e.to_string()))
1256}
1257
1258async fn read_s3_parquet(
1264 d: &DatasetConfig,
1265 ctx: &SessionContext,
1266) -> Result<Vec<RecordBatch>, AppError> {
1267 register_s3_object_store(d, ctx)?;
1268 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1269 let df = ctx
1270 .read_table(provider)
1271 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1272 Ok(df.collect().await?)
1273}
1274
1275async fn read_delta(
1279 d: &DatasetConfig,
1280 opts: HashMap<String, String>,
1281) -> Result<Vec<RecordBatch>, AppError> {
1282 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1283 AppError::Internal(format!(
1284 "dataset '{}': bad delta location '{}': {e}",
1285 d.name, d.source.location
1286 ))
1287 })?;
1288 let table = deltalake::open_table_with_storage_options(url, opts)
1289 .await
1290 .map_err(|e| {
1291 AppError::Internal(format!(
1292 "dataset '{}': delta open '{}': {e}",
1293 d.name, d.source.location
1294 ))
1295 })?;
1296 let provider = table.table_provider().await.map_err(|e| {
1297 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1298 })?;
1299 let scan_ctx = SessionContext::new();
1302 let df = scan_ctx
1303 .read_table(provider)
1304 .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1305 Ok(df.collect().await?)
1306}
1307
1308fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1312 let creds = d.resolved_creds();
1313 let region = d.resolved_region();
1314 let s3 = d.s3.clone().unwrap_or_default();
1315 let (bucket, _) = d.source.s3_bucket()?;
1316
1317 let mut opts = HashMap::new();
1318 opts.insert("AWS_REGION".into(), region);
1319 if let Some(ep) = s3.effective_endpoint(bucket) {
1320 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1321 }
1322 if s3.allow_http {
1323 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1324 }
1325 opts.insert(
1326 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1327 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1328 );
1329 if let Some(k) = creds.access_key_id {
1330 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1331 }
1332 if let Some(s) = creds.secret_access_key {
1333 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1334 }
1335 if let Some(t) = creds.session_token {
1336 opts.insert("AWS_SESSION_TOKEN".into(), t);
1337 }
1338 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1340 Ok(opts)
1341}
1342
1343fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1347 let (bucket, _key) = d.source.s3_bucket()?;
1348 let creds = d.resolved_creds();
1349 let region = d.resolved_region();
1350 let s3 = d.s3.clone().unwrap_or_default();
1351
1352 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1353 AppError::Internal(format!(
1354 "dataset '{}': build S3 store for '{bucket}': {e}",
1355 d.name
1356 ))
1357 })?;
1358
1359 let url = Url::parse(&format!("s3://{bucket}"))
1360 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1361 ctx.register_object_store(&url, Arc::new(store));
1362 Ok(())
1363}
1364
1365fn build_s3(
1366 bucket: &str,
1367 region: &str,
1368 s3: &S3Config,
1369 creds: &ResolvedCreds,
1370) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1371 let mut b = AmazonS3Builder::new()
1372 .with_bucket_name(bucket)
1373 .with_region(region)
1374 .with_allow_http(s3.allow_http)
1375 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1376 if let Some(ep) = s3.effective_endpoint(bucket) {
1377 b = b.with_endpoint(ep);
1378 }
1379 if let Some(k) = creds.access_key_id.as_deref() {
1380 b = b.with_access_key_id(k);
1381 }
1382 if let Some(s) = creds.secret_access_key.as_deref() {
1383 b = b.with_secret_access_key(s);
1384 }
1385 if let Some(t) = creds.session_token.as_deref() {
1386 b = b.with_token(t);
1387 }
1388 b.build()
1389}
1390
1391fn arrow_to_logical(dt: &DataType) -> LogicalType {
1392 match dt {
1393 DataType::Boolean => LogicalType::Bool,
1394 DataType::Int8
1395 | DataType::Int16
1396 | DataType::Int32
1397 | DataType::Int64
1398 | DataType::UInt8
1399 | DataType::UInt16
1400 | DataType::UInt32
1401 | DataType::UInt64 => LogicalType::Int,
1402 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1403 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1404 DataType::Dictionary(_, v)
1408 if matches!(
1409 v.as_ref(),
1410 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1411 ) =>
1412 {
1413 LogicalType::Utf8
1414 }
1415 DataType::Date32
1416 | DataType::Date64
1417 | DataType::Time32(_)
1418 | DataType::Time64(_)
1419 | DataType::Timestamp(_, _)
1420 | DataType::Duration(_)
1421 | DataType::Interval(_) => LogicalType::Temporal,
1422 _ => LogicalType::Other,
1423 }
1424}
1425
1426fn project(
1431 schema: &DatasetSchema,
1432 batch: RecordBatch,
1433 columns: &[String],
1434) -> Result<RecordBatch, AppError> {
1435 if columns.is_empty() {
1436 return Ok(batch);
1437 }
1438 let indices: Vec<usize> = columns
1439 .iter()
1440 .map(|c| {
1441 schema
1442 .find(c)
1443 .map(|info| schema.by_name[&info.name.to_lowercase()])
1444 })
1445 .collect::<Result<_, _>>()?;
1446 let fields: Vec<Field> = indices
1447 .iter()
1448 .map(|&i| batch.schema().field(i).clone())
1449 .collect();
1450 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1451 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1452}
1453
1454#[derive(Default)]
1467struct Params {
1468 values: Vec<ScalarValue>,
1469}
1470
1471impl Params {
1472 fn new() -> Self {
1473 Self::default()
1474 }
1475
1476 fn bind(&mut self, v: ScalarValue) -> String {
1478 self.values.push(v);
1479 format!("${}", self.values.len())
1480 }
1481
1482 fn into_values(self) -> Vec<ScalarValue> {
1483 self.values
1484 }
1485}
1486
1487fn build_query_sql(
1488 schema: &DatasetSchema,
1489 req: &QueryRequest,
1490 max_page_size: u64,
1491) -> Result<(String, Vec<ScalarValue>), AppError> {
1492 let (limit, offset) = req.effective_limit_offset(max_page_size);
1493 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1494}
1495
1496fn build_query_stream_sql(
1497 schema: &DatasetSchema,
1498 req: &QueryRequest,
1499) -> Result<(String, Vec<ScalarValue>), AppError> {
1500 let suffix = req
1501 .limit
1502 .map(|limit| format!(" LIMIT {limit}"))
1503 .unwrap_or_default();
1504 build_query_sql_with_suffix(schema, req, &suffix)
1505}
1506
1507fn build_query_sql_with_suffix(
1508 schema: &DatasetSchema,
1509 req: &QueryRequest,
1510 suffix: &str,
1511) -> Result<(String, Vec<ScalarValue>), AppError> {
1512 let agg_plan = req.agg_plan(schema)?;
1513
1514 let cols = if let Some(plan) = &agg_plan {
1515 let mut parts: Vec<String> = plan
1517 .group_cols
1518 .iter()
1519 .map(|c| DatasetSchema::quote_ident(c))
1520 .collect();
1521 for a in &plan.aggs {
1522 let expr = a.sql_expr()?;
1523 parts.push(format!(
1524 "{expr} AS {}",
1525 DatasetSchema::quote_ident(&a.alias)
1526 ));
1527 }
1528 parts.join(", ")
1529 } else if req.columns.is_empty() {
1530 if req.distinct {
1531 "DISTINCT *".to_string()
1532 } else {
1533 "*".to_string()
1534 }
1535 } else {
1536 let list = req
1537 .columns
1538 .iter()
1539 .map(|c| {
1540 schema
1541 .find(c)
1542 .map(|info| DatasetSchema::quote_ident(&info.name))
1543 })
1544 .collect::<Result<Vec<_>, _>>()?
1545 .join(", ");
1546 if req.distinct {
1547 format!("DISTINCT {list}")
1548 } else {
1549 list
1550 }
1551 };
1552
1553 let mut params = Params::new();
1554 let clauses: Vec<String> = req
1555 .predicates
1556 .iter()
1557 .map(|p| pred_to_sql(schema, p, &mut params))
1558 .collect::<Result<_, _>>()?;
1559
1560 let table = DatasetSchema::quote_ident(&schema.name);
1561 let where_clause = if clauses.is_empty() {
1562 String::new()
1563 } else {
1564 format!(" WHERE {}", clauses.join(" AND "))
1565 };
1566 let group_clause = match &agg_plan {
1567 Some(p) => format!(
1568 " GROUP BY {}",
1569 p.group_cols
1570 .iter()
1571 .map(|c| DatasetSchema::quote_ident(c))
1572 .collect::<Vec<_>>()
1573 .join(", "),
1574 ),
1575 None => String::new(),
1576 };
1577 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1578 Some(s) => format!(" ORDER BY {s}"),
1579 None => String::new(),
1580 };
1581 let sql =
1582 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1583 Ok((sql, params.into_values()))
1584}
1585
1586fn build_count_sql(
1587 schema: &DatasetSchema,
1588 predicates: &[Predicate],
1589) -> Result<(String, Vec<ScalarValue>), AppError> {
1590 let mut params = Params::new();
1591 let clauses: Vec<String> = predicates
1592 .iter()
1593 .map(|p| pred_to_sql(schema, p, &mut params))
1594 .collect::<Result<_, _>>()?;
1595 let table = DatasetSchema::quote_ident(&schema.name);
1596 let where_clause = if clauses.is_empty() {
1597 String::new()
1598 } else {
1599 format!(" WHERE {}", clauses.join(" AND "))
1600 };
1601 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1602 Ok((sql, params.into_values()))
1603}
1604
1605fn pred_to_sql(
1606 schema: &DatasetSchema,
1607 pred: &Predicate,
1608 params: &mut Params,
1609) -> Result<String, AppError> {
1610 let info = schema.find(&pred.col)?;
1611 let col = DatasetSchema::quote_ident(&info.name);
1612
1613 match pred.op.as_str() {
1614 "is_null" => return Ok(format!("{col} IS NULL")),
1615 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1616 _ => {}
1617 }
1618
1619 let val = pred
1620 .val
1621 .as_ref()
1622 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1623
1624 if pred.op == "in" {
1625 let items = val
1626 .as_array()
1627 .filter(|a| !a.is_empty())
1628 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1629 let placeholders: Vec<String> = items
1630 .iter()
1631 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1632 .collect::<Result<_, AppError>>()?;
1633 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1634 }
1635
1636 let sql_op = match pred.op.as_str() {
1637 "eq" => "=",
1638 "neq" => "!=",
1639 "gt" => ">",
1640 "gte" => ">=",
1641 "lt" => "<",
1642 "lte" => "<=",
1643 "like" => "LIKE",
1644 "ilike" => "ILIKE",
1645 other => return Err(AppError::UnknownOperator(other.into())),
1646 };
1647 let placeholder = params.bind(json_to_scalar(val)?);
1648 Ok(format!("{col} {sql_op} {placeholder}"))
1649}
1650
1651fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1655 match val {
1656 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1657 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1658 JsonValue::Null => Ok(ScalarValue::Null),
1659 JsonValue::Number(n) => {
1660 if let Some(i) = n.as_i64() {
1661 Ok(ScalarValue::Int64(Some(i)))
1662 } else if let Some(u) = n.as_u64() {
1663 Ok(ScalarValue::UInt64(Some(u)))
1664 } else if let Some(f) = n.as_f64() {
1665 Ok(ScalarValue::Float64(Some(f)))
1666 } else {
1667 Err(AppError::InvalidValue(
1668 "unsupported numeric literal in predicate".into(),
1669 ))
1670 }
1671 }
1672 _ => Err(AppError::InvalidValue(
1673 "unsupported literal type in predicate".into(),
1674 )),
1675 }
1676}
1677
1678fn json_index_key(val: &JsonValue) -> Option<String> {
1683 match val {
1684 JsonValue::String(s) => Some(s.clone()),
1685 JsonValue::Number(n) => Some(n.to_string()),
1686 JsonValue::Bool(b) => Some(b.to_string()),
1687 _ => None,
1688 }
1689}
1690
/// Intersects two ascending `u32` slices with a linear two-pointer walk.
///
/// The output is ascending. Each matched pair advances both sides, so a value
/// repeated in both inputs appears `min(count_in_a, count_in_b)` times.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let mut ia = 0;
    let mut ib = 0;
    while let (Some(&x), Some(&y)) = (a.get(ia), b.get(ib)) {
        if x == y {
            common.push(x);
            ia += 1;
            ib += 1;
        } else if x < y {
            ia += 1;
        } else {
            ib += 1;
        }
    }
    common
}
1707
/// Merges two ascending `u32` slices into one ascending vector.
///
/// When the heads of both slices are equal, a single copy is emitted and both
/// advance. Once either side is exhausted, the rest of the other is appended unchanged.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    loop {
        match (a.get(i), b.get(j)) {
            (Some(&x), Some(&y)) if x < y => {
                merged.push(x);
                i += 1;
            }
            (Some(&x), Some(&y)) if x > y => {
                merged.push(y);
                j += 1;
            }
            (Some(&x), Some(_)) => {
                merged.push(x);
                i += 1;
                j += 1;
            }
            _ => break,
        }
    }
    merged.extend_from_slice(&a[i..]);
    merged.extend_from_slice(&b[j..]);
    merged
}
1732
1733fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1734 if predicates.is_empty() || index.is_empty() {
1735 return None;
1736 }
1737
1738 if let [pred] = predicates
1741 && pred.op.as_str() == "eq"
1742 {
1743 let col_lower = pred.col.to_lowercase();
1744 let col_map = index.get(&col_lower)?;
1745 let key = json_index_key(pred.val.as_ref()?)?;
1746 return Some(match col_map.get(&key) {
1747 Some(rows) => Cow::Borrowed(rows.as_slice()),
1748 None => Cow::Owned(Vec::new()),
1749 });
1750 }
1751
1752 let mut result: Option<Vec<u32>> = None;
1753 for pred in predicates {
1754 let col_lower = pred.col.to_lowercase();
1755 let col_map = index.get(&col_lower)?;
1756
1757 let rows: Vec<u32> = match pred.op.as_str() {
1758 "eq" => {
1759 let key = json_index_key(pred.val.as_ref()?)?;
1760 col_map.get(&key).cloned().unwrap_or_default()
1761 }
1762 "in" => {
1763 let items = pred.val.as_ref()?.as_array()?;
1764 let mut merged: Vec<u32> = Vec::new();
1765 for item in items {
1766 if let Some(r) = col_map.get(&json_index_key(item)?) {
1767 merged = union_sorted(&merged, r);
1768 }
1769 }
1770 merged
1771 }
1772 _ => return None,
1773 };
1774
1775 result = Some(match result {
1776 None => rows,
1777 Some(r) => intersect_sorted(&r, &rows),
1778 });
1779 }
1780 result.map(Cow::Owned)
1781}
1782
1783#[doc(hidden)]
1787pub mod bench {
1788 use super::{EqIndex, FastMap, json_index_key, try_index};
1789 use datapress_core::models::Predicate;
1790 use serde_json::Value as JsonValue;
1791 use std::borrow::Cow;
1792
1793 pub struct BenchIndex(EqIndex);
1795
1796 pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
1800 let key = json_index_key(val).expect("benchable index key");
1801 let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
1802 col_map.insert(key, rows);
1803 let mut index: EqIndex = EqIndex::default();
1804 index.insert(col.to_string(), col_map);
1805 BenchIndex(index)
1806 }
1807
1808 pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1810 try_index(&idx.0, predicates)
1811 }
1812
1813 pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1818 let [pred] = predicates else { return None };
1819 if pred.op.as_str() != "eq" {
1820 return None;
1821 }
1822 let col_lower = pred.col.to_lowercase();
1823 let col_map = idx.0.get(&col_lower)?;
1824 let key = json_index_key(pred.val.as_ref()?)?;
1825 Some(col_map.get(&key).cloned().unwrap_or_default())
1826 }
1827}
1828
1829fn slice_global(
1832 chunks: &[RecordBatch],
1833 schema: &Arc<Schema>,
1834 offset: usize,
1835 limit: usize,
1836) -> Result<RecordBatch, AppError> {
1837 if limit == 0 || chunks.is_empty() {
1838 return Ok(RecordBatch::new_empty(schema.clone()));
1839 }
1840 let mut out = Vec::new();
1841 let mut to_skip = offset;
1842 let mut remaining = limit;
1843 for b in chunks {
1844 if remaining == 0 {
1845 break;
1846 }
1847 let n = b.num_rows();
1848 if to_skip >= n {
1849 to_skip -= n;
1850 continue;
1851 }
1852 let take = remaining.min(n - to_skip);
1853 out.push(b.slice(to_skip, take));
1854 to_skip = 0;
1855 remaining -= take;
1856 }
1857 if out.is_empty() {
1858 return Ok(RecordBatch::new_empty(schema.clone()));
1859 }
1860 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1861}
1862
1863fn take_page(
1868 chunks: &[RecordBatch],
1869 schema: &Arc<Schema>,
1870 rows: &[u32],
1871 offset: usize,
1872 limit: usize,
1873) -> Result<RecordBatch, AppError> {
1874 let start = offset.min(rows.len());
1875 let len = limit.min(rows.len() - start);
1876 if len == 0 || chunks.is_empty() {
1877 return Ok(RecordBatch::new_empty(schema.clone()));
1878 }
1879
1880 let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1883 let mut acc: u32 = 0;
1884 offsets.push(0);
1885 for b in chunks {
1886 acc = acc
1887 .checked_add(b.num_rows() as u32)
1888 .expect("row count exceeds u32::MAX");
1889 offsets.push(acc);
1890 }
1891
1892 let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1895 for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1896 let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1897 let local = gid - offsets[bi];
1898 buckets[bi].push((out_pos as u32, local));
1899 }
1900
1901 let mut takens: Vec<RecordBatch> = Vec::new();
1903 let mut dest: Vec<u32> = Vec::with_capacity(len);
1904 for (bi, bucket) in buckets.iter().enumerate() {
1905 if bucket.is_empty() {
1906 continue;
1907 }
1908 let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1909 let cols: Vec<ArrayRef> = chunks[bi]
1910 .columns()
1911 .iter()
1912 .map(|c| {
1913 arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1914 .map_err(AppError::from)
1915 })
1916 .collect::<Result<_, _>>()?;
1917 takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1918 dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1919 }
1920
1921 let stitched = compute::concat_batches(schema, takens.iter())?;
1923 let mut inv = vec![0u32; len];
1924 for (i, &d) in dest.iter().enumerate() {
1925 inv[d as usize] = i as u32;
1926 }
1927 let perm = UInt32Array::from(inv);
1928 let cols: Vec<ArrayRef> = stitched
1929 .columns()
1930 .iter()
1931 .map(|c| {
1932 arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1933 .map_err(AppError::from)
1934 })
1935 .collect::<Result<_, _>>()?;
1936 RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1937}
1938
1939fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1943 use rayon::prelude::*;
1944
1945 if cfg.mode == IndexMode::None || chunks.is_empty() {
1946 return EqIndex::default();
1947 }
1948
1949 let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1950 Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1951 } else {
1952 None
1953 };
1954
1955 let max_card = if cfg.mode == IndexMode::Auto {
1956 Some(cfg.max_cardinality)
1957 } else {
1958 None
1959 };
1960
1961 let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1963 let mut acc: u32 = 0;
1964 for b in chunks {
1965 batch_offsets.push(acc);
1966 acc = acc
1967 .checked_add(b.num_rows() as u32)
1968 .expect("row count exceeds u32::MAX");
1969 }
1970
1971 let schema = chunks[0].schema();
1972
1973 schema
1974 .fields()
1975 .par_iter()
1976 .enumerate()
1977 .filter_map(|(ci, field)| {
1978 let col_lower = field.name().to_lowercase();
1979 if let Some(a) = &allow
1980 && !a.contains_key(&col_lower)
1981 {
1982 return None;
1983 }
1984
1985 let dtype = field.data_type();
1988 let dict_utf8 = matches!(dtype,
1989 DataType::Dictionary(k, v)
1990 if matches!(k.as_ref(), DataType::Int32)
1991 && matches!(v.as_ref(), DataType::Utf8));
1992 match dtype {
1993 DataType::Utf8
1994 | DataType::Utf8View
1995 | DataType::Boolean
1996 | DataType::Int8
1997 | DataType::Int16
1998 | DataType::Int32
1999 | DataType::Int64 => {}
2000 _ if dict_utf8 => {}
2001 _ => return None,
2002 }
2003
2004 let mut map: FastMap<String, Vec<u32>> = FastMap::default();
2005
2006 for (bi, batch) in chunks.iter().enumerate() {
2007 let base = batch_offsets[bi];
2008 let col = batch.column(ci);
2009
2010 macro_rules! index_col {
2011 ($arr_ty:ty) => {{
2012 let arr = col.as_any().downcast_ref::<$arr_ty>()?;
2013 for row in 0..arr.len() {
2014 if arr.is_null(row) {
2015 continue;
2016 }
2017 let key = arr.value(row).to_string();
2018 let gid = base + row as u32;
2019 if let Some(v) = map.get_mut(&key) {
2020 v.push(gid);
2021 } else {
2022 if let Some(mc) = max_card {
2023 if map.len() >= mc {
2024 return None;
2025 }
2026 }
2027 map.insert(key, vec![gid]);
2028 }
2029 }
2030 }};
2031 }
2032
2033 if dict_utf8 {
2034 let arr = col
2041 .as_any()
2042 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
2043 )?;
2044 let keys = arr.keys();
2045 let values = arr.values().as_any().downcast_ref::<StringArray>()?;
2046 for row in 0..arr.len() {
2047 if arr.is_null(row) {
2048 continue;
2049 }
2050 let k = keys.value(row) as usize;
2051 let s = values.value(k);
2052 let gid = base + row as u32;
2053 if let Some(v) = map.get_mut(s) {
2054 v.push(gid);
2055 } else {
2056 if let Some(mc) = max_card
2057 && map.len() >= mc
2058 {
2059 return None;
2060 }
2061 map.insert(s.to_string(), vec![gid]);
2062 }
2063 }
2064 } else {
2065 match dtype {
2066 DataType::Utf8 => index_col!(StringArray),
2067 DataType::Utf8View => index_col!(StringViewArray),
2068 DataType::Boolean => index_col!(BooleanArray),
2069 DataType::Int8 => index_col!(Int8Array),
2070 DataType::Int16 => index_col!(Int16Array),
2071 DataType::Int32 => index_col!(Int32Array),
2072 DataType::Int64 => index_col!(Int64Array),
2073 _ => unreachable!(),
2074 }
2075 }
2076 }
2077
2078 Some((col_lower, map))
2079 })
2080 .collect()
2081}
2082
2083fn writable_inline(dt: &DataType) -> bool {
2096 match dt {
2097 DataType::Utf8
2098 | DataType::LargeUtf8
2099 | DataType::Utf8View
2100 | DataType::Boolean
2101 | DataType::Int8
2102 | DataType::Int16
2103 | DataType::Int32
2104 | DataType::Int64
2105 | DataType::UInt8
2106 | DataType::UInt16
2107 | DataType::UInt32
2108 | DataType::UInt64
2109 | DataType::Float32
2110 | DataType::Float64
2111 | DataType::Decimal128(_, _)
2112 | DataType::Decimal256(_, _) => true,
2113 DataType::Dictionary(k, v)
2114 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2115 {
2116 true
2117 }
2118 _ => false,
2119 }
2120}
2121
2122fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2128 let schema = batch.schema();
2129 let to_cast: Vec<usize> = schema
2130 .fields()
2131 .iter()
2132 .enumerate()
2133 .filter_map(|(i, f)| {
2134 if writable_inline(f.data_type()) {
2135 None
2136 } else {
2137 Some(i)
2138 }
2139 })
2140 .collect();
2141 if to_cast.is_empty() {
2142 return Ok(batch.clone());
2143 }
2144 let new_fields: Vec<Field> = schema
2145 .fields()
2146 .iter()
2147 .enumerate()
2148 .map(|(i, f)| {
2149 if to_cast.contains(&i) {
2150 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2151 } else {
2152 f.as_ref().clone()
2153 }
2154 })
2155 .collect();
2156 let new_schema = Arc::new(Schema::new(new_fields));
2157 let cols: Vec<ArrayRef> = batch
2158 .columns()
2159 .iter()
2160 .enumerate()
2161 .map(|(i, c)| {
2162 if to_cast.contains(&i) {
2163 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2164 } else {
2165 Ok(c.clone())
2166 }
2167 })
2168 .collect::<Result<_, _>>()?;
2169 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2170}
2171
// `dead_code` is allowed because this in-memory comparison path (see `cmp_scalar`)
// carries the same allow and appears unused in this part of the file.
#[allow(dead_code)]
/// Comparison operator accepted by `cmp_scalar`.
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equality (`=`).
    Eq,
    /// Inequality (`!=`).
    Neq,
    /// Strictly greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    Gte,
    /// Strictly less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    Lte,
    /// `LIKE` pattern match; `cmp_scalar` accepts it only on `Utf8` columns.
    Like,
    /// Case-insensitive `LIKE`; `cmp_scalar` accepts it only on `Utf8` columns.
    ILike,
}
2189
2190#[allow(dead_code)]
2191fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2192 let arr = col
2193 .as_any()
2194 .downcast_ref::<StringArray>()
2195 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2196 let s = Scalar::new(StringArray::from(vec![val]));
2197 Ok(eq(arr, &s)?)
2198}
2199
2200#[allow(dead_code)]
2201fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2202 macro_rules! num_cmp {
2203 ($arr_type:ty, $cast:ty) => {{
2204 let n = val
2205 .as_f64()
2206 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2207 as $cast;
2208 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2209 let s = Scalar::new(<$arr_type>::from(vec![n]));
2210 Ok(match op {
2211 CmpOp::Eq => eq(arr, &s)?,
2212 CmpOp::Neq => neq(arr, &s)?,
2213 CmpOp::Gt => gt(arr, &s)?,
2214 CmpOp::Gte => gt_eq(arr, &s)?,
2215 CmpOp::Lt => lt(arr, &s)?,
2216 CmpOp::Lte => lt_eq(arr, &s)?,
2217 CmpOp::Like | CmpOp::ILike => {
2218 return Err(AppError::InvalidValue(
2219 "LIKE requires a string column".into(),
2220 ));
2221 }
2222 })
2223 }};
2224 }
2225 match col.data_type() {
2226 DataType::Utf8 => {
2227 let s = val
2228 .as_str()
2229 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2230 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2231 let sc = Scalar::new(StringArray::from(vec![s]));
2232 Ok(match op {
2233 CmpOp::Eq => eq(arr, &sc)?,
2234 CmpOp::Neq => neq(arr, &sc)?,
2235 CmpOp::Gt => gt(arr, &sc)?,
2236 CmpOp::Gte => gt_eq(arr, &sc)?,
2237 CmpOp::Lt => lt(arr, &sc)?,
2238 CmpOp::Lte => lt_eq(arr, &sc)?,
2239 CmpOp::Like => compute::like(arr, &sc)?,
2240 CmpOp::ILike => compute::ilike(arr, &sc)?,
2241 })
2242 }
2243 DataType::Int8 => num_cmp!(Int8Array, i8),
2244 DataType::Int16 => num_cmp!(Int16Array, i16),
2245 DataType::Int32 => num_cmp!(Int32Array, i32),
2246 DataType::Int64 => num_cmp!(Int64Array, i64),
2247 DataType::Float32 => num_cmp!(Float32Array, f32),
2248 DataType::Float64 => num_cmp!(Float64Array, f64),
2249 dt => Err(AppError::InvalidValue(format!(
2250 "unsupported type for comparison: {dt:?}"
2251 ))),
2252 }
2253}
2254
2255pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2260 let batch = cast_for_serialize(batch)?;
2265 let schema = batch.schema();
2266 let n_rows = batch.num_rows();
2267
2268 let keys: Vec<Vec<u8>> = schema
2269 .fields()
2270 .iter()
2271 .map(|f| {
2272 let mut k = Vec::with_capacity(f.name().len() + 3);
2273 k.push(b'"');
2274 k.extend_from_slice(f.name().as_bytes());
2275 k.extend_from_slice(b"\":");
2276 k
2277 })
2278 .collect();
2279
2280 let encoders: Vec<ColEnc> = batch
2285 .columns()
2286 .iter()
2287 .map(|c| ColEnc::new(c.as_ref()))
2288 .collect();
2289
2290 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2291 let mut itoa_buf = itoa::Buffer::new();
2292 let mut ryu_buf = ryu::Buffer::new();
2293 buf.push(b'[');
2294
2295 for row in 0..n_rows {
2296 if row > 0 {
2297 buf.push(b',');
2298 }
2299 buf.push(b'{');
2300 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2301 if i > 0 {
2302 buf.push(b',');
2303 }
2304 buf.extend_from_slice(key);
2305 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2306 }
2307 buf.push(b'}');
2308 }
2309
2310 buf.push(b']');
2311 Ok(unsafe { String::from_utf8_unchecked(buf) })
2312}
2313
/// Per-column encoder used by `serialize`.
///
/// Each column is downcast once in `ColEnc::new` to its concrete Arrow array type,
/// so per-row writes in `ColEnc::write` use that concrete type directly.
enum ColEnc<'a> {
    /// `Utf8` column.
    Utf8(&'a StringArray),
    /// `LargeUtf8` column.
    LargeUtf8(&'a LargeStringArray),
    /// `Utf8View` column.
    Utf8View(&'a StringViewArray),
    /// `Dictionary<Int32, Utf8>` column: the dictionary array plus its values
    /// downcast to `StringArray`.
    DictI32Utf8(
        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
        &'a StringArray,
    ),
    /// `Boolean` column.
    Bool(&'a BooleanArray),
    /// Signed integer columns.
    I8(&'a Int8Array),
    I16(&'a Int16Array),
    I32(&'a Int32Array),
    I64(&'a Int64Array),
    /// Unsigned integer columns.
    U8(&'a UInt8Array),
    U16(&'a UInt16Array),
    U32(&'a UInt32Array),
    U64(&'a UInt64Array),
    /// Decimal columns, written as their string rendering.
    Dec128(&'a Decimal128Array),
    Dec256(&'a Decimal256Array),
    /// Floating-point columns.
    F32(&'a Float32Array),
    F64(&'a Float64Array),
    /// Any other type; written through the generic `write_value` fallback.
    Other(&'a dyn Array),
}
2344
2345impl<'a> ColEnc<'a> {
2346 fn new(col: &'a dyn Array) -> ColEnc<'a> {
2347 macro_rules! dc {
2348 ($t:ty) => {
2349 col.as_any().downcast_ref::<$t>().unwrap()
2350 };
2351 }
2352 match col.data_type() {
2353 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2354 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2355 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2356 DataType::Dictionary(key, value)
2357 if matches!(key.as_ref(), DataType::Int32)
2358 && matches!(value.as_ref(), DataType::Utf8) =>
2359 {
2360 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2361 let values = dict
2362 .values()
2363 .as_any()
2364 .downcast_ref::<StringArray>()
2365 .unwrap();
2366 ColEnc::DictI32Utf8(dict, values)
2367 }
2368 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2369 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2370 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2371 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2372 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2373 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2374 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2375 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2376 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2377 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2378 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2379 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2380 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2381 _ => ColEnc::Other(col),
2382 }
2383 }
2384
2385 #[inline]
2386 fn write(
2387 &self,
2388 buf: &mut Vec<u8>,
2389 row: usize,
2390 itoa_buf: &mut itoa::Buffer,
2391 ryu_buf: &mut ryu::Buffer,
2392 ) {
2393 macro_rules! int {
2394 ($arr:expr) => {{
2395 if $arr.is_null(row) {
2396 buf.extend_from_slice(b"null");
2397 } else {
2398 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2399 }
2400 }};
2401 }
2402 match self {
2403 ColEnc::Utf8(a) => {
2404 if a.is_null(row) {
2405 buf.extend_from_slice(b"null");
2406 } else {
2407 write_str(buf, a.value(row));
2408 }
2409 }
2410 ColEnc::LargeUtf8(a) => {
2411 if a.is_null(row) {
2412 buf.extend_from_slice(b"null");
2413 } else {
2414 write_str(buf, a.value(row));
2415 }
2416 }
2417 ColEnc::Utf8View(a) => {
2418 if a.is_null(row) {
2419 buf.extend_from_slice(b"null");
2420 } else {
2421 write_str(buf, a.value(row));
2422 }
2423 }
2424 ColEnc::DictI32Utf8(keys, values) => {
2425 if keys.is_null(row) {
2426 buf.extend_from_slice(b"null");
2427 } else {
2428 let k = keys.keys().value(row) as usize;
2429 write_str(buf, values.value(k));
2430 }
2431 }
2432 ColEnc::Bool(a) => {
2433 if a.is_null(row) {
2434 buf.extend_from_slice(b"null");
2435 } else {
2436 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2437 }
2438 }
2439 ColEnc::I8(a) => int!(a),
2440 ColEnc::I16(a) => int!(a),
2441 ColEnc::I32(a) => int!(a),
2442 ColEnc::I64(a) => int!(a),
2443 ColEnc::U8(a) => int!(a),
2444 ColEnc::U16(a) => int!(a),
2445 ColEnc::U32(a) => int!(a),
2446 ColEnc::U64(a) => int!(a),
2447 ColEnc::Dec128(a) => {
2448 if a.is_null(row) {
2449 buf.extend_from_slice(b"null");
2450 } else {
2451 write_str(buf, &a.value_as_string(row));
2452 }
2453 }
2454 ColEnc::Dec256(a) => {
2455 if a.is_null(row) {
2456 buf.extend_from_slice(b"null");
2457 } else {
2458 write_str(buf, &a.value_as_string(row));
2459 }
2460 }
2461 ColEnc::F32(a) => {
2462 if a.is_null(row) {
2463 buf.extend_from_slice(b"null");
2464 } else {
2465 let v = a.value(row);
2466 if v.is_finite() {
2467 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2468 } else {
2469 buf.extend_from_slice(b"null");
2470 }
2471 }
2472 }
2473 ColEnc::F64(a) => {
2474 if a.is_null(row) {
2475 buf.extend_from_slice(b"null");
2476 } else {
2477 let v = a.value(row);
2478 if v.is_finite() {
2479 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2480 } else {
2481 buf.extend_from_slice(b"null");
2482 }
2483 }
2484 }
2485 ColEnc::Other(col) => {
2486 if col.is_null(row) {
2487 buf.extend_from_slice(b"null");
2488 } else {
2489 write_value(buf, *col, row);
2490 }
2491 }
2492 }
2493 }
2494}
2495
2496#[inline]
2497fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2498 match col.data_type() {
2499 DataType::Utf8 => write_str(
2500 buf,
2501 col.as_any()
2502 .downcast_ref::<StringArray>()
2503 .unwrap()
2504 .value(row),
2505 ),
2506 DataType::LargeUtf8 => write_str(
2507 buf,
2508 col.as_any()
2509 .downcast_ref::<LargeStringArray>()
2510 .unwrap()
2511 .value(row),
2512 ),
2513 DataType::Utf8View => write_str(
2514 buf,
2515 col.as_any()
2516 .downcast_ref::<StringViewArray>()
2517 .unwrap()
2518 .value(row),
2519 ),
2520 DataType::Dictionary(key, value)
2521 if matches!(key.as_ref(), DataType::Int32)
2522 && matches!(value.as_ref(), DataType::Utf8) =>
2523 {
2524 let dict = col
2525 .as_any()
2526 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2527 .unwrap();
2528 let keys = dict.keys();
2529 let values = dict
2530 .values()
2531 .as_any()
2532 .downcast_ref::<StringArray>()
2533 .unwrap();
2534 let k = keys.value(row) as usize;
2535 write_str(buf, values.value(k));
2536 }
2537 DataType::Boolean => {
2538 let v = col
2539 .as_any()
2540 .downcast_ref::<BooleanArray>()
2541 .unwrap()
2542 .value(row);
2543 buf.extend_from_slice(if v { b"true" } else { b"false" });
2544 }
2545 DataType::Int8 => {
2546 let mut b = itoa::Buffer::new();
2547 buf.extend_from_slice(
2548 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2549 .as_bytes(),
2550 );
2551 }
2552 DataType::Int16 => {
2553 let mut b = itoa::Buffer::new();
2554 buf.extend_from_slice(
2555 b.format(
2556 col.as_any()
2557 .downcast_ref::<Int16Array>()
2558 .unwrap()
2559 .value(row),
2560 )
2561 .as_bytes(),
2562 );
2563 }
2564 DataType::Int32 => {
2565 let mut b = itoa::Buffer::new();
2566 buf.extend_from_slice(
2567 b.format(
2568 col.as_any()
2569 .downcast_ref::<Int32Array>()
2570 .unwrap()
2571 .value(row),
2572 )
2573 .as_bytes(),
2574 );
2575 }
2576 DataType::Int64 => {
2577 let mut b = itoa::Buffer::new();
2578 buf.extend_from_slice(
2579 b.format(
2580 col.as_any()
2581 .downcast_ref::<Int64Array>()
2582 .unwrap()
2583 .value(row),
2584 )
2585 .as_bytes(),
2586 );
2587 }
2588 DataType::UInt8 => {
2589 let mut b = itoa::Buffer::new();
2590 buf.extend_from_slice(
2591 b.format(
2592 col.as_any()
2593 .downcast_ref::<UInt8Array>()
2594 .unwrap()
2595 .value(row),
2596 )
2597 .as_bytes(),
2598 );
2599 }
2600 DataType::UInt16 => {
2601 let mut b = itoa::Buffer::new();
2602 buf.extend_from_slice(
2603 b.format(
2604 col.as_any()
2605 .downcast_ref::<UInt16Array>()
2606 .unwrap()
2607 .value(row),
2608 )
2609 .as_bytes(),
2610 );
2611 }
2612 DataType::UInt32 => {
2613 let mut b = itoa::Buffer::new();
2614 buf.extend_from_slice(
2615 b.format(
2616 col.as_any()
2617 .downcast_ref::<UInt32Array>()
2618 .unwrap()
2619 .value(row),
2620 )
2621 .as_bytes(),
2622 );
2623 }
2624 DataType::UInt64 => {
2625 let mut b = itoa::Buffer::new();
2626 buf.extend_from_slice(
2627 b.format(
2628 col.as_any()
2629 .downcast_ref::<UInt64Array>()
2630 .unwrap()
2631 .value(row),
2632 )
2633 .as_bytes(),
2634 );
2635 }
2636 DataType::Decimal128(_, _) => {
2637 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2638 write_str(buf, &arr.value_as_string(row));
2639 }
2640 DataType::Decimal256(_, _) => {
2641 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2642 write_str(buf, &arr.value_as_string(row));
2643 }
2644 DataType::Float32 => {
2645 let v = col
2646 .as_any()
2647 .downcast_ref::<Float32Array>()
2648 .unwrap()
2649 .value(row);
2650 if v.is_finite() {
2651 let mut b = ryu::Buffer::new();
2652 buf.extend_from_slice(b.format_finite(v).as_bytes());
2653 } else {
2654 buf.extend_from_slice(b"null");
2655 }
2656 }
2657 DataType::Float64 => {
2658 let v = col
2659 .as_any()
2660 .downcast_ref::<Float64Array>()
2661 .unwrap()
2662 .value(row);
2663 if v.is_finite() {
2664 let mut b = ryu::Buffer::new();
2665 buf.extend_from_slice(b.format_finite(v).as_bytes());
2666 } else {
2667 buf.extend_from_slice(b"null");
2668 }
2669 }
2670 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2675 }
2676}
2677
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
/// - `"` and `\` are backslash-escaped.
/// - Newline, carriage return and tab use their short forms `\n`, `\r`, `\t`.
/// - Every other byte below 0x20 becomes `\u00XX`, with lowercase hex digits.
///
/// All remaining bytes are copied verbatim. This includes every byte of a multi-byte UTF-8
/// sequence, since those are all >= 0x80.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    buf.push(b'"');

    // Start index of the current run of bytes that can be copied without escaping.
    let mut run_start = 0;
    for (pos, &byte) in bytes.iter().enumerate() {
        let needs_escape = byte < 0x20 || byte == b'"' || byte == b'\\';
        if !needs_escape {
            continue;
        }
        // Flush the clean run that precedes this byte, then emit its escape.
        buf.extend_from_slice(&bytes[run_start..pos]);
        run_start = pos + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            ctrl => buf.extend_from_slice(&[
                b'\\',
                b'u',
                b'0',
                b'0',
                HEX_DIGITS[usize::from(ctrl >> 4)],
                HEX_DIGITS[usize::from(ctrl & 0x0f)],
            ]),
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);

    buf.push(b'"');
}
2699
2700#[async_trait]
2705impl Backend for Store {
2706 fn names(&self) -> Vec<String> {
2707 Store::names(self)
2708 }
2709
2710 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2711 let st = self.dataset(name)?;
2712 Ok(DatasetSummary {
2713 name: st.schema.name.clone(),
2714 columns: st.schema.columns.len(),
2715 rows: st.num_rows(),
2716 })
2717 }
2718
2719 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2720 let st = self.dataset(name)?;
2721 Ok(Arc::new(st.schema.clone()))
2722 }
2723
2724 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2725 let st = self.dataset(name)?;
2726 let mut cols: Vec<String> = st
2729 .schema
2730 .columns
2731 .iter()
2732 .map(|c| c.name.clone())
2733 .filter(|n| st.index.contains_key(n))
2734 .collect();
2735 let mut extras: Vec<String> = st
2738 .index
2739 .keys()
2740 .filter(|n| !cols.iter().any(|c| c == *n))
2741 .cloned()
2742 .collect();
2743 extras.sort();
2744 cols.extend(extras);
2745 Ok(cols)
2746 }
2747
2748 async fn sample(&self, name: &str) -> Result<String, AppError> {
2749 Store::sample(self, name).await
2750 }
2751
2752 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2753 Store::query(self, name, req).await
2754 }
2755
2756 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2757 Store::query_arrow(self, name, req).await
2758 }
2759
2760 async fn query_arrow_stream(
2761 &self,
2762 name: &str,
2763 req: &QueryRequest,
2764 ) -> Result<ArrowIpcStream, AppError> {
2765 Store::query_arrow_stream(self, name, req).await
2766 }
2767
2768 async fn query_arrow_stream_all(
2769 &self,
2770 name: &str,
2771 req: &QueryRequest,
2772 ) -> Result<ArrowIpcStream, AppError> {
2773 Store::query_arrow_stream_all(self, name, req).await
2774 }
2775
2776 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2777 Store::count(self, name, req).await
2778 }
2779
2780 async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
2781 Store::query_sql(self, sql, max_rows).await
2782 }
2783
2784 async fn query_sql_arrow_stream(
2785 &self,
2786 sql: &str,
2787 max_rows: u64,
2788 ) -> Result<ArrowIpcStream, AppError> {
2789 Store::query_sql_arrow_stream(self, sql, max_rows).await
2790 }
2791
2792 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2793 Store::parquet(self, name).await
2794 }
2795
2796 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2797 Store::reload(self, name).await
2798 }
2799}