1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use arrow::array::{
8 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
9 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
10 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
11};
12use arrow::compute;
13use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
14use arrow::datatypes::{DataType, Field, Schema};
15use async_trait::async_trait;
16use parquet::arrow::ProjectionMask;
17use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
18use serde_json::Value as JsonValue;
19
20use datafusion::datasource::file_format::parquet::ParquetFormat;
21use datafusion::datasource::listing::{
22 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
23};
24use datafusion::datasource::{MemTable, TableProvider};
25use datafusion::prelude::SessionContext;
26use datafusion::scalar::ScalarValue;
27
28use object_store::aws::AmazonS3Builder;
29use url::Url;
30
31use datapress_core::backend::{
32 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
33};
34use datapress_core::config::{
35 AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
36 S3Config, SourceKind,
37};
38use datapress_core::errors::AppError;
39use datapress_core::models::{CountRequest, Predicate, QueryRequest};
40use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
41
42type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;
51
52type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
54
55pub struct DatasetState {
70 pub schema: DatasetSchema,
71 pub data: Vec<RecordBatch>,
72 pub arrow_schema: Arc<Schema>,
73 pub index: EqIndex,
74 pub lazy: bool,
75}
76
77impl DatasetState {
78 pub fn num_rows(&self) -> usize {
80 self.data.iter().map(|b| b.num_rows()).sum()
81 }
82}
83
84pub struct Store {
89 ctx: SessionContext,
90 max_page_size: u64,
91 configs: HashMap<String, DatasetConfig>,
94 datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
96 reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
99}
100
101impl Store {
102 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
104 if cfg
107 .datasets
108 .iter()
109 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
110 {
111 deltalake::aws::register_handlers(None);
112 }
113
114 let ctx = SessionContext::new();
120 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
121 let mut configs = HashMap::with_capacity(cfg.datasets.len());
122
123 for d in &cfg.datasets {
124 let (state, provider) = build_dataset(d, &ctx).await?;
125 ctx.register_table(d.name.as_str(), provider)?;
126 datasets.insert(d.name.clone(), Arc::new(state));
127 configs.insert(d.name.clone(), d.clone());
128 }
129 Ok(Self {
130 ctx,
131 max_page_size: cfg.server.max_page_size.max(1),
132 configs,
133 datasets: ArcSwap::from_pointee(datasets),
134 reload_locks: Mutex::new(HashMap::new()),
135 })
136 }
137
138 pub fn names(&self) -> Vec<String> {
140 let snap = self.datasets.load();
141 let mut v: Vec<String> = snap.keys().cloned().collect();
142 v.sort();
143 v
144 }
145
146 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
147 self.datasets
148 .load()
149 .get(name)
150 .cloned()
151 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
152 }
153
154 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
157 let st = self.dataset(name)?;
158
159 if st.lazy {
161 let table = DatasetSchema::quote_ident(&st.schema.name);
162 let sql = format!("SELECT * FROM {table} LIMIT 1");
163 let df = self.ctx.sql(&sql).await?;
164 let batches = df.collect().await?;
165 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
166 return Ok("null".into());
167 }
168 let arr = serialize(&batches[0].slice(0, 1))?;
169 let trimmed = arr.trim();
170 let inner = trimmed
171 .strip_prefix('[')
172 .and_then(|s| s.strip_suffix(']'))
173 .unwrap_or(trimmed);
174 return Ok(inner.to_string());
175 }
176
177 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
178 Some(b) => b,
179 None => return Ok("null".into()),
180 };
181 let arr = serialize(&first.slice(0, 1))?;
182 let trimmed = arr.trim();
184 let inner = trimmed
185 .strip_prefix('[')
186 .and_then(|s| s.strip_suffix(']'))
187 .unwrap_or(trimmed);
188 Ok(inner.to_string())
189 }
190
191 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
196 let cfg = self
198 .configs
199 .get(name)
200 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
201 .clone();
202
203 let lock = {
205 let mut locks = self.reload_locks.lock().unwrap();
206 locks
207 .entry(name.to_string())
208 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
209 .clone()
210 };
211 let _guard = lock.lock().await;
212
213 let started = std::time::Instant::now();
214
215 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
218 let rows = state.num_rows();
219
220 let _ = self.ctx.deregister_table(name)?;
226 self.ctx.register_table(name, provider)?;
227
228 let mut new_map = (**self.datasets.load()).clone();
229 new_map.insert(name.to_string(), Arc::new(state));
230 self.datasets.store(Arc::new(new_map));
231
232 let elapsed_ms = started.elapsed().as_millis();
233 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
234 Ok(ReloadStats { rows, elapsed_ms })
235 }
236
237 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
241 let batch = self.query_batch(name, req).await?;
242 if batch.num_rows() == 0 {
243 return Ok("[]".to_string());
244 }
245 serialize(&batch)
246 }
247
248 fn canonicalize_sql(&self, sql: &str) -> String {
254 let snap = self.datasets.load();
255 let mut tables: HashMap<String, String> = HashMap::with_capacity(snap.len());
256 let mut columns: HashMap<String, String> = HashMap::new();
257 for (name, state) in snap.iter() {
258 tables.insert(name.to_lowercase(), name.clone());
259 for col in &state.schema.columns {
260 columns
261 .entry(col.name.to_lowercase())
262 .or_insert_with(|| col.name.clone());
263 }
264 }
265 datapress_core::sql::canonicalize_identifiers(sql, &tables, &columns)
266 }
267
268 pub async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
275 let cap = max_rows.max(1);
276 let sql = self.canonicalize_sql(sql);
277 let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
278 let df = self.ctx.sql(&wrapped).await?;
279 let batches = df.collect().await?;
280 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
281 return Ok("[]".to_string());
282 }
283 let batch = if batches.len() == 1 {
284 batches.into_iter().next().expect("checked len")
285 } else {
286 compute::concat_batches(&batches[0].schema(), batches.iter())?
287 };
288 let batch = if batch.num_rows() as u64 > cap {
291 batch.slice(0, cap as usize)
292 } else {
293 batch
294 };
295 serialize(&batch)
296 }
297
298 pub async fn query_sql_arrow_stream(
302 &self,
303 sql: &str,
304 max_rows: u64,
305 ) -> Result<ArrowIpcStream, AppError> {
306 let cap = max_rows.max(1);
307 let sql = self.canonicalize_sql(sql);
308 let wrapped = format!("SELECT * FROM ({sql}) AS _datapress_sql LIMIT {cap}");
309 let df = self.ctx.sql(&wrapped).await?;
310 let batches = df.collect().await?;
311 Ok(stream_arrow_batches(batches))
312 }
313
314 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
318 let batch = self.query_batch(name, req).await?;
319 let schema = batch.schema();
320 let mut buf = Vec::with_capacity(8 * 1024);
321 {
322 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
323 if batch.num_rows() > 0 {
324 w.write(&batch)?;
325 }
326 w.finish()?;
327 }
328 Ok(buf)
329 }
330
331 pub async fn query_arrow_stream(
332 &self,
333 name: &str,
334 req: &QueryRequest,
335 ) -> Result<ArrowIpcStream, AppError> {
336 let batches = self.query_batches(name, req).await?;
337 Ok(stream_arrow_batches(batches))
338 } pub async fn query_arrow_stream_all(
339 &self,
340 name: &str,
341 req: &QueryRequest,
342 ) -> Result<ArrowIpcStream, AppError> {
343 let batches = self.query_batches_all(name, req).await?;
344 Ok(stream_arrow_batches(batches))
345 }
346
347 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
355 let req = QueryRequest {
357 columns: Vec::new(),
358 predicates: Vec::new(),
359 group_by: Vec::new(),
360 aggregations: Vec::new(),
361 distinct: false,
362 order_by: Vec::new(),
363 limit: None,
364 page: 1,
365 page_size: 1,
366 };
367 let st = self.dataset(name)?;
368 let batches = self.query_batches_all(name, &req).await?;
369 let schema = batches
373 .first()
374 .map(|b| b.schema())
375 .unwrap_or_else(|| st.arrow_schema.clone());
376
377 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
378 {
379 let props = parquet::file::properties::WriterProperties::builder()
380 .set_compression(parquet::basic::Compression::SNAPPY)
381 .build();
382 let mut writer =
383 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
384 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
385 for batch in &batches {
386 if batch.num_rows() > 0 {
387 writer
388 .write(batch)
389 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
390 }
391 }
392 writer
393 .close()
394 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
395 }
396 Ok(bytes::Bytes::from(buf))
397 }
398
399 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
402 let batches = self.query_batches(name, req).await?;
403 if batches.is_empty() {
404 return Ok(RecordBatch::new_empty(Arc::new(
405 arrow::datatypes::Schema::empty(),
406 )));
407 }
408 if batches.len() == 1 {
409 return Ok(batches.into_iter().next().expect("checked len"));
410 }
411 if batches.iter().all(|b| b.num_rows() == 0) {
412 return Ok(RecordBatch::new_empty(batches[0].schema()));
413 }
414 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
415 Ok(batch)
416 }
417
418 async fn query_batches(
422 &self,
423 name: &str,
424 req: &QueryRequest,
425 ) -> Result<Vec<RecordBatch>, AppError> {
426 let st = self.dataset(name)?;
427
428 let page = req.page.max(1);
429 let page_size = req.page_size.clamp(1, self.max_page_size);
430 let offset = ((page - 1) * page_size) as usize;
431 let limit = page_size as usize;
432
433 self.query_batches_inner(st, req, Some((offset, limit)))
434 .await
435 }
436
437 async fn query_batches_all(
441 &self,
442 name: &str,
443 req: &QueryRequest,
444 ) -> Result<Vec<RecordBatch>, AppError> {
445 let st = self.dataset(name)?;
446 self.query_batches_inner(st, req, None).await
447 }
448
449 async fn query_batches_inner(
450 &self,
451 st: Arc<DatasetState>,
452 req: &QueryRequest,
453 page_window: Option<(usize, usize)>,
454 ) -> Result<Vec<RecordBatch>, AppError> {
455 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
456
457 let can_fast_path = !st.lazy
464 && req.order_by.is_empty()
465 && (page_window.is_none() || req.limit.is_none())
466 && req.group_by.is_empty()
467 && !req.distinct;
468
469 if can_fast_path {
470 let total = st.num_rows();
471
472 if req.predicates.is_empty() {
475 if page_window.is_none() && req.limit.is_none() {
476 return st
477 .data
478 .iter()
479 .cloned()
480 .map(|batch| project(&st.schema, batch, &req.columns))
481 .collect();
482 }
483 let start = offset.min(total);
484 let len = limit.min(total - start);
485 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
486 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
487 }
488
489 if let Some(rows) = try_index(&st.index, &req.predicates) {
492 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
493 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
494 }
495 }
496
497 let (sql, params) = match page_window {
499 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
500 None => build_query_stream_sql(&st.schema, req)?,
501 };
502 let mut df = self.ctx.sql(&sql).await?;
503 if !params.is_empty() {
504 df = df.with_param_values(params)?;
505 }
506 let batches = df.collect().await?;
507 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
508 let schema = batches
509 .first()
510 .map(|b| b.schema())
511 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
512 return Ok(vec![RecordBatch::new_empty(schema)]);
513 }
514 Ok(batches)
515 }
516}
517
518fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
519 let schema = batches
520 .first()
521 .map(|batch| batch.schema())
522 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
523 let (mut writer, stream) = arrow_ipc_stream_channel(8);
524
525 tokio::task::spawn_blocking(move || {
526 let result = (|| -> Result<(), AppError> {
527 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
528 for batch in batches {
529 if batch.num_rows() > 0 {
530 w.write(&batch)?;
531 }
532 }
533 w.finish()?;
534 Ok(())
535 })();
536 if let Err(err) = result {
537 log::error!("datafusion arrow stream failed: {err}");
538 writer.send_error(err);
539 }
540 });
541
542 stream
543}
544
545impl Store {
546 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
550 let st = self.dataset(name)?;
551
552 if !st.lazy {
553 if req.predicates.is_empty() {
555 return Ok(st.num_rows() as i64);
556 }
557 if let Some(rows) = try_index(&st.index, &req.predicates) {
559 return Ok(rows.len() as i64);
560 }
561 }
562
563 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
566 let mut df = self.ctx.sql(&sql).await?;
567 if !params.is_empty() {
568 df = df.with_param_values(params)?;
569 }
570 let batches = df.collect().await?;
571 let n = batches
572 .first()
573 .and_then(|b| {
574 b.column(0)
575 .as_any()
576 .downcast_ref::<arrow::array::Int64Array>()
577 })
578 .filter(|a| !a.is_empty())
579 .map(|a| a.value(0))
580 .unwrap_or(0);
581 Ok(n)
582 }
583}
584
585async fn build_dataset(
590 d: &DatasetConfig,
591 ctx: &SessionContext,
592) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
593 if d.lazy {
599 match (d.source.kind, d.source.is_s3()) {
600 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
601 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
602 (SourceKind::Delta, _) => {
603 return Err(AppError::Internal(format!(
604 "dataset '{}': lazy mode is not supported for delta sources",
605 d.name
606 )));
607 }
608 }
609 }
610
611 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
616 (SourceKind::Parquet, false) => read_local_parquet(d)?,
617 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
618 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
619 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
620 };
621 if raw_batches.is_empty() {
622 return Err(AppError::Internal(format!(
623 "dataset '{}': source produced no batches",
624 d.name
625 )));
626 }
627
628 let chunks = raw_batches;
629 let arrow_sch = chunks[0].schema();
630
631 let columns: Vec<ColumnInfo> = arrow_sch
633 .fields()
634 .iter()
635 .map(|f| {
636 let dt = f.data_type();
637 ColumnInfo {
638 name: f.name().clone(),
639 logical: arrow_to_logical(dt),
640 sql_type: format!("{dt:?}"),
641 nullable: f.is_nullable(),
642 }
643 })
644 .collect();
645 let schema = DatasetSchema::new(&d.name, columns);
646
647 let index = build_eq_index_with_policy(&chunks, &d.index);
652
653 let n_parts = std::thread::available_parallelism()
658 .map(|n| n.get())
659 .unwrap_or(4);
660 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
661 for (i, b) in chunks.iter().enumerate() {
662 if b.num_rows() == 0 {
663 continue;
664 }
665 parts[i % n_parts].push(b.clone());
666 }
667 parts.retain(|p| !p.is_empty());
668 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
669
670 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
671 let mem_mb: usize = chunks
672 .iter()
673 .flat_map(|b| b.columns().iter())
674 .map(|c| c.get_buffer_memory_size())
675 .sum::<usize>()
676 / 1_048_576;
677 log::info!(
678 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
679 d.name,
680 d.source.kind.as_str(),
681 total_rows,
682 schema.columns.len(),
683 mem_mb,
684 chunks.len(),
685 index.len()
686 );
687
688 Ok((
689 DatasetState {
690 schema,
691 data: chunks,
692 arrow_schema: arrow_sch,
693 index,
694 lazy: false,
695 },
696 provider,
697 ))
698}
699
700async fn build_lazy_local_parquet(
705 d: &DatasetConfig,
706 ctx: &SessionContext,
707) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
708 let (url, part_keys) = lazy_local_listing(d)?;
709
710 let mut opts =
711 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
712 if !part_keys.is_empty() {
713 opts = opts.with_table_partition_cols(
714 part_keys
715 .iter()
716 .map(|k| (k.clone(), DataType::Utf8))
717 .collect(),
718 );
719 }
720
721 let session_state = ctx.state();
722 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
725 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
726 })?;
727
728 let cfg = ListingTableConfig::new(url)
729 .with_listing_options(opts)
730 .with_schema(file_schema.clone());
731 let table = ListingTable::try_new(cfg).map_err(|e| {
732 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
733 })?;
734 let provider: Arc<dyn TableProvider> = Arc::new(table);
735
736 let mut fields: Vec<Field> = file_schema
738 .fields()
739 .iter()
740 .map(|f| f.as_ref().clone())
741 .collect();
742 for k in &part_keys {
743 if !fields.iter().any(|f| f.name() == k) {
744 fields.push(Field::new(k, DataType::Utf8, false));
745 }
746 }
747 let arrow_sch = Arc::new(Schema::new(fields));
748
749 let columns: Vec<ColumnInfo> = arrow_sch
750 .fields()
751 .iter()
752 .map(|f| {
753 let dt = f.data_type();
754 ColumnInfo {
755 name: f.name().clone(),
756 logical: arrow_to_logical(dt),
757 sql_type: format!("{dt:?}"),
758 nullable: f.is_nullable(),
759 }
760 })
761 .collect();
762 let schema = DatasetSchema::new(&d.name, columns);
763
764 log::info!(
765 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
766 d.name,
767 d.source.kind.as_str(),
768 schema.columns.len(),
769 part_keys.len()
770 );
771
772 Ok((
773 DatasetState {
774 schema,
775 data: Vec::new(),
776 arrow_schema: arrow_sch,
777 index: EqIndex::default(),
778 lazy: true,
779 },
780 provider,
781 ))
782}
783
784fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
789 let loc = &d.source.location;
790
791 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
792 let parts: Vec<&str> = loc.split('/').collect();
793 let first_wild = parts
794 .iter()
795 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
796 .unwrap_or(parts.len());
797 let base = parts[..first_wild].join("/");
798 let base = if base.is_empty() {
799 "/".to_string()
800 } else {
801 base
802 };
803 let upper = parts.len().saturating_sub(1);
806 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
807 .iter()
808 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
809 .filter(|k| !k.is_empty())
810 .collect();
811 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
812 }
813
814 let path = std::path::Path::new(loc);
815 if path.is_dir() {
816 let keys = discover_hive_keys(path);
817 return Ok((dir_url(path, d)?, keys));
818 }
819
820 let url = ListingTableUrl::parse(loc)
821 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
822 Ok((url, Vec::new()))
823}
824
825fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
828 let s = path.to_str().ok_or_else(|| {
829 AppError::Internal(format!(
830 "dataset '{}': non-utf8 path {}",
831 d.name,
832 path.display()
833 ))
834 })?;
835 let s = if s.ends_with('/') {
836 s.to_string()
837 } else {
838 format!("{s}/")
839 };
840 ListingTableUrl::parse(&s)
841 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
842}
843
/// Discovers hive partition keys under `base` by repeatedly descending into the first
/// `key=value` subdirectory (non-empty key and value) found at each level.
///
/// Directory entries are visited in `read_dir` order, which is unspecified. Unreadable
/// directories and non-UTF-8 names end or skip the walk rather than failing.
/// Returns the keys outermost first.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut dir = base.to_path_buf();

    while let Ok(entries) = std::fs::read_dir(&dir) {
        let step = entries
            .flatten()
            .map(|entry| entry.path())
            .filter(|p| p.is_dir())
            .find_map(|p| {
                let (k, v) = p.file_name()?.to_str()?.split_once('=')?;
                if k.is_empty() || v.is_empty() {
                    return None;
                }
                let key = k.to_string();
                Some((key, p))
            });

        match step {
            Some((key, next_dir)) => {
                keys.push(key);
                dir = next_dir;
            }
            None => break,
        }
    }

    keys
}
881
882async fn build_lazy_s3_parquet(
888 d: &DatasetConfig,
889 ctx: &SessionContext,
890) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
891 register_s3_object_store(d, ctx)?;
892
893 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
894
895 let mut fields: Vec<Field> = file_schema
897 .fields()
898 .iter()
899 .map(|f| f.as_ref().clone())
900 .collect();
901 for k in &part_keys {
902 if !fields.iter().any(|f| f.name() == k) {
903 fields.push(Field::new(k, DataType::Utf8, false));
904 }
905 }
906 let arrow_sch = Arc::new(Schema::new(fields));
907
908 let columns: Vec<ColumnInfo> = arrow_sch
909 .fields()
910 .iter()
911 .map(|f| {
912 let dt = f.data_type();
913 ColumnInfo {
914 name: f.name().clone(),
915 logical: arrow_to_logical(dt),
916 sql_type: format!("{dt:?}"),
917 nullable: f.is_nullable(),
918 }
919 })
920 .collect();
921 let schema = DatasetSchema::new(&d.name, columns);
922
923 log::info!(
924 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
925 d.name,
926 d.source.kind.as_str(),
927 schema.columns.len(),
928 part_keys.len()
929 );
930
931 Ok((
932 DatasetState {
933 schema,
934 data: Vec::new(),
935 arrow_schema: arrow_sch,
936 index: EqIndex::default(),
937 lazy: true,
938 },
939 provider,
940 ))
941}
942
943async fn build_s3_listing_table(
949 d: &DatasetConfig,
950 ctx: &SessionContext,
951) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
952 let (url, part_keys) = s3_listing(d, ctx).await?;
953
954 let mut opts =
955 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
956 if !part_keys.is_empty() {
957 opts = opts.with_table_partition_cols(
958 part_keys
959 .iter()
960 .map(|k| (k.clone(), DataType::Utf8))
961 .collect(),
962 );
963 }
964
965 let session_state = ctx.state();
966 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
967 AppError::Internal(format!(
968 "dataset '{}': infer parquet schema on s3: {e}",
969 d.name
970 ))
971 })?;
972
973 let cfg = ListingTableConfig::new(url)
974 .with_listing_options(opts)
975 .with_schema(file_schema.clone());
976 let table = ListingTable::try_new(cfg).map_err(|e| {
977 AppError::Internal(format!(
978 "dataset '{}': ListingTable::try_new (s3): {e}",
979 d.name
980 ))
981 })?;
982 Ok((Arc::new(table), file_schema, part_keys))
983}
984
985async fn s3_listing(
991 d: &DatasetConfig,
992 ctx: &SessionContext,
993) -> Result<(ListingTableUrl, Vec<String>), AppError> {
994 let s3 = d.s3.clone().unwrap_or_default();
995 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
996 let loc = &d.source.location;
997
998 if d.source.has_glob() {
999 let (base, keys) = split_glob_base_keys(loc);
1000 let base = format!("{}/", base.trim_end_matches('/'));
1001 let url = ListingTableUrl::parse(&base).map_err(|e| {
1002 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1003 })?;
1004 let keys = if want_partitions { keys } else { Vec::new() };
1005 return Ok((url, keys));
1006 }
1007
1008 let base = if loc.ends_with('/') {
1009 loc.clone()
1010 } else {
1011 format!("{loc}/")
1012 };
1013 let url = ListingTableUrl::parse(&base).map_err(|e| {
1014 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
1015 })?;
1016 let keys = if want_partitions {
1017 discover_s3_hive_keys(ctx, &url).await
1018 } else {
1019 Vec::new()
1020 };
1021 Ok((url, keys))
1022}
1023
/// Splits a glob location into its non-wildcard base and the hive partition keys.
///
/// The base is every `/`-separated component before the first one containing `*`, `?`
/// or `[` (or `"/"` when that prefix is empty). Keys are the non-empty `key` parts of
/// `key=...` components from the first wildcard component up to, but excluding, the
/// final component (the file-name pattern).
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    let is_wild = |seg: &str| seg.contains('*') || seg.contains('?') || seg.contains('[');

    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|&seg| is_wild(seg))
        .unwrap_or(segments.len());

    let joined = segments[..wild_at].join("/");
    let base = if joined.is_empty() {
        String::from("/")
    } else {
        joined
    };

    let last = segments.len().saturating_sub(1);
    let keys = segments[wild_at.min(last)..last]
        .iter()
        .filter_map(|seg| seg.split_once('='))
        .map(|(k, _)| k)
        .filter(|k| !k.is_empty())
        .map(str::to_string)
        .collect();

    (base, keys)
}
1047
1048async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
1053 let store = match ctx.runtime_env().object_store(url.object_store()) {
1054 Ok(s) => s,
1055 Err(_) => return Vec::new(),
1056 };
1057 let mut keys = Vec::new();
1058 let mut prefix = url.prefix().clone();
1059 loop {
1060 let listing = match store.list_with_delimiter(Some(&prefix)).await {
1061 Ok(l) => l,
1062 Err(_) => break,
1063 };
1064 let mut next: Option<object_store::path::Path> = None;
1065 for cp in &listing.common_prefixes {
1066 if let Some(seg) = cp.parts().next_back() {
1067 let seg = seg.as_ref().to_string();
1068 if let Some((k, v)) = seg.split_once('=')
1069 && !k.is_empty()
1070 && !v.is_empty()
1071 {
1072 keys.push(k.to_string());
1073 next = Some(cp.clone());
1074 break;
1075 }
1076 }
1077 }
1078 match next {
1079 Some(p) => prefix = p,
1080 None => break,
1081 }
1082 }
1083 keys
1084}
1085
1086fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1103 let files = d.resolve_local_parquet_files()?;
1104 let mut all = Vec::new();
1105 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1106 None
1107 } else {
1108 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1109 };
1110
1111 for f in &files {
1112 let file = std::fs::File::open(f)
1113 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1114
1115 let probe = ParquetRecordBatchReaderBuilder::try_new(
1120 file.try_clone()
1121 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1122 )?;
1123 let parquet_schema = probe.parquet_schema().clone();
1124 let arrow_schema = probe.schema().clone();
1125 let metadata = probe.metadata().clone();
1126 drop(probe);
1127
1128 let projection = if let Some(w) = &wanted {
1130 let indices: Vec<usize> = arrow_schema
1131 .fields()
1132 .iter()
1133 .enumerate()
1134 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1135 .map(|(i, _)| i)
1136 .collect();
1137 if indices.is_empty() {
1138 return Err(AppError::Internal(format!(
1139 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1140 d.name,
1141 d.columns,
1142 f.display()
1143 )));
1144 }
1145 ProjectionMask::roots(&parquet_schema, indices)
1146 } else {
1147 ProjectionMask::all()
1148 };
1149
1150 let mut new_fields: Vec<Field> = arrow_schema
1158 .fields()
1159 .iter()
1160 .map(|f| f.as_ref().clone())
1161 .collect();
1162 if d.dict_encode
1163 && let Some(rg0) = metadata.row_groups().first()
1164 {
1165 for (i, fld) in arrow_schema.fields().iter().enumerate() {
1166 if !matches!(
1167 fld.data_type(),
1168 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1169 ) {
1170 continue;
1171 }
1172 if let Some(col) = rg0.columns().get(i)
1173 && col.dictionary_page_offset().is_some()
1174 {
1175 new_fields[i] = Field::new(
1176 fld.name(),
1177 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1178 fld.is_nullable(),
1179 );
1180 }
1181 }
1182 }
1183 let forced_schema = Arc::new(Schema::new(new_fields));
1184
1185 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1186 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1187 .with_batch_size(65_536)
1188 .with_projection(projection)
1189 .build()?;
1190 let pairs = hive_pairs(f);
1194 for batch in reader {
1195 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1196 all.push(if pairs.is_empty() {
1197 batch
1198 } else {
1199 append_partition_cols(&batch, &pairs)?
1200 });
1201 }
1202 }
1203 if all.is_empty() {
1204 return Err(AppError::Internal(format!(
1205 "dataset '{}': parquet source is empty",
1206 d.name
1207 )));
1208 }
1209 Ok(all)
1210}
1211
/// Extracts hive-style `key=value` partition pairs from the *directory* components of
/// `path`, in path order.
///
/// Only parent directories are considered, so a file name that happens to contain `=`
/// (e.g. `part=0.parquet`) is not mistaken for a partition. Components with an empty
/// key, an empty value, a second `=`, or non-UTF-8 text are ignored.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|c| c.as_os_str().to_str())
        .filter_map(|seg| {
            let (k, v) = seg.split_once('=')?;
            if k.is_empty() || v.is_empty() || v.contains('=') {
                return None;
            }
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}
1226
1227fn append_partition_cols(
1230 batch: &RecordBatch,
1231 pairs: &[(String, String)],
1232) -> Result<RecordBatch, AppError> {
1233 let n = batch.num_rows();
1234 let mut fields: Vec<Field> = batch
1235 .schema()
1236 .fields()
1237 .iter()
1238 .map(|f| f.as_ref().clone())
1239 .collect();
1240 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1241 for (k, v) in pairs {
1242 if fields.iter().any(|f| f.name() == k) {
1243 continue;
1244 }
1245 fields.push(Field::new(k, DataType::Utf8, false));
1246 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1247 }
1248 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1249 .map_err(|e| AppError::Internal(e.to_string()))
1250}
1251
1252async fn read_s3_parquet(
1258 d: &DatasetConfig,
1259 ctx: &SessionContext,
1260) -> Result<Vec<RecordBatch>, AppError> {
1261 register_s3_object_store(d, ctx)?;
1262 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1263 let df = ctx
1264 .read_table(provider)
1265 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1266 Ok(df.collect().await?)
1267}
1268
1269async fn read_delta(
1273 d: &DatasetConfig,
1274 opts: HashMap<String, String>,
1275) -> Result<Vec<RecordBatch>, AppError> {
1276 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1277 AppError::Internal(format!(
1278 "dataset '{}': bad delta location '{}': {e}",
1279 d.name, d.source.location
1280 ))
1281 })?;
1282 let table = deltalake::open_table_with_storage_options(url, opts)
1283 .await
1284 .map_err(|e| {
1285 AppError::Internal(format!(
1286 "dataset '{}': delta open '{}': {e}",
1287 d.name, d.source.location
1288 ))
1289 })?;
1290 let provider = table.table_provider().await.map_err(|e| {
1291 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1292 })?;
1293 let scan_ctx = SessionContext::new();
1296 let df = scan_ctx
1297 .read_table(provider)
1298 .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1299 Ok(df.collect().await?)
1300}
1301
1302fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1306 let creds = d.resolved_creds();
1307 let region = d.resolved_region();
1308 let s3 = d.s3.clone().unwrap_or_default();
1309 let (bucket, _) = d.source.s3_bucket()?;
1310
1311 let mut opts = HashMap::new();
1312 opts.insert("AWS_REGION".into(), region);
1313 if let Some(ep) = s3.effective_endpoint(bucket) {
1314 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1315 }
1316 if s3.allow_http {
1317 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1318 }
1319 opts.insert(
1320 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1321 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1322 );
1323 if let Some(k) = creds.access_key_id {
1324 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1325 }
1326 if let Some(s) = creds.secret_access_key {
1327 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1328 }
1329 if let Some(t) = creds.session_token {
1330 opts.insert("AWS_SESSION_TOKEN".into(), t);
1331 }
1332 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1334 Ok(opts)
1335}
1336
1337fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1341 let (bucket, _key) = d.source.s3_bucket()?;
1342 let creds = d.resolved_creds();
1343 let region = d.resolved_region();
1344 let s3 = d.s3.clone().unwrap_or_default();
1345
1346 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1347 AppError::Internal(format!(
1348 "dataset '{}': build S3 store for '{bucket}': {e}",
1349 d.name
1350 ))
1351 })?;
1352
1353 let url = Url::parse(&format!("s3://{bucket}"))
1354 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1355 ctx.register_object_store(&url, Arc::new(store));
1356 Ok(())
1357}
1358
1359fn build_s3(
1360 bucket: &str,
1361 region: &str,
1362 s3: &S3Config,
1363 creds: &ResolvedCreds,
1364) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1365 let mut b = AmazonS3Builder::new()
1366 .with_bucket_name(bucket)
1367 .with_region(region)
1368 .with_allow_http(s3.allow_http)
1369 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1370 if let Some(ep) = s3.effective_endpoint(bucket) {
1371 b = b.with_endpoint(ep);
1372 }
1373 if let Some(k) = creds.access_key_id.as_deref() {
1374 b = b.with_access_key_id(k);
1375 }
1376 if let Some(s) = creds.secret_access_key.as_deref() {
1377 b = b.with_secret_access_key(s);
1378 }
1379 if let Some(t) = creds.session_token.as_deref() {
1380 b = b.with_token(t);
1381 }
1382 b.build()
1383}
1384
1385fn arrow_to_logical(dt: &DataType) -> LogicalType {
1386 match dt {
1387 DataType::Boolean => LogicalType::Bool,
1388 DataType::Int8
1389 | DataType::Int16
1390 | DataType::Int32
1391 | DataType::Int64
1392 | DataType::UInt8
1393 | DataType::UInt16
1394 | DataType::UInt32
1395 | DataType::UInt64 => LogicalType::Int,
1396 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1397 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1398 DataType::Dictionary(_, v)
1402 if matches!(
1403 v.as_ref(),
1404 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1405 ) =>
1406 {
1407 LogicalType::Utf8
1408 }
1409 DataType::Date32
1410 | DataType::Date64
1411 | DataType::Time32(_)
1412 | DataType::Time64(_)
1413 | DataType::Timestamp(_, _)
1414 | DataType::Duration(_)
1415 | DataType::Interval(_) => LogicalType::Temporal,
1416 _ => LogicalType::Other,
1417 }
1418}
1419
1420fn project(
1425 schema: &DatasetSchema,
1426 batch: RecordBatch,
1427 columns: &[String],
1428) -> Result<RecordBatch, AppError> {
1429 if columns.is_empty() {
1430 return Ok(batch);
1431 }
1432 let indices: Vec<usize> = columns
1433 .iter()
1434 .map(|c| {
1435 schema
1436 .find(c)
1437 .map(|info| schema.by_name[&info.name.to_lowercase()])
1438 })
1439 .collect::<Result<_, _>>()?;
1440 let fields: Vec<Field> = indices
1441 .iter()
1442 .map(|&i| batch.schema().field(i).clone())
1443 .collect();
1444 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1445 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1446}
1447
1448#[derive(Default)]
1461struct Params {
1462 values: Vec<ScalarValue>,
1463}
1464
1465impl Params {
1466 fn new() -> Self {
1467 Self::default()
1468 }
1469
1470 fn bind(&mut self, v: ScalarValue) -> String {
1472 self.values.push(v);
1473 format!("${}", self.values.len())
1474 }
1475
1476 fn into_values(self) -> Vec<ScalarValue> {
1477 self.values
1478 }
1479}
1480
1481fn build_query_sql(
1482 schema: &DatasetSchema,
1483 req: &QueryRequest,
1484 max_page_size: u64,
1485) -> Result<(String, Vec<ScalarValue>), AppError> {
1486 let (limit, offset) = req.effective_limit_offset(max_page_size);
1487 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1488}
1489
1490fn build_query_stream_sql(
1491 schema: &DatasetSchema,
1492 req: &QueryRequest,
1493) -> Result<(String, Vec<ScalarValue>), AppError> {
1494 let suffix = req
1495 .limit
1496 .map(|limit| format!(" LIMIT {limit}"))
1497 .unwrap_or_default();
1498 build_query_sql_with_suffix(schema, req, &suffix)
1499}
1500
1501fn build_query_sql_with_suffix(
1502 schema: &DatasetSchema,
1503 req: &QueryRequest,
1504 suffix: &str,
1505) -> Result<(String, Vec<ScalarValue>), AppError> {
1506 let agg_plan = req.agg_plan(schema)?;
1507
1508 let cols = if let Some(plan) = &agg_plan {
1509 let mut parts: Vec<String> = plan
1511 .group_cols
1512 .iter()
1513 .map(|c| DatasetSchema::quote_ident(c))
1514 .collect();
1515 for a in &plan.aggs {
1516 let expr = a.sql_expr()?;
1517 parts.push(format!(
1518 "{expr} AS {}",
1519 DatasetSchema::quote_ident(&a.alias)
1520 ));
1521 }
1522 parts.join(", ")
1523 } else if req.columns.is_empty() {
1524 if req.distinct {
1525 "DISTINCT *".to_string()
1526 } else {
1527 "*".to_string()
1528 }
1529 } else {
1530 let list = req
1531 .columns
1532 .iter()
1533 .map(|c| {
1534 schema
1535 .find(c)
1536 .map(|info| DatasetSchema::quote_ident(&info.name))
1537 })
1538 .collect::<Result<Vec<_>, _>>()?
1539 .join(", ");
1540 if req.distinct {
1541 format!("DISTINCT {list}")
1542 } else {
1543 list
1544 }
1545 };
1546
1547 let mut params = Params::new();
1548 let clauses: Vec<String> = req
1549 .predicates
1550 .iter()
1551 .map(|p| pred_to_sql(schema, p, &mut params))
1552 .collect::<Result<_, _>>()?;
1553
1554 let table = DatasetSchema::quote_ident(&schema.name);
1555 let where_clause = if clauses.is_empty() {
1556 String::new()
1557 } else {
1558 format!(" WHERE {}", clauses.join(" AND "))
1559 };
1560 let group_clause = match &agg_plan {
1561 Some(p) => format!(
1562 " GROUP BY {}",
1563 p.group_cols
1564 .iter()
1565 .map(|c| DatasetSchema::quote_ident(c))
1566 .collect::<Vec<_>>()
1567 .join(", "),
1568 ),
1569 None => String::new(),
1570 };
1571 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1572 Some(s) => format!(" ORDER BY {s}"),
1573 None => String::new(),
1574 };
1575 let sql =
1576 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1577 Ok((sql, params.into_values()))
1578}
1579
1580fn build_count_sql(
1581 schema: &DatasetSchema,
1582 predicates: &[Predicate],
1583) -> Result<(String, Vec<ScalarValue>), AppError> {
1584 let mut params = Params::new();
1585 let clauses: Vec<String> = predicates
1586 .iter()
1587 .map(|p| pred_to_sql(schema, p, &mut params))
1588 .collect::<Result<_, _>>()?;
1589 let table = DatasetSchema::quote_ident(&schema.name);
1590 let where_clause = if clauses.is_empty() {
1591 String::new()
1592 } else {
1593 format!(" WHERE {}", clauses.join(" AND "))
1594 };
1595 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1596 Ok((sql, params.into_values()))
1597}
1598
1599fn pred_to_sql(
1600 schema: &DatasetSchema,
1601 pred: &Predicate,
1602 params: &mut Params,
1603) -> Result<String, AppError> {
1604 let info = schema.find(&pred.col)?;
1605 let col = DatasetSchema::quote_ident(&info.name);
1606
1607 match pred.op.as_str() {
1608 "is_null" => return Ok(format!("{col} IS NULL")),
1609 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1610 _ => {}
1611 }
1612
1613 let val = pred
1614 .val
1615 .as_ref()
1616 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1617
1618 if pred.op == "in" {
1619 let items = val
1620 .as_array()
1621 .filter(|a| !a.is_empty())
1622 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1623 let placeholders: Vec<String> = items
1624 .iter()
1625 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1626 .collect::<Result<_, AppError>>()?;
1627 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1628 }
1629
1630 let sql_op = match pred.op.as_str() {
1631 "eq" => "=",
1632 "neq" => "!=",
1633 "gt" => ">",
1634 "gte" => ">=",
1635 "lt" => "<",
1636 "lte" => "<=",
1637 "like" => "LIKE",
1638 "ilike" => "ILIKE",
1639 other => return Err(AppError::UnknownOperator(other.into())),
1640 };
1641 let placeholder = params.bind(json_to_scalar(val)?);
1642 Ok(format!("{col} {sql_op} {placeholder}"))
1643}
1644
1645fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1649 match val {
1650 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1651 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1652 JsonValue::Null => Ok(ScalarValue::Null),
1653 JsonValue::Number(n) => {
1654 if let Some(i) = n.as_i64() {
1655 Ok(ScalarValue::Int64(Some(i)))
1656 } else if let Some(u) = n.as_u64() {
1657 Ok(ScalarValue::UInt64(Some(u)))
1658 } else if let Some(f) = n.as_f64() {
1659 Ok(ScalarValue::Float64(Some(f)))
1660 } else {
1661 Err(AppError::InvalidValue(
1662 "unsupported numeric literal in predicate".into(),
1663 ))
1664 }
1665 }
1666 _ => Err(AppError::InvalidValue(
1667 "unsupported literal type in predicate".into(),
1668 )),
1669 }
1670}
1671
1672fn json_index_key(val: &JsonValue) -> Option<String> {
1677 match val {
1678 JsonValue::String(s) => Some(s.clone()),
1679 JsonValue::Number(n) => Some(n.to_string()),
1680 JsonValue::Bool(b) => Some(b.to_string()),
1681 _ => None,
1682 }
1683}
1684
/// Merge-intersects two ascending row-id lists, returning the ids present in both
/// (ascending). Runs in O(a.len() + b.len()).
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::new();
    let (mut left, mut right) = (a, b);
    while let (Some(&x), Some(&y)) = (left.first(), right.first()) {
        match x.cmp(&y) {
            Ordering::Less => left = &left[1..],
            Ordering::Greater => right = &right[1..],
            Ordering::Equal => {
                common.push(x);
                left = &left[1..];
                right = &right[1..];
            }
        }
    }
    common
}
1701
/// Merges two ascending row-id lists into their ascending union.
///
/// An id present in both inputs at the same step is emitted once; any tails
/// left after one input runs out are copied verbatim.
/// Runs in O(a.len() + b.len()).
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut rest_a, mut rest_b) = (a, b);
    while let (Some(&x), Some(&y)) = (rest_a.first(), rest_b.first()) {
        if x <= y {
            merged.push(x);
            rest_a = &rest_a[1..];
            if x == y {
                rest_b = &rest_b[1..];
            }
        } else {
            merged.push(y);
            rest_b = &rest_b[1..];
        }
    }
    merged.extend_from_slice(rest_a);
    merged.extend_from_slice(rest_b);
    merged
}
1726
1727fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1728 if predicates.is_empty() || index.is_empty() {
1729 return None;
1730 }
1731
1732 if let [pred] = predicates
1735 && pred.op.as_str() == "eq"
1736 {
1737 let col_lower = pred.col.to_lowercase();
1738 let col_map = index.get(&col_lower)?;
1739 let key = json_index_key(pred.val.as_ref()?)?;
1740 return Some(match col_map.get(&key) {
1741 Some(rows) => Cow::Borrowed(rows.as_slice()),
1742 None => Cow::Owned(Vec::new()),
1743 });
1744 }
1745
1746 let mut result: Option<Vec<u32>> = None;
1747 for pred in predicates {
1748 let col_lower = pred.col.to_lowercase();
1749 let col_map = index.get(&col_lower)?;
1750
1751 let rows: Vec<u32> = match pred.op.as_str() {
1752 "eq" => {
1753 let key = json_index_key(pred.val.as_ref()?)?;
1754 col_map.get(&key).cloned().unwrap_or_default()
1755 }
1756 "in" => {
1757 let items = pred.val.as_ref()?.as_array()?;
1758 let mut merged: Vec<u32> = Vec::new();
1759 for item in items {
1760 if let Some(r) = col_map.get(&json_index_key(item)?) {
1761 merged = union_sorted(&merged, r);
1762 }
1763 }
1764 merged
1765 }
1766 _ => return None,
1767 };
1768
1769 result = Some(match result {
1770 None => rows,
1771 Some(r) => intersect_sorted(&r, &rows),
1772 });
1773 }
1774 result.map(Cow::Owned)
1775}
1776
1777#[doc(hidden)]
1781pub mod bench {
1782 use super::{EqIndex, FastMap, json_index_key, try_index};
1783 use datapress_core::models::Predicate;
1784 use serde_json::Value as JsonValue;
1785 use std::borrow::Cow;
1786
1787 pub struct BenchIndex(EqIndex);
1789
1790 pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
1794 let key = json_index_key(val).expect("benchable index key");
1795 let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
1796 col_map.insert(key, rows);
1797 let mut index: EqIndex = EqIndex::default();
1798 index.insert(col.to_string(), col_map);
1799 BenchIndex(index)
1800 }
1801
1802 pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1804 try_index(&idx.0, predicates)
1805 }
1806
1807 pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1812 let [pred] = predicates else { return None };
1813 if pred.op.as_str() != "eq" {
1814 return None;
1815 }
1816 let col_lower = pred.col.to_lowercase();
1817 let col_map = idx.0.get(&col_lower)?;
1818 let key = json_index_key(pred.val.as_ref()?)?;
1819 Some(col_map.get(&key).cloned().unwrap_or_default())
1820 }
1821}
1822
1823fn slice_global(
1826 chunks: &[RecordBatch],
1827 schema: &Arc<Schema>,
1828 offset: usize,
1829 limit: usize,
1830) -> Result<RecordBatch, AppError> {
1831 if limit == 0 || chunks.is_empty() {
1832 return Ok(RecordBatch::new_empty(schema.clone()));
1833 }
1834 let mut out = Vec::new();
1835 let mut to_skip = offset;
1836 let mut remaining = limit;
1837 for b in chunks {
1838 if remaining == 0 {
1839 break;
1840 }
1841 let n = b.num_rows();
1842 if to_skip >= n {
1843 to_skip -= n;
1844 continue;
1845 }
1846 let take = remaining.min(n - to_skip);
1847 out.push(b.slice(to_skip, take));
1848 to_skip = 0;
1849 remaining -= take;
1850 }
1851 if out.is_empty() {
1852 return Ok(RecordBatch::new_empty(schema.clone()));
1853 }
1854 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1855}
1856
/// Materialises the rows whose global ids are `rows[offset..offset + limit]`
/// (both bounds clamped to `rows.len()`) into one batch. The output keeps the
/// order of `rows`.
///
/// A global row id is a position in the logical concatenation of `chunks`
/// (chunk 0's rows first, then chunk 1's, ...).
///
/// Strategy:
/// 1. Group the requested ids by owning chunk.
/// 2. `take` each group from its chunk and concatenate the pieces.
/// 3. Apply one final `take` with the inverse permutation, so rows come back
///    in the order of `rows`.
///
/// Returns an empty batch with `schema` when the page is empty or there are no chunks.
///
/// # Panics
/// - If the running total of chunk row counts overflows `u32` (each chunk's
///   count is cast with `as u32` first).
/// - If an id in the page is not smaller than the total row count, since it
///   would index past the last chunk's bucket.
///
/// # Errors
/// Propagates Arrow errors from `take`, `RecordBatch::try_new` and `concat_batches`.
fn take_page(
    chunks: &[RecordBatch],
    schema: &Arc<Schema>,
    rows: &[u32],
    offset: usize,
    limit: usize,
) -> Result<RecordBatch, AppError> {
    let start = offset.min(rows.len());
    let len = limit.min(rows.len() - start);
    if len == 0 || chunks.is_empty() {
        return Ok(RecordBatch::new_empty(schema.clone()));
    }

    // offsets[i] = global id of chunk i's first row; the last entry is the total row count.
    let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
    let mut acc: u32 = 0;
    offsets.push(0);
    for b in chunks {
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
        offsets.push(acc);
    }

    // buckets[chunk] = (output position, row index local to that chunk) for every requested id.
    let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
    for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
        // Number of offsets <= gid, minus one, is the last chunk starting at or
        // before gid; this skips empty chunks that share the same start offset.
        let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
        let local = gid - offsets[bi];
        buckets[bi].push((out_pos as u32, local));
    }

    // Gather each chunk's rows; dest[i] records the output position of stitched row i.
    let mut takens: Vec<RecordBatch> = Vec::new();
    let mut dest: Vec<u32> = Vec::with_capacity(len);
    for (bi, bucket) in buckets.iter().enumerate() {
        if bucket.is_empty() {
            continue;
        }
        let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
        let cols: Vec<ArrayRef> = chunks[bi]
            .columns()
            .iter()
            .map(|c| {
                arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
                    .map_err(AppError::from)
            })
            .collect::<Result<_, _>>()?;
        takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
        dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
    }

    // Rows are now grouped by chunk; invert `dest` so inv[out_pos] names the
    // stitched row that belongs at out_pos, then reorder with a single take.
    let stitched = compute::concat_batches(schema, takens.iter())?;
    let mut inv = vec![0u32; len];
    for (i, &d) in dest.iter().enumerate() {
        inv[d as usize] = i as u32;
    }
    let perm = UInt32Array::from(inv);
    let cols: Vec<ArrayRef> = stitched
        .columns()
        .iter()
        .map(|c| {
            arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
                .map_err(AppError::from)
        })
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
}
1932
/// Builds the in-memory equality index for `chunks` according to `cfg`.
///
/// Result shape: lowercase column name -> (cell value rendered with
/// `to_string()` -> ascending global row ids). Global ids number rows across
/// `chunks` in order.
///
/// Policy, from `cfg.mode`:
/// - `IndexMode::None`: nothing is indexed.
/// - `IndexMode::List`: only columns named in `cfg.columns` (compared lowercase).
/// - `IndexMode::Auto`: every eligible column, but a column is dropped entirely
///   as soon as it would exceed `cfg.max_cardinality` distinct values.
/// - any other mode: every eligible column, without a cardinality limit.
///
/// Eligible column types:
/// - `Utf8`, `Utf8View`, `Boolean`, `Int8`–`Int64`;
/// - `Dictionary<Int32, Utf8>`.
///
/// Null cells are skipped. A column whose array cannot be downcast to the
/// expected type is skipped too. Columns are processed in parallel with rayon.
/// Columns are addressed by their position in the first chunk's schema — this
/// assumes all chunks share that layout (TODO confirm).
///
/// # Panics
/// Panics if the total row count overflows `u32`.
fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
    use rayon::prelude::*;

    if cfg.mode == IndexMode::None || chunks.is_empty() {
        return EqIndex::default();
    }

    // Column allow-list (lowercase names), only used in List mode.
    let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
        Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
    } else {
        None
    };

    // Distinct-value cap, only enforced in Auto mode.
    let max_card = if cfg.mode == IndexMode::Auto {
        Some(cfg.max_cardinality)
    } else {
        None
    };

    // batch_offsets[i] = global id of the first row of chunk i.
    let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
    let mut acc: u32 = 0;
    for b in chunks {
        batch_offsets.push(acc);
        acc = acc
            .checked_add(b.num_rows() as u32)
            .expect("row count exceeds u32::MAX");
    }

    let schema = chunks[0].schema();

    schema
        .fields()
        .par_iter()
        .enumerate()
        .filter_map(|(ci, field)| {
            let col_lower = field.name().to_lowercase();
            if let Some(a) = &allow
                && !a.contains_key(&col_lower)
            {
                return None;
            }

            // Only string, boolean, signed-integer and Int32-keyed string
            // dictionary columns are indexed.
            let dtype = field.data_type();
            let dict_utf8 = matches!(dtype,
                DataType::Dictionary(k, v)
                    if matches!(k.as_ref(), DataType::Int32)
                        && matches!(v.as_ref(), DataType::Utf8));
            match dtype {
                DataType::Utf8
                | DataType::Utf8View
                | DataType::Boolean
                | DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64 => {}
                _ if dict_utf8 => {}
                _ => return None,
            }

            let mut map: FastMap<String, Vec<u32>> = FastMap::default();

            for (bi, batch) in chunks.iter().enumerate() {
                let base = batch_offsets[bi];
                let col = batch.column(ci);

                // Indexes a plain array type. `return None` here exits the
                // enclosing filter_map closure, i.e. drops the whole column
                // (failed downcast or cardinality cap reached).
                macro_rules! index_col {
                    ($arr_ty:ty) => {{
                        let arr = col.as_any().downcast_ref::<$arr_ty>()?;
                        for row in 0..arr.len() {
                            if arr.is_null(row) {
                                continue;
                            }
                            let key = arr.value(row).to_string();
                            let gid = base + row as u32;
                            if let Some(v) = map.get_mut(&key) {
                                v.push(gid);
                            } else {
                                if let Some(mc) = max_card {
                                    if map.len() >= mc {
                                        return None;
                                    }
                                }
                                map.insert(key, vec![gid]);
                            }
                        }
                    }};
                }

                if dict_utf8 {
                    // Dictionary path: look up each key's string directly,
                    // allocating only when a new distinct value is inserted.
                    let arr = col
                        .as_any()
                        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
                        )?;
                    let keys = arr.keys();
                    let values = arr.values().as_any().downcast_ref::<StringArray>()?;
                    for row in 0..arr.len() {
                        // Checks the dictionary array's own (key-level) nulls.
                        if arr.is_null(row) {
                            continue;
                        }
                        let k = keys.value(row) as usize;
                        let s = values.value(k);
                        let gid = base + row as u32;
                        if let Some(v) = map.get_mut(s) {
                            v.push(gid);
                        } else {
                            if let Some(mc) = max_card
                                && map.len() >= mc
                            {
                                return None;
                            }
                            map.insert(s.to_string(), vec![gid]);
                        }
                    }
                } else {
                    match dtype {
                        DataType::Utf8 => index_col!(StringArray),
                        DataType::Utf8View => index_col!(StringViewArray),
                        DataType::Boolean => index_col!(BooleanArray),
                        DataType::Int8 => index_col!(Int8Array),
                        DataType::Int16 => index_col!(Int16Array),
                        DataType::Int32 => index_col!(Int32Array),
                        DataType::Int64 => index_col!(Int64Array),
                        // Excluded by the type filter above.
                        _ => unreachable!(),
                    }
                }
            }

            Some((col_lower, map))
        })
        .collect()
}
2076
2077fn writable_inline(dt: &DataType) -> bool {
2090 match dt {
2091 DataType::Utf8
2092 | DataType::LargeUtf8
2093 | DataType::Utf8View
2094 | DataType::Boolean
2095 | DataType::Int8
2096 | DataType::Int16
2097 | DataType::Int32
2098 | DataType::Int64
2099 | DataType::UInt8
2100 | DataType::UInt16
2101 | DataType::UInt32
2102 | DataType::UInt64
2103 | DataType::Float32
2104 | DataType::Float64
2105 | DataType::Decimal128(_, _)
2106 | DataType::Decimal256(_, _) => true,
2107 DataType::Dictionary(k, v)
2108 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2109 {
2110 true
2111 }
2112 _ => false,
2113 }
2114}
2115
2116fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2122 let schema = batch.schema();
2123 let to_cast: Vec<usize> = schema
2124 .fields()
2125 .iter()
2126 .enumerate()
2127 .filter_map(|(i, f)| {
2128 if writable_inline(f.data_type()) {
2129 None
2130 } else {
2131 Some(i)
2132 }
2133 })
2134 .collect();
2135 if to_cast.is_empty() {
2136 return Ok(batch.clone());
2137 }
2138 let new_fields: Vec<Field> = schema
2139 .fields()
2140 .iter()
2141 .enumerate()
2142 .map(|(i, f)| {
2143 if to_cast.contains(&i) {
2144 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2145 } else {
2146 f.as_ref().clone()
2147 }
2148 })
2149 .collect();
2150 let new_schema = Arc::new(Schema::new(new_fields));
2151 let cols: Vec<ArrayRef> = batch
2152 .columns()
2153 .iter()
2154 .enumerate()
2155 .map(|(i, c)| {
2156 if to_cast.contains(&i) {
2157 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2158 } else {
2159 Ok(c.clone())
2160 }
2161 })
2162 .collect::<Result<_, _>>()?;
2163 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2164}
2165
/// Comparison operators understood by `cmp_scalar`. Currently unused, hence
/// the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    /// Equality (`=`).
    Eq,
    /// Inequality (`!=`).
    Neq,
    /// Strictly less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    Lte,
    /// Strictly greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    Gte,
    /// Case-sensitive SQL `LIKE` pattern match (string columns only).
    Like,
    /// Case-insensitive SQL `ILIKE` pattern match (string columns only).
    ILike,
}
2183
2184#[allow(dead_code)]
2185fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2186 let arr = col
2187 .as_any()
2188 .downcast_ref::<StringArray>()
2189 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2190 let s = Scalar::new(StringArray::from(vec![val]));
2191 Ok(eq(arr, &s)?)
2192}
2193
2194#[allow(dead_code)]
2195fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2196 macro_rules! num_cmp {
2197 ($arr_type:ty, $cast:ty) => {{
2198 let n = val
2199 .as_f64()
2200 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2201 as $cast;
2202 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2203 let s = Scalar::new(<$arr_type>::from(vec![n]));
2204 Ok(match op {
2205 CmpOp::Eq => eq(arr, &s)?,
2206 CmpOp::Neq => neq(arr, &s)?,
2207 CmpOp::Gt => gt(arr, &s)?,
2208 CmpOp::Gte => gt_eq(arr, &s)?,
2209 CmpOp::Lt => lt(arr, &s)?,
2210 CmpOp::Lte => lt_eq(arr, &s)?,
2211 CmpOp::Like | CmpOp::ILike => {
2212 return Err(AppError::InvalidValue(
2213 "LIKE requires a string column".into(),
2214 ));
2215 }
2216 })
2217 }};
2218 }
2219 match col.data_type() {
2220 DataType::Utf8 => {
2221 let s = val
2222 .as_str()
2223 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2224 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2225 let sc = Scalar::new(StringArray::from(vec![s]));
2226 Ok(match op {
2227 CmpOp::Eq => eq(arr, &sc)?,
2228 CmpOp::Neq => neq(arr, &sc)?,
2229 CmpOp::Gt => gt(arr, &sc)?,
2230 CmpOp::Gte => gt_eq(arr, &sc)?,
2231 CmpOp::Lt => lt(arr, &sc)?,
2232 CmpOp::Lte => lt_eq(arr, &sc)?,
2233 CmpOp::Like => compute::like(arr, &sc)?,
2234 CmpOp::ILike => compute::ilike(arr, &sc)?,
2235 })
2236 }
2237 DataType::Int8 => num_cmp!(Int8Array, i8),
2238 DataType::Int16 => num_cmp!(Int16Array, i16),
2239 DataType::Int32 => num_cmp!(Int32Array, i32),
2240 DataType::Int64 => num_cmp!(Int64Array, i64),
2241 DataType::Float32 => num_cmp!(Float32Array, f32),
2242 DataType::Float64 => num_cmp!(Float64Array, f64),
2243 dt => Err(AppError::InvalidValue(format!(
2244 "unsupported type for comparison: {dt:?}"
2245 ))),
2246 }
2247}
2248
2249pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2254 let batch = cast_for_serialize(batch)?;
2259 let schema = batch.schema();
2260 let n_rows = batch.num_rows();
2261
2262 let keys: Vec<Vec<u8>> = schema
2263 .fields()
2264 .iter()
2265 .map(|f| {
2266 let mut k = Vec::with_capacity(f.name().len() + 3);
2267 k.push(b'"');
2268 k.extend_from_slice(f.name().as_bytes());
2269 k.extend_from_slice(b"\":");
2270 k
2271 })
2272 .collect();
2273
2274 let encoders: Vec<ColEnc> = batch
2279 .columns()
2280 .iter()
2281 .map(|c| ColEnc::new(c.as_ref()))
2282 .collect();
2283
2284 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2285 let mut itoa_buf = itoa::Buffer::new();
2286 let mut ryu_buf = ryu::Buffer::new();
2287 buf.push(b'[');
2288
2289 for row in 0..n_rows {
2290 if row > 0 {
2291 buf.push(b',');
2292 }
2293 buf.push(b'{');
2294 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2295 if i > 0 {
2296 buf.push(b',');
2297 }
2298 buf.extend_from_slice(key);
2299 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2300 }
2301 buf.push(b'}');
2302 }
2303
2304 buf.push(b']');
2305 Ok(unsafe { String::from_utf8_unchecked(buf) })
2306}
2307
2308enum ColEnc<'a> {
2313 Utf8(&'a StringArray),
2314 LargeUtf8(&'a LargeStringArray),
2315 Utf8View(&'a StringViewArray),
2316 DictI32Utf8(
2319 &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
2320 &'a StringArray,
2321 ),
2322 Bool(&'a BooleanArray),
2323 I8(&'a Int8Array),
2324 I16(&'a Int16Array),
2325 I32(&'a Int32Array),
2326 I64(&'a Int64Array),
2327 U8(&'a UInt8Array),
2328 U16(&'a UInt16Array),
2329 U32(&'a UInt32Array),
2330 U64(&'a UInt64Array),
2331 Dec128(&'a Decimal128Array),
2332 Dec256(&'a Decimal256Array),
2333 F32(&'a Float32Array),
2334 F64(&'a Float64Array),
2335 Other(&'a dyn Array),
2337}
2338
2339impl<'a> ColEnc<'a> {
2340 fn new(col: &'a dyn Array) -> ColEnc<'a> {
2341 macro_rules! dc {
2342 ($t:ty) => {
2343 col.as_any().downcast_ref::<$t>().unwrap()
2344 };
2345 }
2346 match col.data_type() {
2347 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2348 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2349 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2350 DataType::Dictionary(key, value)
2351 if matches!(key.as_ref(), DataType::Int32)
2352 && matches!(value.as_ref(), DataType::Utf8) =>
2353 {
2354 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2355 let values = dict
2356 .values()
2357 .as_any()
2358 .downcast_ref::<StringArray>()
2359 .unwrap();
2360 ColEnc::DictI32Utf8(dict, values)
2361 }
2362 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2363 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2364 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2365 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2366 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2367 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2368 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2369 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2370 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2371 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2372 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2373 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2374 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2375 _ => ColEnc::Other(col),
2376 }
2377 }
2378
2379 #[inline]
2380 fn write(
2381 &self,
2382 buf: &mut Vec<u8>,
2383 row: usize,
2384 itoa_buf: &mut itoa::Buffer,
2385 ryu_buf: &mut ryu::Buffer,
2386 ) {
2387 macro_rules! int {
2388 ($arr:expr) => {{
2389 if $arr.is_null(row) {
2390 buf.extend_from_slice(b"null");
2391 } else {
2392 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2393 }
2394 }};
2395 }
2396 match self {
2397 ColEnc::Utf8(a) => {
2398 if a.is_null(row) {
2399 buf.extend_from_slice(b"null");
2400 } else {
2401 write_str(buf, a.value(row));
2402 }
2403 }
2404 ColEnc::LargeUtf8(a) => {
2405 if a.is_null(row) {
2406 buf.extend_from_slice(b"null");
2407 } else {
2408 write_str(buf, a.value(row));
2409 }
2410 }
2411 ColEnc::Utf8View(a) => {
2412 if a.is_null(row) {
2413 buf.extend_from_slice(b"null");
2414 } else {
2415 write_str(buf, a.value(row));
2416 }
2417 }
2418 ColEnc::DictI32Utf8(keys, values) => {
2419 if keys.is_null(row) {
2420 buf.extend_from_slice(b"null");
2421 } else {
2422 let k = keys.keys().value(row) as usize;
2423 write_str(buf, values.value(k));
2424 }
2425 }
2426 ColEnc::Bool(a) => {
2427 if a.is_null(row) {
2428 buf.extend_from_slice(b"null");
2429 } else {
2430 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2431 }
2432 }
2433 ColEnc::I8(a) => int!(a),
2434 ColEnc::I16(a) => int!(a),
2435 ColEnc::I32(a) => int!(a),
2436 ColEnc::I64(a) => int!(a),
2437 ColEnc::U8(a) => int!(a),
2438 ColEnc::U16(a) => int!(a),
2439 ColEnc::U32(a) => int!(a),
2440 ColEnc::U64(a) => int!(a),
2441 ColEnc::Dec128(a) => {
2442 if a.is_null(row) {
2443 buf.extend_from_slice(b"null");
2444 } else {
2445 write_str(buf, &a.value_as_string(row));
2446 }
2447 }
2448 ColEnc::Dec256(a) => {
2449 if a.is_null(row) {
2450 buf.extend_from_slice(b"null");
2451 } else {
2452 write_str(buf, &a.value_as_string(row));
2453 }
2454 }
2455 ColEnc::F32(a) => {
2456 if a.is_null(row) {
2457 buf.extend_from_slice(b"null");
2458 } else {
2459 let v = a.value(row);
2460 if v.is_finite() {
2461 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2462 } else {
2463 buf.extend_from_slice(b"null");
2464 }
2465 }
2466 }
2467 ColEnc::F64(a) => {
2468 if a.is_null(row) {
2469 buf.extend_from_slice(b"null");
2470 } else {
2471 let v = a.value(row);
2472 if v.is_finite() {
2473 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2474 } else {
2475 buf.extend_from_slice(b"null");
2476 }
2477 }
2478 }
2479 ColEnc::Other(col) => {
2480 if col.is_null(row) {
2481 buf.extend_from_slice(b"null");
2482 } else {
2483 write_value(buf, *col, row);
2484 }
2485 }
2486 }
2487 }
2488}
2489
2490#[inline]
2491fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2492 match col.data_type() {
2493 DataType::Utf8 => write_str(
2494 buf,
2495 col.as_any()
2496 .downcast_ref::<StringArray>()
2497 .unwrap()
2498 .value(row),
2499 ),
2500 DataType::LargeUtf8 => write_str(
2501 buf,
2502 col.as_any()
2503 .downcast_ref::<LargeStringArray>()
2504 .unwrap()
2505 .value(row),
2506 ),
2507 DataType::Utf8View => write_str(
2508 buf,
2509 col.as_any()
2510 .downcast_ref::<StringViewArray>()
2511 .unwrap()
2512 .value(row),
2513 ),
2514 DataType::Dictionary(key, value)
2515 if matches!(key.as_ref(), DataType::Int32)
2516 && matches!(value.as_ref(), DataType::Utf8) =>
2517 {
2518 let dict = col
2519 .as_any()
2520 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2521 .unwrap();
2522 let keys = dict.keys();
2523 let values = dict
2524 .values()
2525 .as_any()
2526 .downcast_ref::<StringArray>()
2527 .unwrap();
2528 let k = keys.value(row) as usize;
2529 write_str(buf, values.value(k));
2530 }
2531 DataType::Boolean => {
2532 let v = col
2533 .as_any()
2534 .downcast_ref::<BooleanArray>()
2535 .unwrap()
2536 .value(row);
2537 buf.extend_from_slice(if v { b"true" } else { b"false" });
2538 }
2539 DataType::Int8 => {
2540 let mut b = itoa::Buffer::new();
2541 buf.extend_from_slice(
2542 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2543 .as_bytes(),
2544 );
2545 }
2546 DataType::Int16 => {
2547 let mut b = itoa::Buffer::new();
2548 buf.extend_from_slice(
2549 b.format(
2550 col.as_any()
2551 .downcast_ref::<Int16Array>()
2552 .unwrap()
2553 .value(row),
2554 )
2555 .as_bytes(),
2556 );
2557 }
2558 DataType::Int32 => {
2559 let mut b = itoa::Buffer::new();
2560 buf.extend_from_slice(
2561 b.format(
2562 col.as_any()
2563 .downcast_ref::<Int32Array>()
2564 .unwrap()
2565 .value(row),
2566 )
2567 .as_bytes(),
2568 );
2569 }
2570 DataType::Int64 => {
2571 let mut b = itoa::Buffer::new();
2572 buf.extend_from_slice(
2573 b.format(
2574 col.as_any()
2575 .downcast_ref::<Int64Array>()
2576 .unwrap()
2577 .value(row),
2578 )
2579 .as_bytes(),
2580 );
2581 }
2582 DataType::UInt8 => {
2583 let mut b = itoa::Buffer::new();
2584 buf.extend_from_slice(
2585 b.format(
2586 col.as_any()
2587 .downcast_ref::<UInt8Array>()
2588 .unwrap()
2589 .value(row),
2590 )
2591 .as_bytes(),
2592 );
2593 }
2594 DataType::UInt16 => {
2595 let mut b = itoa::Buffer::new();
2596 buf.extend_from_slice(
2597 b.format(
2598 col.as_any()
2599 .downcast_ref::<UInt16Array>()
2600 .unwrap()
2601 .value(row),
2602 )
2603 .as_bytes(),
2604 );
2605 }
2606 DataType::UInt32 => {
2607 let mut b = itoa::Buffer::new();
2608 buf.extend_from_slice(
2609 b.format(
2610 col.as_any()
2611 .downcast_ref::<UInt32Array>()
2612 .unwrap()
2613 .value(row),
2614 )
2615 .as_bytes(),
2616 );
2617 }
2618 DataType::UInt64 => {
2619 let mut b = itoa::Buffer::new();
2620 buf.extend_from_slice(
2621 b.format(
2622 col.as_any()
2623 .downcast_ref::<UInt64Array>()
2624 .unwrap()
2625 .value(row),
2626 )
2627 .as_bytes(),
2628 );
2629 }
2630 DataType::Decimal128(_, _) => {
2631 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2632 write_str(buf, &arr.value_as_string(row));
2633 }
2634 DataType::Decimal256(_, _) => {
2635 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2636 write_str(buf, &arr.value_as_string(row));
2637 }
2638 DataType::Float32 => {
2639 let v = col
2640 .as_any()
2641 .downcast_ref::<Float32Array>()
2642 .unwrap()
2643 .value(row);
2644 if v.is_finite() {
2645 let mut b = ryu::Buffer::new();
2646 buf.extend_from_slice(b.format_finite(v).as_bytes());
2647 } else {
2648 buf.extend_from_slice(b"null");
2649 }
2650 }
2651 DataType::Float64 => {
2652 let v = col
2653 .as_any()
2654 .downcast_ref::<Float64Array>()
2655 .unwrap()
2656 .value(row);
2657 if v.is_finite() {
2658 let mut b = ryu::Buffer::new();
2659 buf.extend_from_slice(b.format_finite(v).as_bytes());
2660 } else {
2661 buf.extend_from_slice(b"null");
2662 }
2663 }
2664 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2669 }
2670}
2671
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped, newline / carriage return / tab use their
/// short escapes (`\n`, `\r`, `\t`), and every other byte below 0x20 becomes a
/// `\u00XX` escape with lowercase hex digits. All remaining bytes, including
/// multi-byte UTF-8 sequences, are copied through verbatim.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    buf.push(b'"');

    // Bytes that need no escaping are copied in contiguous runs; `run_start`
    // marks the beginning of the run not yet flushed to `buf`.
    let mut run_start = 0;
    for (i, &byte) in bytes.iter().enumerate() {
        if byte >= 0x20 && byte != b'"' && byte != b'\\' {
            continue;
        }
        buf.extend_from_slice(&bytes[run_start..i]);
        run_start = i + 1;
        match byte {
            b'"' => buf.extend_from_slice(b"\\\""),
            b'\\' => buf.extend_from_slice(b"\\\\"),
            b'\n' => buf.extend_from_slice(b"\\n"),
            b'\r' => buf.extend_from_slice(b"\\r"),
            b'\t' => buf.extend_from_slice(b"\\t"),
            // Any other control character (guaranteed < 0x20 by the guard above).
            ctrl => {
                buf.extend_from_slice(b"\\u00");
                buf.push(HEX_DIGITS[usize::from(ctrl >> 4)]);
                buf.push(HEX_DIGITS[usize::from(ctrl & 0x0f)]);
            }
        }
    }
    buf.extend_from_slice(&bytes[run_start..]);

    buf.push(b'"');
}
2693
2694#[async_trait]
2699impl Backend for Store {
2700 fn names(&self) -> Vec<String> {
2701 Store::names(self)
2702 }
2703
2704 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2705 let st = self.dataset(name)?;
2706 Ok(DatasetSummary {
2707 name: st.schema.name.clone(),
2708 columns: st.schema.columns.len(),
2709 rows: st.num_rows(),
2710 })
2711 }
2712
2713 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2714 let st = self.dataset(name)?;
2715 Ok(Arc::new(st.schema.clone()))
2716 }
2717
2718 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2719 let st = self.dataset(name)?;
2720 let mut cols: Vec<String> = st
2723 .schema
2724 .columns
2725 .iter()
2726 .map(|c| c.name.clone())
2727 .filter(|n| st.index.contains_key(n))
2728 .collect();
2729 let mut extras: Vec<String> = st
2732 .index
2733 .keys()
2734 .filter(|n| !cols.iter().any(|c| c == *n))
2735 .cloned()
2736 .collect();
2737 extras.sort();
2738 cols.extend(extras);
2739 Ok(cols)
2740 }
2741
2742 async fn sample(&self, name: &str) -> Result<String, AppError> {
2743 Store::sample(self, name).await
2744 }
2745
2746 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2747 Store::query(self, name, req).await
2748 }
2749
2750 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2751 Store::query_arrow(self, name, req).await
2752 }
2753
2754 async fn query_arrow_stream(
2755 &self,
2756 name: &str,
2757 req: &QueryRequest,
2758 ) -> Result<ArrowIpcStream, AppError> {
2759 Store::query_arrow_stream(self, name, req).await
2760 }
2761
2762 async fn query_arrow_stream_all(
2763 &self,
2764 name: &str,
2765 req: &QueryRequest,
2766 ) -> Result<ArrowIpcStream, AppError> {
2767 Store::query_arrow_stream_all(self, name, req).await
2768 }
2769
2770 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2771 Store::count(self, name, req).await
2772 }
2773
2774 async fn query_sql(&self, sql: &str, max_rows: u64) -> Result<String, AppError> {
2775 Store::query_sql(self, sql, max_rows).await
2776 }
2777
2778 async fn query_sql_arrow_stream(
2779 &self,
2780 sql: &str,
2781 max_rows: u64,
2782 ) -> Result<ArrowIpcStream, AppError> {
2783 Store::query_sql_arrow_stream(self, sql, max_rows).await
2784 }
2785
2786 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2787 Store::parquet(self, name).await
2788 }
2789
2790 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2791 Store::reload(self, name).await
2792 }
2793}