1use std::borrow::Cow;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use arrow::array::{
8 Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
9 Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch, Scalar,
10 StringArray, StringViewArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
11};
12use arrow::compute;
13use arrow::compute::kernels::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
14use arrow::datatypes::{DataType, Field, Schema};
15use async_trait::async_trait;
16use parquet::arrow::ProjectionMask;
17use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
18use serde_json::Value as JsonValue;
19
20use datafusion::datasource::file_format::parquet::ParquetFormat;
21use datafusion::datasource::listing::{
22 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
23};
24use datafusion::datasource::{MemTable, TableProvider};
25use datafusion::prelude::SessionContext;
26use datafusion::scalar::ScalarValue;
27
28use object_store::aws::AmazonS3Builder;
29use url::Url;
30
31use datapress_core::backend::{
32 ArrowIpcStream, Backend, DatasetSummary, ReloadStats, arrow_ipc_stream_channel,
33};
34use datapress_core::config::{
35 AddressingStyle, AppConfig, DatasetConfig, IndexConfig, IndexMode, Partitioning, ResolvedCreds,
36 S3Config, SourceKind,
37};
38use datapress_core::errors::AppError;
39use datapress_core::models::{CountRequest, Predicate, QueryRequest};
40use datapress_core::schema::{ColumnInfo, DatasetSchema, LogicalType};
41
/// `HashMap` that uses `ahash::RandomState` as its hasher instead of std's default.
type FastMap<K, V> = HashMap<K, V, ahash::RandomState>;

/// Equality index consulted by the in-memory fast paths (`try_index`).
///
/// NOTE(review): the outer key is presumably a column name (its `len()` is logged
/// as "indexed cols"), the inner key a cell value rendered as text, and the
/// `Vec<u32>` the matching row positions — confirm against
/// `build_eq_index_with_policy`.
type EqIndex = FastMap<String, FastMap<String, Vec<u32>>>;
54
/// One loaded dataset. Readers receive it as an immutable `Arc` snapshot; a
/// reload publishes a brand-new value instead of mutating this one.
pub struct DatasetState {
    /// Column metadata (name, logical type, SQL type, nullability) exposed to clients.
    pub schema: DatasetSchema,
    /// Materialised record batches; always empty for lazy datasets.
    pub data: Vec<RecordBatch>,
    /// Arrow schema of the table. For eager datasets this is the first batch's
    /// schema; for lazy ones it is the file schema plus any hive partition columns.
    pub arrow_schema: Arc<Schema>,
    /// Equality index used by the in-memory fast path; empty for lazy datasets.
    pub index: EqIndex,
    /// `true` when queries are served by a DataFusion listing table instead of `data`.
    pub lazy: bool,
}
76
77impl DatasetState {
78 pub fn num_rows(&self) -> usize {
80 self.data.iter().map(|b| b.num_rows()).sum()
81 }
82}
83
/// Registry of every configured dataset, backed by one shared DataFusion session.
pub struct Store {
    /// Session in which each dataset is registered as a SQL table under its name.
    ctx: SessionContext,
    /// Upper bound for the `page_size` of paged queries (forced to be at least 1).
    max_page_size: u64,
    /// Original configuration per dataset name, kept so `reload` can rebuild it.
    configs: HashMap<String, DatasetConfig>,
    /// Current dataset snapshots; the whole map is replaced on every reload.
    datasets: ArcSwap<HashMap<String, Arc<DatasetState>>>,
    /// Per-dataset async mutexes that serialise concurrent reloads of one dataset.
    reload_locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
}
100
101impl Store {
102 pub async fn load(cfg: &AppConfig) -> Result<Self, AppError> {
104 if cfg
107 .datasets
108 .iter()
109 .any(|d| d.source.kind == SourceKind::Delta && d.source.is_s3())
110 {
111 deltalake::aws::register_handlers(None);
112 }
113
114 let ctx = SessionContext::new();
115 let mut datasets = HashMap::with_capacity(cfg.datasets.len());
116 let mut configs = HashMap::with_capacity(cfg.datasets.len());
117
118 for d in &cfg.datasets {
119 let (state, provider) = build_dataset(d, &ctx).await?;
120 ctx.register_table(d.name.as_str(), provider)?;
121 datasets.insert(d.name.clone(), Arc::new(state));
122 configs.insert(d.name.clone(), d.clone());
123 }
124 Ok(Self {
125 ctx,
126 max_page_size: cfg.server.max_page_size.max(1),
127 configs,
128 datasets: ArcSwap::from_pointee(datasets),
129 reload_locks: Mutex::new(HashMap::new()),
130 })
131 }
132
133 pub fn names(&self) -> Vec<String> {
135 let snap = self.datasets.load();
136 let mut v: Vec<String> = snap.keys().cloned().collect();
137 v.sort();
138 v
139 }
140
141 pub fn dataset(&self, name: &str) -> Result<Arc<DatasetState>, AppError> {
142 self.datasets
143 .load()
144 .get(name)
145 .cloned()
146 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))
147 }
148
149 pub async fn sample(&self, name: &str) -> Result<String, AppError> {
152 let st = self.dataset(name)?;
153
154 if st.lazy {
156 let table = DatasetSchema::quote_ident(&st.schema.name);
157 let sql = format!("SELECT * FROM {table} LIMIT 1");
158 let df = self.ctx.sql(&sql).await?;
159 let batches = df.collect().await?;
160 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
161 return Ok("null".into());
162 }
163 let arr = serialize(&batches[0].slice(0, 1))?;
164 let trimmed = arr.trim();
165 let inner = trimmed
166 .strip_prefix('[')
167 .and_then(|s| s.strip_suffix(']'))
168 .unwrap_or(trimmed);
169 return Ok(inner.to_string());
170 }
171
172 let first = match st.data.iter().find(|b| b.num_rows() > 0) {
173 Some(b) => b,
174 None => return Ok("null".into()),
175 };
176 let arr = serialize(&first.slice(0, 1))?;
177 let trimmed = arr.trim();
179 let inner = trimmed
180 .strip_prefix('[')
181 .and_then(|s| s.strip_suffix(']'))
182 .unwrap_or(trimmed);
183 Ok(inner.to_string())
184 }
185
186 pub async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
191 let cfg = self
193 .configs
194 .get(name)
195 .ok_or_else(|| AppError::NotFound(format!("dataset: {name}")))?
196 .clone();
197
198 let lock = {
200 let mut locks = self.reload_locks.lock().unwrap();
201 locks
202 .entry(name.to_string())
203 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
204 .clone()
205 };
206 let _guard = lock.lock().await;
207
208 let started = std::time::Instant::now();
209
210 let (state, provider) = build_dataset(&cfg, &self.ctx).await?;
213 let rows = state.num_rows();
214
215 let _ = self.ctx.deregister_table(name)?;
221 self.ctx.register_table(name, provider)?;
222
223 let mut new_map = (**self.datasets.load()).clone();
224 new_map.insert(name.to_string(), Arc::new(state));
225 self.datasets.store(Arc::new(new_map));
226
227 let elapsed_ms = started.elapsed().as_millis();
228 log::info!("reloaded dataset '{name}': {rows} rows in {elapsed_ms} ms");
229 Ok(ReloadStats { rows, elapsed_ms })
230 }
231
232 pub async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
236 let batch = self.query_batch(name, req).await?;
237 if batch.num_rows() == 0 {
238 return Ok("[]".to_string());
239 }
240 serialize(&batch)
241 }
242
243 pub async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
247 let batch = self.query_batch(name, req).await?;
248 let schema = batch.schema();
249 let mut buf = Vec::with_capacity(8 * 1024);
250 {
251 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())?;
252 if batch.num_rows() > 0 {
253 w.write(&batch)?;
254 }
255 w.finish()?;
256 }
257 Ok(buf)
258 }
259
260 pub async fn query_arrow_stream(
261 &self,
262 name: &str,
263 req: &QueryRequest,
264 ) -> Result<ArrowIpcStream, AppError> {
265 let batches = self.query_batches(name, req).await?;
266 Ok(stream_arrow_batches(batches))
267 } pub async fn query_arrow_stream_all(
268 &self,
269 name: &str,
270 req: &QueryRequest,
271 ) -> Result<ArrowIpcStream, AppError> {
272 let batches = self.query_batches_all(name, req).await?;
273 Ok(stream_arrow_batches(batches))
274 }
275
276 pub async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
284 let req = QueryRequest {
286 columns: Vec::new(),
287 predicates: Vec::new(),
288 group_by: Vec::new(),
289 aggregations: Vec::new(),
290 distinct: false,
291 order_by: Vec::new(),
292 limit: None,
293 page: 1,
294 page_size: 1,
295 };
296 let st = self.dataset(name)?;
297 let batches = self.query_batches_all(name, &req).await?;
298 let schema = batches
302 .first()
303 .map(|b| b.schema())
304 .unwrap_or_else(|| st.arrow_schema.clone());
305
306 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
307 {
308 let props = parquet::file::properties::WriterProperties::builder()
309 .set_compression(parquet::basic::Compression::SNAPPY)
310 .build();
311 let mut writer =
312 parquet::arrow::ArrowWriter::try_new(&mut buf, schema, Some(props))
313 .map_err(|e| AppError::Internal(format!("parquet writer init: {e}")))?;
314 for batch in &batches {
315 if batch.num_rows() > 0 {
316 writer
317 .write(batch)
318 .map_err(|e| AppError::Internal(format!("parquet write: {e}")))?;
319 }
320 }
321 writer
322 .close()
323 .map_err(|e| AppError::Internal(format!("parquet finish: {e}")))?;
324 }
325 Ok(bytes::Bytes::from(buf))
326 }
327
328 async fn query_batch(&self, name: &str, req: &QueryRequest) -> Result<RecordBatch, AppError> {
331 let batches = self.query_batches(name, req).await?;
332 if batches.is_empty() {
333 return Ok(RecordBatch::new_empty(Arc::new(
334 arrow::datatypes::Schema::empty(),
335 )));
336 }
337 if batches.len() == 1 {
338 return Ok(batches.into_iter().next().expect("checked len"));
339 }
340 if batches.iter().all(|b| b.num_rows() == 0) {
341 return Ok(RecordBatch::new_empty(batches[0].schema()));
342 }
343 let batch = compute::concat_batches(&batches[0].schema(), batches.iter())?;
344 Ok(batch)
345 }
346
347 async fn query_batches(
351 &self,
352 name: &str,
353 req: &QueryRequest,
354 ) -> Result<Vec<RecordBatch>, AppError> {
355 let st = self.dataset(name)?;
356
357 let page = req.page.max(1);
358 let page_size = req.page_size.clamp(1, self.max_page_size);
359 let offset = ((page - 1) * page_size) as usize;
360 let limit = page_size as usize;
361
362 self.query_batches_inner(st, req, Some((offset, limit)))
363 .await
364 }
365
366 async fn query_batches_all(
370 &self,
371 name: &str,
372 req: &QueryRequest,
373 ) -> Result<Vec<RecordBatch>, AppError> {
374 let st = self.dataset(name)?;
375 self.query_batches_inner(st, req, None).await
376 }
377
378 async fn query_batches_inner(
379 &self,
380 st: Arc<DatasetState>,
381 req: &QueryRequest,
382 page_window: Option<(usize, usize)>,
383 ) -> Result<Vec<RecordBatch>, AppError> {
384 let (offset, limit) = page_window.unwrap_or((0, req.limit.unwrap_or(u64::MAX) as usize));
385
386 let can_fast_path = !st.lazy
393 && req.order_by.is_empty()
394 && (page_window.is_none() || req.limit.is_none())
395 && req.group_by.is_empty()
396 && !req.distinct;
397
398 if can_fast_path {
399 let total = st.num_rows();
400
401 if req.predicates.is_empty() {
404 if page_window.is_none() && req.limit.is_none() {
405 return st
406 .data
407 .iter()
408 .cloned()
409 .map(|batch| project(&st.schema, batch, &req.columns))
410 .collect();
411 }
412 let start = offset.min(total);
413 let len = limit.min(total - start);
414 let batch = slice_global(&st.data, &st.arrow_schema, start, len)?;
415 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
416 }
417
418 if let Some(rows) = try_index(&st.index, &req.predicates) {
421 let batch = take_page(&st.data, &st.arrow_schema, &rows, offset, limit)?;
422 return Ok(vec![project(&st.schema, batch, &req.columns)?]);
423 }
424 }
425
426 let (sql, params) = match page_window {
428 Some(_) => build_query_sql(&st.schema, req, self.max_page_size)?,
429 None => build_query_stream_sql(&st.schema, req)?,
430 };
431 let mut df = self.ctx.sql(&sql).await?;
432 if !params.is_empty() {
433 df = df.with_param_values(params)?;
434 }
435 let batches = df.collect().await?;
436 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
437 let schema = batches
438 .first()
439 .map(|b| b.schema())
440 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
441 return Ok(vec![RecordBatch::new_empty(schema)]);
442 }
443 Ok(batches)
444 }
445}
446
447fn stream_arrow_batches(batches: Vec<RecordBatch>) -> ArrowIpcStream {
448 let schema = batches
449 .first()
450 .map(|batch| batch.schema())
451 .unwrap_or_else(|| Arc::new(arrow::datatypes::Schema::empty()));
452 let (mut writer, stream) = arrow_ipc_stream_channel(8);
453
454 tokio::task::spawn_blocking(move || {
455 let result = (|| -> Result<(), AppError> {
456 let mut w = arrow::ipc::writer::StreamWriter::try_new(&mut writer, schema.as_ref())?;
457 for batch in batches {
458 if batch.num_rows() > 0 {
459 w.write(&batch)?;
460 }
461 }
462 w.finish()?;
463 Ok(())
464 })();
465 if let Err(err) = result {
466 log::error!("datafusion arrow stream failed: {err}");
467 writer.send_error(err);
468 }
469 });
470
471 stream
472}
473
474impl Store {
475 pub async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
479 let st = self.dataset(name)?;
480
481 if !st.lazy {
482 if req.predicates.is_empty() {
484 return Ok(st.num_rows() as i64);
485 }
486 if let Some(rows) = try_index(&st.index, &req.predicates) {
488 return Ok(rows.len() as i64);
489 }
490 }
491
492 let (sql, params) = build_count_sql(&st.schema, &req.predicates)?;
495 let mut df = self.ctx.sql(&sql).await?;
496 if !params.is_empty() {
497 df = df.with_param_values(params)?;
498 }
499 let batches = df.collect().await?;
500 let n = batches
501 .first()
502 .and_then(|b| {
503 b.column(0)
504 .as_any()
505 .downcast_ref::<arrow::array::Int64Array>()
506 })
507 .filter(|a| !a.is_empty())
508 .map(|a| a.value(0))
509 .unwrap_or(0);
510 Ok(n)
511 }
512}
513
514async fn build_dataset(
519 d: &DatasetConfig,
520 ctx: &SessionContext,
521) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
522 if d.lazy {
528 match (d.source.kind, d.source.is_s3()) {
529 (SourceKind::Parquet, false) => return build_lazy_local_parquet(d, ctx).await,
530 (SourceKind::Parquet, true) => return build_lazy_s3_parquet(d, ctx).await,
531 (SourceKind::Delta, _) => {
532 return Err(AppError::Internal(format!(
533 "dataset '{}': lazy mode is not supported for delta sources",
534 d.name
535 )));
536 }
537 }
538 }
539
540 let raw_batches: Vec<RecordBatch> = match (d.source.kind, d.source.is_s3()) {
545 (SourceKind::Parquet, false) => read_local_parquet(d)?,
546 (SourceKind::Parquet, true) => read_s3_parquet(d, ctx).await?,
547 (SourceKind::Delta, false) => read_delta(d, HashMap::new()).await?,
548 (SourceKind::Delta, true) => read_delta(d, delta_s3_options(d)?).await?,
549 };
550 if raw_batches.is_empty() {
551 return Err(AppError::Internal(format!(
552 "dataset '{}': source produced no batches",
553 d.name
554 )));
555 }
556
557 let chunks = raw_batches;
558 let arrow_sch = chunks[0].schema();
559
560 let columns: Vec<ColumnInfo> = arrow_sch
562 .fields()
563 .iter()
564 .map(|f| {
565 let dt = f.data_type();
566 ColumnInfo {
567 name: f.name().clone(),
568 logical: arrow_to_logical(dt),
569 sql_type: format!("{dt:?}"),
570 nullable: f.is_nullable(),
571 }
572 })
573 .collect();
574 let schema = DatasetSchema::new(&d.name, columns);
575
576 let index = build_eq_index_with_policy(&chunks, &d.index);
581
582 let n_parts = std::thread::available_parallelism()
587 .map(|n| n.get())
588 .unwrap_or(4);
589 let mut parts: Vec<Vec<RecordBatch>> = (0..n_parts).map(|_| Vec::new()).collect();
590 for (i, b) in chunks.iter().enumerate() {
591 if b.num_rows() == 0 {
592 continue;
593 }
594 parts[i % n_parts].push(b.clone());
595 }
596 parts.retain(|p| !p.is_empty());
597 let provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(arrow_sch.clone(), parts)?);
598
599 let total_rows: usize = chunks.iter().map(|b| b.num_rows()).sum();
600 let mem_mb: usize = chunks
601 .iter()
602 .flat_map(|b| b.columns().iter())
603 .map(|c| c.get_buffer_memory_size())
604 .sum::<usize>()
605 / 1_048_576;
606 log::info!(
607 "dataset '{}' [{}]: {} rows, {} cols, {} MB, {} chunks, {} indexed cols",
608 d.name,
609 d.source.kind.as_str(),
610 total_rows,
611 schema.columns.len(),
612 mem_mb,
613 chunks.len(),
614 index.len()
615 );
616
617 Ok((
618 DatasetState {
619 schema,
620 data: chunks,
621 arrow_schema: arrow_sch,
622 index,
623 lazy: false,
624 },
625 provider,
626 ))
627}
628
629async fn build_lazy_local_parquet(
634 d: &DatasetConfig,
635 ctx: &SessionContext,
636) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
637 let (url, part_keys) = lazy_local_listing(d)?;
638
639 let mut opts =
640 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
641 if !part_keys.is_empty() {
642 opts = opts.with_table_partition_cols(
643 part_keys
644 .iter()
645 .map(|k| (k.clone(), DataType::Utf8))
646 .collect(),
647 );
648 }
649
650 let session_state = ctx.state();
651 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
654 AppError::Internal(format!("dataset '{}': infer parquet schema: {e}", d.name))
655 })?;
656
657 let cfg = ListingTableConfig::new(url)
658 .with_listing_options(opts)
659 .with_schema(file_schema.clone());
660 let table = ListingTable::try_new(cfg).map_err(|e| {
661 AppError::Internal(format!("dataset '{}': ListingTable::try_new: {e}", d.name))
662 })?;
663 let provider: Arc<dyn TableProvider> = Arc::new(table);
664
665 let mut fields: Vec<Field> = file_schema
667 .fields()
668 .iter()
669 .map(|f| f.as_ref().clone())
670 .collect();
671 for k in &part_keys {
672 if !fields.iter().any(|f| f.name() == k) {
673 fields.push(Field::new(k, DataType::Utf8, false));
674 }
675 }
676 let arrow_sch = Arc::new(Schema::new(fields));
677
678 let columns: Vec<ColumnInfo> = arrow_sch
679 .fields()
680 .iter()
681 .map(|f| {
682 let dt = f.data_type();
683 ColumnInfo {
684 name: f.name().clone(),
685 logical: arrow_to_logical(dt),
686 sql_type: format!("{dt:?}"),
687 nullable: f.is_nullable(),
688 }
689 })
690 .collect();
691 let schema = DatasetSchema::new(&d.name, columns);
692
693 log::info!(
694 "dataset '{}' [{}, lazy]: {} cols ({} partition), no materialise, no index",
695 d.name,
696 d.source.kind.as_str(),
697 schema.columns.len(),
698 part_keys.len()
699 );
700
701 Ok((
702 DatasetState {
703 schema,
704 data: Vec::new(),
705 arrow_schema: arrow_sch,
706 index: EqIndex::default(),
707 lazy: true,
708 },
709 provider,
710 ))
711}
712
713fn lazy_local_listing(d: &DatasetConfig) -> Result<(ListingTableUrl, Vec<String>), AppError> {
718 let loc = &d.source.location;
719
720 if loc.contains('*') || loc.contains('?') || loc.contains('[') {
721 let parts: Vec<&str> = loc.split('/').collect();
722 let first_wild = parts
723 .iter()
724 .position(|c| c.contains('*') || c.contains('?') || c.contains('['))
725 .unwrap_or(parts.len());
726 let base = parts[..first_wild].join("/");
727 let base = if base.is_empty() {
728 "/".to_string()
729 } else {
730 base
731 };
732 let upper = parts.len().saturating_sub(1);
735 let keys: Vec<String> = parts[first_wild.min(upper)..upper]
736 .iter()
737 .filter_map(|c| c.split_once('=').map(|(k, _)| k.to_string()))
738 .filter(|k| !k.is_empty())
739 .collect();
740 return Ok((dir_url(std::path::Path::new(&base), d)?, keys));
741 }
742
743 let path = std::path::Path::new(loc);
744 if path.is_dir() {
745 let keys = discover_hive_keys(path);
746 return Ok((dir_url(path, d)?, keys));
747 }
748
749 let url = ListingTableUrl::parse(loc)
750 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{loc}': {e}", d.name)))?;
751 Ok((url, Vec::new()))
752}
753
754fn dir_url(path: &std::path::Path, d: &DatasetConfig) -> Result<ListingTableUrl, AppError> {
757 let s = path.to_str().ok_or_else(|| {
758 AppError::Internal(format!(
759 "dataset '{}': non-utf8 path {}",
760 d.name,
761 path.display()
762 ))
763 })?;
764 let s = if s.ends_with('/') {
765 s.to_string()
766 } else {
767 format!("{s}/")
768 };
769 ListingTableUrl::parse(&s)
770 .map_err(|e| AppError::Internal(format!("dataset '{}': bad url '{s}': {e}", d.name)))
771}
772
/// Walks down from `base`, following the first `key=value` subdirectory found at
/// each level, and returns the keys in the order encountered.
///
/// `std::fs::read_dir` does not guarantee any ordering, so when a level holds
/// several partition directories, whichever is seen first is followed. An
/// unreadable directory simply ends the walk.
fn discover_hive_keys(base: &std::path::Path) -> Vec<String> {
    let mut keys = Vec::new();
    let mut dir = base.to_path_buf();
    while let Ok(entries) = std::fs::read_dir(&dir) {
        let partition = entries.flatten().find_map(|entry| {
            let path = entry.path();
            if !path.is_dir() {
                return None;
            }
            let (key, value) = path.file_name()?.to_str()?.split_once('=')?;
            if key.is_empty() || value.is_empty() {
                return None;
            }
            let key = key.to_owned();
            Some((key, path))
        });
        match partition {
            Some((key, child)) => {
                keys.push(key);
                dir = child;
            }
            None => break,
        }
    }
    keys
}
810
811async fn build_lazy_s3_parquet(
817 d: &DatasetConfig,
818 ctx: &SessionContext,
819) -> Result<(DatasetState, Arc<dyn TableProvider>), AppError> {
820 register_s3_object_store(d, ctx)?;
821
822 let (provider, file_schema, part_keys) = build_s3_listing_table(d, ctx).await?;
823
824 let mut fields: Vec<Field> = file_schema
826 .fields()
827 .iter()
828 .map(|f| f.as_ref().clone())
829 .collect();
830 for k in &part_keys {
831 if !fields.iter().any(|f| f.name() == k) {
832 fields.push(Field::new(k, DataType::Utf8, false));
833 }
834 }
835 let arrow_sch = Arc::new(Schema::new(fields));
836
837 let columns: Vec<ColumnInfo> = arrow_sch
838 .fields()
839 .iter()
840 .map(|f| {
841 let dt = f.data_type();
842 ColumnInfo {
843 name: f.name().clone(),
844 logical: arrow_to_logical(dt),
845 sql_type: format!("{dt:?}"),
846 nullable: f.is_nullable(),
847 }
848 })
849 .collect();
850 let schema = DatasetSchema::new(&d.name, columns);
851
852 log::info!(
853 "dataset '{}' [{}, lazy, s3]: {} cols ({} partition, no materialise, no index)",
854 d.name,
855 d.source.kind.as_str(),
856 schema.columns.len(),
857 part_keys.len()
858 );
859
860 Ok((
861 DatasetState {
862 schema,
863 data: Vec::new(),
864 arrow_schema: arrow_sch,
865 index: EqIndex::default(),
866 lazy: true,
867 },
868 provider,
869 ))
870}
871
872async fn build_s3_listing_table(
878 d: &DatasetConfig,
879 ctx: &SessionContext,
880) -> Result<(Arc<dyn TableProvider>, Arc<Schema>, Vec<String>), AppError> {
881 let (url, part_keys) = s3_listing(d, ctx).await?;
882
883 let mut opts =
884 ListingOptions::new(Arc::new(ParquetFormat::default())).with_file_extension(".parquet");
885 if !part_keys.is_empty() {
886 opts = opts.with_table_partition_cols(
887 part_keys
888 .iter()
889 .map(|k| (k.clone(), DataType::Utf8))
890 .collect(),
891 );
892 }
893
894 let session_state = ctx.state();
895 let file_schema = opts.infer_schema(&session_state, &url).await.map_err(|e| {
896 AppError::Internal(format!(
897 "dataset '{}': infer parquet schema on s3: {e}",
898 d.name
899 ))
900 })?;
901
902 let cfg = ListingTableConfig::new(url)
903 .with_listing_options(opts)
904 .with_schema(file_schema.clone());
905 let table = ListingTable::try_new(cfg).map_err(|e| {
906 AppError::Internal(format!(
907 "dataset '{}': ListingTable::try_new (s3): {e}",
908 d.name
909 ))
910 })?;
911 Ok((Arc::new(table), file_schema, part_keys))
912}
913
914async fn s3_listing(
920 d: &DatasetConfig,
921 ctx: &SessionContext,
922) -> Result<(ListingTableUrl, Vec<String>), AppError> {
923 let s3 = d.s3.clone().unwrap_or_default();
924 let want_partitions = !matches!(s3.partitioning, Partitioning::None);
925 let loc = &d.source.location;
926
927 if d.source.has_glob() {
928 let (base, keys) = split_glob_base_keys(loc);
929 let base = format!("{}/", base.trim_end_matches('/'));
930 let url = ListingTableUrl::parse(&base).map_err(|e| {
931 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
932 })?;
933 let keys = if want_partitions { keys } else { Vec::new() };
934 return Ok((url, keys));
935 }
936
937 let base = if loc.ends_with('/') {
938 loc.clone()
939 } else {
940 format!("{loc}/")
941 };
942 let url = ListingTableUrl::parse(&base).map_err(|e| {
943 AppError::Internal(format!("dataset '{}': bad s3 url '{base}': {e}", d.name))
944 })?;
945 let keys = if want_partitions {
946 discover_s3_hive_keys(ctx, &url).await
947 } else {
948 Vec::new()
949 };
950 Ok((url, keys))
951}
952
/// Splits a `/`-separated glob location into its literal base and partition keys.
///
/// The base is every segment before the first one containing `*`, `?` or `[`,
/// re-joined with `/` (an empty base becomes `/`). The keys are the non-empty `k`
/// of every `k=...` segment from that wildcard up to, but excluding, the final
/// segment.
fn split_glob_base_keys(loc: &str) -> (String, Vec<String>) {
    fn is_wild(segment: &str) -> bool {
        segment.contains('*') || segment.contains('?') || segment.contains('[')
    }

    let segments: Vec<&str> = loc.split('/').collect();
    let wild_at = segments
        .iter()
        .position(|seg| is_wild(seg))
        .unwrap_or(segments.len());

    let joined = segments[..wild_at].join("/");
    let base = match joined.is_empty() {
        true => String::from("/"),
        false => joined,
    };

    let last = segments.len().saturating_sub(1);
    let keys = segments[wild_at.min(last)..last]
        .iter()
        .filter_map(|seg| match seg.split_once('=') {
            Some((key, _)) if !key.is_empty() => Some(key.to_string()),
            _ => None,
        })
        .collect();
    (base, keys)
}
976
977async fn discover_s3_hive_keys(ctx: &SessionContext, url: &ListingTableUrl) -> Vec<String> {
982 let store = match ctx.runtime_env().object_store(url.object_store()) {
983 Ok(s) => s,
984 Err(_) => return Vec::new(),
985 };
986 let mut keys = Vec::new();
987 let mut prefix = url.prefix().clone();
988 loop {
989 let listing = match store.list_with_delimiter(Some(&prefix)).await {
990 Ok(l) => l,
991 Err(_) => break,
992 };
993 let mut next: Option<object_store::path::Path> = None;
994 for cp in &listing.common_prefixes {
995 if let Some(seg) = cp.parts().next_back() {
996 let seg = seg.as_ref().to_string();
997 if let Some((k, v)) = seg.split_once('=')
998 && !k.is_empty()
999 && !v.is_empty()
1000 {
1001 keys.push(k.to_string());
1002 next = Some(cp.clone());
1003 break;
1004 }
1005 }
1006 }
1007 match next {
1008 Some(p) => prefix = p,
1009 None => break,
1010 }
1011 }
1012 keys
1013}
1014
1015fn read_local_parquet(d: &DatasetConfig) -> Result<Vec<RecordBatch>, AppError> {
1032 let files = d.resolve_local_parquet_files()?;
1033 let mut all = Vec::new();
1034 let wanted: Option<std::collections::HashSet<String>> = if d.columns.is_empty() {
1035 None
1036 } else {
1037 Some(d.columns.iter().map(|c| c.to_lowercase()).collect())
1038 };
1039
1040 for f in &files {
1041 let file = std::fs::File::open(f)
1042 .map_err(|e| AppError::Internal(format!("open {}: {e}", f.display())))?;
1043
1044 let probe = ParquetRecordBatchReaderBuilder::try_new(
1049 file.try_clone()
1050 .map_err(|e| AppError::Internal(format!("dup fd {}: {e}", f.display())))?,
1051 )?;
1052 let parquet_schema = probe.parquet_schema().clone();
1053 let arrow_schema = probe.schema().clone();
1054 let metadata = probe.metadata().clone();
1055 drop(probe);
1056
1057 let projection = if let Some(w) = &wanted {
1059 let indices: Vec<usize> = arrow_schema
1060 .fields()
1061 .iter()
1062 .enumerate()
1063 .filter(|(_, fld)| w.contains(&fld.name().to_lowercase()))
1064 .map(|(i, _)| i)
1065 .collect();
1066 if indices.is_empty() {
1067 return Err(AppError::Internal(format!(
1068 "dataset '{}': no columns from `columns = {:?}` match parquet schema for {}",
1069 d.name,
1070 d.columns,
1071 f.display()
1072 )));
1073 }
1074 ProjectionMask::roots(&parquet_schema, indices)
1075 } else {
1076 ProjectionMask::all()
1077 };
1078
1079 let mut new_fields: Vec<Field> = arrow_schema
1087 .fields()
1088 .iter()
1089 .map(|f| f.as_ref().clone())
1090 .collect();
1091 if d.dict_encode
1092 && let Some(rg0) = metadata.row_groups().first()
1093 {
1094 for (i, fld) in arrow_schema.fields().iter().enumerate() {
1095 if !matches!(
1096 fld.data_type(),
1097 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1098 ) {
1099 continue;
1100 }
1101 if let Some(col) = rg0.columns().get(i)
1102 && col.dictionary_page_offset().is_some()
1103 {
1104 new_fields[i] = Field::new(
1105 fld.name(),
1106 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1107 fld.is_nullable(),
1108 );
1109 }
1110 }
1111 }
1112 let forced_schema = Arc::new(Schema::new(new_fields));
1113
1114 let opts = ArrowReaderOptions::new().with_schema(forced_schema);
1115 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, opts)?
1116 .with_batch_size(65_536)
1117 .with_projection(projection)
1118 .build()?;
1119 let pairs = hive_pairs(f);
1123 for batch in reader {
1124 let batch = batch.map_err(|e| AppError::Internal(e.to_string()))?;
1125 all.push(if pairs.is_empty() {
1126 batch
1127 } else {
1128 append_partition_cols(&batch, &pairs)?
1129 });
1130 }
1131 }
1132 if all.is_empty() {
1133 return Err(AppError::Internal(format!(
1134 "dataset '{}': parquet source is empty",
1135 d.name
1136 )));
1137 }
1138 Ok(all)
1139}
1140
/// Extracts hive-style `key=value` pairs from the *directory* components of a
/// file path, outermost first.
///
/// The file name itself is ignored, so a file called `id=7.parquet` does not add a
/// bogus `id` column. This matches lazy mode, where partition keys come only from
/// directories. Components with an empty key or value, or with more than one `=`,
/// are skipped.
fn hive_pairs(path: &std::path::Path) -> Vec<(String, String)> {
    let Some(dir) = path.parent() else {
        return Vec::new();
    };
    dir.components()
        .filter_map(|c| c.as_os_str().to_str())
        .filter_map(|seg| {
            let (k, v) = seg.split_once('=')?;
            if k.is_empty() || v.is_empty() || v.contains('=') {
                return None;
            }
            Some((k.to_string(), v.to_string()))
        })
        .collect()
}
1155
1156fn append_partition_cols(
1159 batch: &RecordBatch,
1160 pairs: &[(String, String)],
1161) -> Result<RecordBatch, AppError> {
1162 let n = batch.num_rows();
1163 let mut fields: Vec<Field> = batch
1164 .schema()
1165 .fields()
1166 .iter()
1167 .map(|f| f.as_ref().clone())
1168 .collect();
1169 let mut cols: Vec<ArrayRef> = batch.columns().to_vec();
1170 for (k, v) in pairs {
1171 if fields.iter().any(|f| f.name() == k) {
1172 continue;
1173 }
1174 fields.push(Field::new(k, DataType::Utf8, false));
1175 cols.push(Arc::new(StringArray::from(vec![v.as_str(); n])));
1176 }
1177 RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
1178 .map_err(|e| AppError::Internal(e.to_string()))
1179}
1180
1181async fn read_s3_parquet(
1187 d: &DatasetConfig,
1188 ctx: &SessionContext,
1189) -> Result<Vec<RecordBatch>, AppError> {
1190 register_s3_object_store(d, ctx)?;
1191 let (provider, _file_schema, _keys) = build_s3_listing_table(d, ctx).await?;
1192 let df = ctx
1193 .read_table(provider)
1194 .map_err(|e| AppError::Internal(format!("dataset '{}': s3 read_table: {e}", d.name)))?;
1195 Ok(df.collect().await?)
1196}
1197
1198async fn read_delta(
1202 d: &DatasetConfig,
1203 opts: HashMap<String, String>,
1204) -> Result<Vec<RecordBatch>, AppError> {
1205 let url = deltalake::ensure_table_uri(&d.source.location).map_err(|e| {
1206 AppError::Internal(format!(
1207 "dataset '{}': bad delta location '{}': {e}",
1208 d.name, d.source.location
1209 ))
1210 })?;
1211 let table = deltalake::open_table_with_storage_options(url, opts)
1212 .await
1213 .map_err(|e| {
1214 AppError::Internal(format!(
1215 "dataset '{}': delta open '{}': {e}",
1216 d.name, d.source.location
1217 ))
1218 })?;
1219 let provider = table.table_provider().await.map_err(|e| {
1220 AppError::Internal(format!("dataset '{}': delta table_provider: {e}", d.name))
1221 })?;
1222 let scan_ctx = SessionContext::new();
1225 let df = scan_ctx
1226 .read_table(provider)
1227 .map_err(|e| AppError::Internal(format!("dataset '{}': delta read_table: {e}", d.name)))?;
1228 Ok(df.collect().await?)
1229}
1230
1231fn delta_s3_options(d: &DatasetConfig) -> Result<HashMap<String, String>, AppError> {
1235 let creds = d.resolved_creds();
1236 let region = d.resolved_region();
1237 let s3 = d.s3.clone().unwrap_or_default();
1238 let (bucket, _) = d.source.s3_bucket()?;
1239
1240 let mut opts = HashMap::new();
1241 opts.insert("AWS_REGION".into(), region);
1242 if let Some(ep) = s3.effective_endpoint(bucket) {
1243 opts.insert("AWS_ENDPOINT_URL".into(), ep);
1244 }
1245 if s3.allow_http {
1246 opts.insert("AWS_ALLOW_HTTP".into(), "true".into());
1247 }
1248 opts.insert(
1249 "AWS_VIRTUAL_HOSTED_STYLE_REQUEST".into(),
1250 (s3.addressing_style == AddressingStyle::Virtual).to_string(),
1251 );
1252 if let Some(k) = creds.access_key_id {
1253 opts.insert("AWS_ACCESS_KEY_ID".into(), k);
1254 }
1255 if let Some(s) = creds.secret_access_key {
1256 opts.insert("AWS_SECRET_ACCESS_KEY".into(), s);
1257 }
1258 if let Some(t) = creds.session_token {
1259 opts.insert("AWS_SESSION_TOKEN".into(), t);
1260 }
1261 opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".into(), "true".into());
1263 Ok(opts)
1264}
1265
1266fn register_s3_object_store(d: &DatasetConfig, ctx: &SessionContext) -> Result<(), AppError> {
1270 let (bucket, _key) = d.source.s3_bucket()?;
1271 let creds = d.resolved_creds();
1272 let region = d.resolved_region();
1273 let s3 = d.s3.clone().unwrap_or_default();
1274
1275 let store = build_s3(bucket, ®ion, &s3, &creds).map_err(|e| {
1276 AppError::Internal(format!(
1277 "dataset '{}': build S3 store for '{bucket}': {e}",
1278 d.name
1279 ))
1280 })?;
1281
1282 let url = Url::parse(&format!("s3://{bucket}"))
1283 .map_err(|e| AppError::Internal(format!("invalid s3 URL for bucket {bucket}: {e}")))?;
1284 ctx.register_object_store(&url, Arc::new(store));
1285 Ok(())
1286}
1287
1288fn build_s3(
1289 bucket: &str,
1290 region: &str,
1291 s3: &S3Config,
1292 creds: &ResolvedCreds,
1293) -> Result<object_store::aws::AmazonS3, object_store::Error> {
1294 let mut b = AmazonS3Builder::new()
1295 .with_bucket_name(bucket)
1296 .with_region(region)
1297 .with_allow_http(s3.allow_http)
1298 .with_virtual_hosted_style_request(s3.addressing_style == AddressingStyle::Virtual);
1299 if let Some(ep) = s3.effective_endpoint(bucket) {
1300 b = b.with_endpoint(ep);
1301 }
1302 if let Some(k) = creds.access_key_id.as_deref() {
1303 b = b.with_access_key_id(k);
1304 }
1305 if let Some(s) = creds.secret_access_key.as_deref() {
1306 b = b.with_secret_access_key(s);
1307 }
1308 if let Some(t) = creds.session_token.as_deref() {
1309 b = b.with_token(t);
1310 }
1311 b.build()
1312}
1313
1314fn arrow_to_logical(dt: &DataType) -> LogicalType {
1315 match dt {
1316 DataType::Boolean => LogicalType::Bool,
1317 DataType::Int8
1318 | DataType::Int16
1319 | DataType::Int32
1320 | DataType::Int64
1321 | DataType::UInt8
1322 | DataType::UInt16
1323 | DataType::UInt32
1324 | DataType::UInt64 => LogicalType::Int,
1325 DataType::Float16 | DataType::Float32 | DataType::Float64 => LogicalType::Float,
1326 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
1327 DataType::Dictionary(_, v)
1331 if matches!(
1332 v.as_ref(),
1333 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1334 ) =>
1335 {
1336 LogicalType::Utf8
1337 }
1338 DataType::Date32
1339 | DataType::Date64
1340 | DataType::Time32(_)
1341 | DataType::Time64(_)
1342 | DataType::Timestamp(_, _)
1343 | DataType::Duration(_)
1344 | DataType::Interval(_) => LogicalType::Temporal,
1345 _ => LogicalType::Other,
1346 }
1347}
1348
1349fn project(
1354 schema: &DatasetSchema,
1355 batch: RecordBatch,
1356 columns: &[String],
1357) -> Result<RecordBatch, AppError> {
1358 if columns.is_empty() {
1359 return Ok(batch);
1360 }
1361 let indices: Vec<usize> = columns
1362 .iter()
1363 .map(|c| {
1364 schema
1365 .find(c)
1366 .map(|info| schema.by_name[&info.name.to_lowercase()])
1367 })
1368 .collect::<Result<_, _>>()?;
1369 let fields: Vec<Field> = indices
1370 .iter()
1371 .map(|&i| batch.schema().field(i).clone())
1372 .collect();
1373 let cols: Vec<ArrayRef> = indices.iter().map(|&i| batch.column(i).clone()).collect();
1374 Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)?)
1375}
1376
1377#[derive(Default)]
1390struct Params {
1391 values: Vec<ScalarValue>,
1392}
1393
1394impl Params {
1395 fn new() -> Self {
1396 Self::default()
1397 }
1398
1399 fn bind(&mut self, v: ScalarValue) -> String {
1401 self.values.push(v);
1402 format!("${}", self.values.len())
1403 }
1404
1405 fn into_values(self) -> Vec<ScalarValue> {
1406 self.values
1407 }
1408}
1409
1410fn build_query_sql(
1411 schema: &DatasetSchema,
1412 req: &QueryRequest,
1413 max_page_size: u64,
1414) -> Result<(String, Vec<ScalarValue>), AppError> {
1415 let (limit, offset) = req.effective_limit_offset(max_page_size);
1416 build_query_sql_with_suffix(schema, req, &format!(" LIMIT {limit} OFFSET {offset}"))
1417}
1418
1419fn build_query_stream_sql(
1420 schema: &DatasetSchema,
1421 req: &QueryRequest,
1422) -> Result<(String, Vec<ScalarValue>), AppError> {
1423 let suffix = req
1424 .limit
1425 .map(|limit| format!(" LIMIT {limit}"))
1426 .unwrap_or_default();
1427 build_query_sql_with_suffix(schema, req, &suffix)
1428}
1429
1430fn build_query_sql_with_suffix(
1431 schema: &DatasetSchema,
1432 req: &QueryRequest,
1433 suffix: &str,
1434) -> Result<(String, Vec<ScalarValue>), AppError> {
1435 let agg_plan = req.agg_plan(schema)?;
1436
1437 let cols = if let Some(plan) = &agg_plan {
1438 let mut parts: Vec<String> = plan
1440 .group_cols
1441 .iter()
1442 .map(|c| DatasetSchema::quote_ident(c))
1443 .collect();
1444 for a in &plan.aggs {
1445 let expr = a.sql_expr()?;
1446 parts.push(format!(
1447 "{expr} AS {}",
1448 DatasetSchema::quote_ident(&a.alias)
1449 ));
1450 }
1451 parts.join(", ")
1452 } else if req.columns.is_empty() {
1453 if req.distinct {
1454 "DISTINCT *".to_string()
1455 } else {
1456 "*".to_string()
1457 }
1458 } else {
1459 let list = req
1460 .columns
1461 .iter()
1462 .map(|c| {
1463 schema
1464 .find(c)
1465 .map(|info| DatasetSchema::quote_ident(&info.name))
1466 })
1467 .collect::<Result<Vec<_>, _>>()?
1468 .join(", ");
1469 if req.distinct {
1470 format!("DISTINCT {list}")
1471 } else {
1472 list
1473 }
1474 };
1475
1476 let mut params = Params::new();
1477 let clauses: Vec<String> = req
1478 .predicates
1479 .iter()
1480 .map(|p| pred_to_sql(schema, p, &mut params))
1481 .collect::<Result<_, _>>()?;
1482
1483 let table = DatasetSchema::quote_ident(&schema.name);
1484 let where_clause = if clauses.is_empty() {
1485 String::new()
1486 } else {
1487 format!(" WHERE {}", clauses.join(" AND "))
1488 };
1489 let group_clause = match &agg_plan {
1490 Some(p) => format!(
1491 " GROUP BY {}",
1492 p.group_cols
1493 .iter()
1494 .map(|c| DatasetSchema::quote_ident(c))
1495 .collect::<Vec<_>>()
1496 .join(", "),
1497 ),
1498 None => String::new(),
1499 };
1500 let order_clause = match req.order_by_sql(schema, agg_plan.as_ref())? {
1501 Some(s) => format!(" ORDER BY {s}"),
1502 None => String::new(),
1503 };
1504 let sql =
1505 format!("SELECT {cols} FROM {table}{where_clause}{group_clause}{order_clause}{suffix}");
1506 Ok((sql, params.into_values()))
1507}
1508
1509fn build_count_sql(
1510 schema: &DatasetSchema,
1511 predicates: &[Predicate],
1512) -> Result<(String, Vec<ScalarValue>), AppError> {
1513 let mut params = Params::new();
1514 let clauses: Vec<String> = predicates
1515 .iter()
1516 .map(|p| pred_to_sql(schema, p, &mut params))
1517 .collect::<Result<_, _>>()?;
1518 let table = DatasetSchema::quote_ident(&schema.name);
1519 let where_clause = if clauses.is_empty() {
1520 String::new()
1521 } else {
1522 format!(" WHERE {}", clauses.join(" AND "))
1523 };
1524 let sql = format!("SELECT COUNT(*) FROM {table}{where_clause}");
1525 Ok((sql, params.into_values()))
1526}
1527
1528fn pred_to_sql(
1529 schema: &DatasetSchema,
1530 pred: &Predicate,
1531 params: &mut Params,
1532) -> Result<String, AppError> {
1533 let info = schema.find(&pred.col)?;
1534 let col = DatasetSchema::quote_ident(&info.name);
1535
1536 match pred.op.as_str() {
1537 "is_null" => return Ok(format!("{col} IS NULL")),
1538 "is_not_null" => return Ok(format!("{col} IS NOT NULL")),
1539 _ => {}
1540 }
1541
1542 let val = pred
1543 .val
1544 .as_ref()
1545 .ok_or_else(|| AppError::InvalidValue(format!("'{}' requires a value", pred.op)))?;
1546
1547 if pred.op == "in" {
1548 let items = val
1549 .as_array()
1550 .filter(|a| !a.is_empty())
1551 .ok_or_else(|| AppError::InvalidValue("'in' needs a non-empty array".into()))?;
1552 let placeholders: Vec<String> = items
1553 .iter()
1554 .map(|item| Ok(params.bind(json_to_scalar(item)?)))
1555 .collect::<Result<_, AppError>>()?;
1556 return Ok(format!("{col} IN ({})", placeholders.join(", ")));
1557 }
1558
1559 let sql_op = match pred.op.as_str() {
1560 "eq" => "=",
1561 "neq" => "!=",
1562 "gt" => ">",
1563 "gte" => ">=",
1564 "lt" => "<",
1565 "lte" => "<=",
1566 "like" => "LIKE",
1567 "ilike" => "ILIKE",
1568 other => return Err(AppError::UnknownOperator(other.into())),
1569 };
1570 let placeholder = params.bind(json_to_scalar(val)?);
1571 Ok(format!("{col} {sql_op} {placeholder}"))
1572}
1573
1574fn json_to_scalar(val: &JsonValue) -> Result<ScalarValue, AppError> {
1578 match val {
1579 JsonValue::String(s) => Ok(ScalarValue::Utf8(Some(s.clone()))),
1580 JsonValue::Bool(b) => Ok(ScalarValue::Boolean(Some(*b))),
1581 JsonValue::Null => Ok(ScalarValue::Null),
1582 JsonValue::Number(n) => {
1583 if let Some(i) = n.as_i64() {
1584 Ok(ScalarValue::Int64(Some(i)))
1585 } else if let Some(u) = n.as_u64() {
1586 Ok(ScalarValue::UInt64(Some(u)))
1587 } else if let Some(f) = n.as_f64() {
1588 Ok(ScalarValue::Float64(Some(f)))
1589 } else {
1590 Err(AppError::InvalidValue(
1591 "unsupported numeric literal in predicate".into(),
1592 ))
1593 }
1594 }
1595 _ => Err(AppError::InvalidValue(
1596 "unsupported literal type in predicate".into(),
1597 )),
1598 }
1599}
1600
1601fn json_index_key(val: &JsonValue) -> Option<String> {
1606 match val {
1607 JsonValue::String(s) => Some(s.clone()),
1608 JsonValue::Number(n) => Some(n.to_string()),
1609 JsonValue::Bool(b) => Some(b.to_string()),
1610 _ => None,
1611 }
1612}
1613
/// Intersects two ascending row-id lists in a single linear merge pass.
///
/// Each matching pair of elements (one from each side) emits one value.
fn intersect_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut common = Vec::with_capacity(a.len().min(b.len()));
    let (mut left, mut right) = (a, b);
    while let (Some(&x), Some(&y)) = (left.first(), right.first()) {
        match x.cmp(&y) {
            Ordering::Less => left = &left[1..],
            Ordering::Greater => right = &right[1..],
            Ordering::Equal => {
                common.push(x);
                left = &left[1..];
                right = &right[1..];
            }
        }
    }
    common
}
1630
/// Merges two ascending row-id lists into their ascending union.
///
/// When both heads are equal, one copy is emitted and both sides advance.
fn union_sorted(a: &[u32], b: &[u32]) -> Vec<u32> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0usize, 0usize);
    while let (Some(&x), Some(&y)) = (a.get(i), b.get(j)) {
        let from_left = x <= y;
        let from_right = y <= x;
        merged.push(if from_left { x } else { y });
        i += usize::from(from_left);
        j += usize::from(from_right);
    }
    merged.extend_from_slice(&a[i..]);
    merged.extend_from_slice(&b[j..]);
    merged
}
1655
1656fn try_index<'a>(index: &'a EqIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1657 if predicates.is_empty() || index.is_empty() {
1658 return None;
1659 }
1660
1661 if let [pred] = predicates
1664 && pred.op.as_str() == "eq"
1665 {
1666 let col_lower = pred.col.to_lowercase();
1667 let col_map = index.get(&col_lower)?;
1668 let key = json_index_key(pred.val.as_ref()?)?;
1669 return Some(match col_map.get(&key) {
1670 Some(rows) => Cow::Borrowed(rows.as_slice()),
1671 None => Cow::Owned(Vec::new()),
1672 });
1673 }
1674
1675 let mut result: Option<Vec<u32>> = None;
1676 for pred in predicates {
1677 let col_lower = pred.col.to_lowercase();
1678 let col_map = index.get(&col_lower)?;
1679
1680 let rows: Vec<u32> = match pred.op.as_str() {
1681 "eq" => {
1682 let key = json_index_key(pred.val.as_ref()?)?;
1683 col_map.get(&key).cloned().unwrap_or_default()
1684 }
1685 "in" => {
1686 let items = pred.val.as_ref()?.as_array()?;
1687 let mut merged: Vec<u32> = Vec::new();
1688 for item in items {
1689 if let Some(r) = col_map.get(&json_index_key(item)?) {
1690 merged = union_sorted(&merged, r);
1691 }
1692 }
1693 merged
1694 }
1695 _ => return None,
1696 };
1697
1698 result = Some(match result {
1699 None => rows,
1700 Some(r) => intersect_sorted(&r, &rows),
1701 });
1702 }
1703 result.map(Cow::Owned)
1704}
1705
1706#[doc(hidden)]
1710pub mod bench {
1711 use super::{EqIndex, FastMap, json_index_key, try_index};
1712 use datapress_core::models::Predicate;
1713 use serde_json::Value as JsonValue;
1714 use std::borrow::Cow;
1715
1716 pub struct BenchIndex(EqIndex);
1718
1719 pub fn single_bucket_index(col: &str, val: &JsonValue, rows: Vec<u32>) -> BenchIndex {
1723 let key = json_index_key(val).expect("benchable index key");
1724 let mut col_map: FastMap<String, Vec<u32>> = FastMap::default();
1725 col_map.insert(key, rows);
1726 let mut index: EqIndex = EqIndex::default();
1727 index.insert(col.to_string(), col_map);
1728 BenchIndex(index)
1729 }
1730
1731 pub fn lookup<'a>(idx: &'a BenchIndex, predicates: &[Predicate]) -> Option<Cow<'a, [u32]>> {
1733 try_index(&idx.0, predicates)
1734 }
1735
1736 pub fn lookup_cloning(idx: &BenchIndex, predicates: &[Predicate]) -> Option<Vec<u32>> {
1741 let [pred] = predicates else { return None };
1742 if pred.op.as_str() != "eq" {
1743 return None;
1744 }
1745 let col_lower = pred.col.to_lowercase();
1746 let col_map = idx.0.get(&col_lower)?;
1747 let key = json_index_key(pred.val.as_ref()?)?;
1748 Some(col_map.get(&key).cloned().unwrap_or_default())
1749 }
1750}
1751
1752fn slice_global(
1755 chunks: &[RecordBatch],
1756 schema: &Arc<Schema>,
1757 offset: usize,
1758 limit: usize,
1759) -> Result<RecordBatch, AppError> {
1760 if limit == 0 || chunks.is_empty() {
1761 return Ok(RecordBatch::new_empty(schema.clone()));
1762 }
1763 let mut out = Vec::new();
1764 let mut to_skip = offset;
1765 let mut remaining = limit;
1766 for b in chunks {
1767 if remaining == 0 {
1768 break;
1769 }
1770 let n = b.num_rows();
1771 if to_skip >= n {
1772 to_skip -= n;
1773 continue;
1774 }
1775 let take = remaining.min(n - to_skip);
1776 out.push(b.slice(to_skip, take));
1777 to_skip = 0;
1778 remaining -= take;
1779 }
1780 if out.is_empty() {
1781 return Ok(RecordBatch::new_empty(schema.clone()));
1782 }
1783 compute::concat_batches(schema, out.iter()).map_err(AppError::from)
1784}
1785
1786fn take_page(
1791 chunks: &[RecordBatch],
1792 schema: &Arc<Schema>,
1793 rows: &[u32],
1794 offset: usize,
1795 limit: usize,
1796) -> Result<RecordBatch, AppError> {
1797 let start = offset.min(rows.len());
1798 let len = limit.min(rows.len() - start);
1799 if len == 0 || chunks.is_empty() {
1800 return Ok(RecordBatch::new_empty(schema.clone()));
1801 }
1802
1803 let mut offsets: Vec<u32> = Vec::with_capacity(chunks.len() + 1);
1806 let mut acc: u32 = 0;
1807 offsets.push(0);
1808 for b in chunks {
1809 acc = acc
1810 .checked_add(b.num_rows() as u32)
1811 .expect("row count exceeds u32::MAX");
1812 offsets.push(acc);
1813 }
1814
1815 let mut buckets: Vec<Vec<(u32, u32)>> = (0..chunks.len()).map(|_| Vec::new()).collect();
1818 for (out_pos, &gid) in rows[start..start + len].iter().enumerate() {
1819 let bi = offsets.partition_point(|&x| x <= gid).saturating_sub(1);
1820 let local = gid - offsets[bi];
1821 buckets[bi].push((out_pos as u32, local));
1822 }
1823
1824 let mut takens: Vec<RecordBatch> = Vec::new();
1826 let mut dest: Vec<u32> = Vec::with_capacity(len);
1827 for (bi, bucket) in buckets.iter().enumerate() {
1828 if bucket.is_empty() {
1829 continue;
1830 }
1831 let idx = UInt32Array::from(bucket.iter().map(|(_, l)| *l).collect::<Vec<u32>>());
1832 let cols: Vec<ArrayRef> = chunks[bi]
1833 .columns()
1834 .iter()
1835 .map(|c| {
1836 arrow::compute::take(c.as_ref(), &idx, None::<arrow::compute::TakeOptions>)
1837 .map_err(AppError::from)
1838 })
1839 .collect::<Result<_, _>>()?;
1840 takens.push(RecordBatch::try_new(chunks[bi].schema(), cols)?);
1841 dest.extend(bucket.iter().map(|(out_pos, _)| *out_pos));
1842 }
1843
1844 let stitched = compute::concat_batches(schema, takens.iter())?;
1846 let mut inv = vec![0u32; len];
1847 for (i, &d) in dest.iter().enumerate() {
1848 inv[d as usize] = i as u32;
1849 }
1850 let perm = UInt32Array::from(inv);
1851 let cols: Vec<ArrayRef> = stitched
1852 .columns()
1853 .iter()
1854 .map(|c| {
1855 arrow::compute::take(c.as_ref(), &perm, None::<arrow::compute::TakeOptions>)
1856 .map_err(AppError::from)
1857 })
1858 .collect::<Result<_, _>>()?;
1859 RecordBatch::try_new(stitched.schema(), cols).map_err(AppError::from)
1860}
1861
1862fn build_eq_index_with_policy(chunks: &[RecordBatch], cfg: &IndexConfig) -> EqIndex {
1866 use rayon::prelude::*;
1867
1868 if cfg.mode == IndexMode::None || chunks.is_empty() {
1869 return EqIndex::default();
1870 }
1871
1872 let allow: Option<HashMap<String, ()>> = if cfg.mode == IndexMode::List {
1873 Some(cfg.columns.iter().map(|c| (c.to_lowercase(), ())).collect())
1874 } else {
1875 None
1876 };
1877
1878 let max_card = if cfg.mode == IndexMode::Auto {
1879 Some(cfg.max_cardinality)
1880 } else {
1881 None
1882 };
1883
1884 let mut batch_offsets: Vec<u32> = Vec::with_capacity(chunks.len());
1886 let mut acc: u32 = 0;
1887 for b in chunks {
1888 batch_offsets.push(acc);
1889 acc = acc
1890 .checked_add(b.num_rows() as u32)
1891 .expect("row count exceeds u32::MAX");
1892 }
1893
1894 let schema = chunks[0].schema();
1895
1896 schema
1897 .fields()
1898 .par_iter()
1899 .enumerate()
1900 .filter_map(|(ci, field)| {
1901 let col_lower = field.name().to_lowercase();
1902 if let Some(a) = &allow
1903 && !a.contains_key(&col_lower)
1904 {
1905 return None;
1906 }
1907
1908 let dtype = field.data_type();
1911 let dict_utf8 = matches!(dtype,
1912 DataType::Dictionary(k, v)
1913 if matches!(k.as_ref(), DataType::Int32)
1914 && matches!(v.as_ref(), DataType::Utf8));
1915 match dtype {
1916 DataType::Utf8
1917 | DataType::Utf8View
1918 | DataType::Boolean
1919 | DataType::Int8
1920 | DataType::Int16
1921 | DataType::Int32
1922 | DataType::Int64 => {}
1923 _ if dict_utf8 => {}
1924 _ => return None,
1925 }
1926
1927 let mut map: FastMap<String, Vec<u32>> = FastMap::default();
1928
1929 for (bi, batch) in chunks.iter().enumerate() {
1930 let base = batch_offsets[bi];
1931 let col = batch.column(ci);
1932
1933 macro_rules! index_col {
1934 ($arr_ty:ty) => {{
1935 let arr = col.as_any().downcast_ref::<$arr_ty>()?;
1936 for row in 0..arr.len() {
1937 if arr.is_null(row) {
1938 continue;
1939 }
1940 let key = arr.value(row).to_string();
1941 let gid = base + row as u32;
1942 if let Some(v) = map.get_mut(&key) {
1943 v.push(gid);
1944 } else {
1945 if let Some(mc) = max_card {
1946 if map.len() >= mc {
1947 return None;
1948 }
1949 }
1950 map.insert(key, vec![gid]);
1951 }
1952 }
1953 }};
1954 }
1955
1956 if dict_utf8 {
1957 let arr = col
1964 .as_any()
1965 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>(
1966 )?;
1967 let keys = arr.keys();
1968 let values = arr.values().as_any().downcast_ref::<StringArray>()?;
1969 for row in 0..arr.len() {
1970 if arr.is_null(row) {
1971 continue;
1972 }
1973 let k = keys.value(row) as usize;
1974 let s = values.value(k);
1975 let gid = base + row as u32;
1976 if let Some(v) = map.get_mut(s) {
1977 v.push(gid);
1978 } else {
1979 if let Some(mc) = max_card
1980 && map.len() >= mc
1981 {
1982 return None;
1983 }
1984 map.insert(s.to_string(), vec![gid]);
1985 }
1986 }
1987 } else {
1988 match dtype {
1989 DataType::Utf8 => index_col!(StringArray),
1990 DataType::Utf8View => index_col!(StringViewArray),
1991 DataType::Boolean => index_col!(BooleanArray),
1992 DataType::Int8 => index_col!(Int8Array),
1993 DataType::Int16 => index_col!(Int16Array),
1994 DataType::Int32 => index_col!(Int32Array),
1995 DataType::Int64 => index_col!(Int64Array),
1996 _ => unreachable!(),
1997 }
1998 }
1999 }
2000
2001 Some((col_lower, map))
2002 })
2003 .collect()
2004}
2005
2006fn writable_inline(dt: &DataType) -> bool {
2019 match dt {
2020 DataType::Utf8
2021 | DataType::LargeUtf8
2022 | DataType::Utf8View
2023 | DataType::Boolean
2024 | DataType::Int8
2025 | DataType::Int16
2026 | DataType::Int32
2027 | DataType::Int64
2028 | DataType::UInt8
2029 | DataType::UInt16
2030 | DataType::UInt32
2031 | DataType::UInt64
2032 | DataType::Float32
2033 | DataType::Float64
2034 | DataType::Decimal128(_, _)
2035 | DataType::Decimal256(_, _) => true,
2036 DataType::Dictionary(k, v)
2037 if matches!(k.as_ref(), DataType::Int32) && matches!(v.as_ref(), DataType::Utf8) =>
2038 {
2039 true
2040 }
2041 _ => false,
2042 }
2043}
2044
2045fn cast_for_serialize(batch: &RecordBatch) -> Result<RecordBatch, AppError> {
2051 let schema = batch.schema();
2052 let to_cast: Vec<usize> = schema
2053 .fields()
2054 .iter()
2055 .enumerate()
2056 .filter_map(|(i, f)| {
2057 if writable_inline(f.data_type()) {
2058 None
2059 } else {
2060 Some(i)
2061 }
2062 })
2063 .collect();
2064 if to_cast.is_empty() {
2065 return Ok(batch.clone());
2066 }
2067 let new_fields: Vec<Field> = schema
2068 .fields()
2069 .iter()
2070 .enumerate()
2071 .map(|(i, f)| {
2072 if to_cast.contains(&i) {
2073 Field::new(f.name(), DataType::Utf8, f.is_nullable())
2074 } else {
2075 f.as_ref().clone()
2076 }
2077 })
2078 .collect();
2079 let new_schema = Arc::new(Schema::new(new_fields));
2080 let cols: Vec<ArrayRef> = batch
2081 .columns()
2082 .iter()
2083 .enumerate()
2084 .map(|(i, c)| {
2085 if to_cast.contains(&i) {
2086 compute::cast(c.as_ref(), &DataType::Utf8).map_err(AppError::from)
2087 } else {
2088 Ok(c.clone())
2089 }
2090 })
2091 .collect::<Result<_, _>>()?;
2092 RecordBatch::try_new(new_schema, cols).map_err(AppError::from)
2093}
2094
/// Comparison operators accepted by `cmp_scalar` (column `op` scalar).
///
/// `Like`/`ILike` are only valid on string columns; `cmp_scalar` rejects them for numbers.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum CmpOp {
    Eq,
    Neq,
    Gt,
    Gte,
    Lt,
    Lte,
    Like,
    ILike,
}
2112
2113#[allow(dead_code)]
2114fn eq_str(col: &ArrayRef, val: &str) -> Result<BooleanArray, AppError> {
2115 let arr = col
2116 .as_any()
2117 .downcast_ref::<StringArray>()
2118 .ok_or_else(|| AppError::InvalidValue("equality: column is not a string".into()))?;
2119 let s = Scalar::new(StringArray::from(vec![val]));
2120 Ok(eq(arr, &s)?)
2121}
2122
2123#[allow(dead_code)]
2124fn cmp_scalar(col: &ArrayRef, op: CmpOp, val: &JsonValue) -> Result<BooleanArray, AppError> {
2125 macro_rules! num_cmp {
2126 ($arr_type:ty, $cast:ty) => {{
2127 let n = val
2128 .as_f64()
2129 .ok_or_else(|| AppError::InvalidValue("expected number".into()))?
2130 as $cast;
2131 let arr = col.as_any().downcast_ref::<$arr_type>().unwrap();
2132 let s = Scalar::new(<$arr_type>::from(vec![n]));
2133 Ok(match op {
2134 CmpOp::Eq => eq(arr, &s)?,
2135 CmpOp::Neq => neq(arr, &s)?,
2136 CmpOp::Gt => gt(arr, &s)?,
2137 CmpOp::Gte => gt_eq(arr, &s)?,
2138 CmpOp::Lt => lt(arr, &s)?,
2139 CmpOp::Lte => lt_eq(arr, &s)?,
2140 CmpOp::Like | CmpOp::ILike => {
2141 return Err(AppError::InvalidValue(
2142 "LIKE requires a string column".into(),
2143 ));
2144 }
2145 })
2146 }};
2147 }
2148 match col.data_type() {
2149 DataType::Utf8 => {
2150 let s = val
2151 .as_str()
2152 .ok_or_else(|| AppError::InvalidValue("expected string".into()))?;
2153 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
2154 let sc = Scalar::new(StringArray::from(vec![s]));
2155 Ok(match op {
2156 CmpOp::Eq => eq(arr, &sc)?,
2157 CmpOp::Neq => neq(arr, &sc)?,
2158 CmpOp::Gt => gt(arr, &sc)?,
2159 CmpOp::Gte => gt_eq(arr, &sc)?,
2160 CmpOp::Lt => lt(arr, &sc)?,
2161 CmpOp::Lte => lt_eq(arr, &sc)?,
2162 CmpOp::Like => compute::like(arr, &sc)?,
2163 CmpOp::ILike => compute::ilike(arr, &sc)?,
2164 })
2165 }
2166 DataType::Int8 => num_cmp!(Int8Array, i8),
2167 DataType::Int16 => num_cmp!(Int16Array, i16),
2168 DataType::Int32 => num_cmp!(Int32Array, i32),
2169 DataType::Int64 => num_cmp!(Int64Array, i64),
2170 DataType::Float32 => num_cmp!(Float32Array, f32),
2171 DataType::Float64 => num_cmp!(Float64Array, f64),
2172 dt => Err(AppError::InvalidValue(format!(
2173 "unsupported type for comparison: {dt:?}"
2174 ))),
2175 }
2176}
2177
2178pub fn serialize(batch: &RecordBatch) -> Result<String, AppError> {
2183 let batch = cast_for_serialize(batch)?;
2188 let schema = batch.schema();
2189 let n_rows = batch.num_rows();
2190
2191 let keys: Vec<Vec<u8>> = schema
2192 .fields()
2193 .iter()
2194 .map(|f| {
2195 let mut k = Vec::with_capacity(f.name().len() + 3);
2196 k.push(b'"');
2197 k.extend_from_slice(f.name().as_bytes());
2198 k.extend_from_slice(b"\":");
2199 k
2200 })
2201 .collect();
2202
2203 let encoders: Vec<ColEnc> = batch
2208 .columns()
2209 .iter()
2210 .map(|c| ColEnc::new(c.as_ref()))
2211 .collect();
2212
2213 let mut buf: Vec<u8> = Vec::with_capacity(n_rows.max(1) * 300);
2214 let mut itoa_buf = itoa::Buffer::new();
2215 let mut ryu_buf = ryu::Buffer::new();
2216 buf.push(b'[');
2217
2218 for row in 0..n_rows {
2219 if row > 0 {
2220 buf.push(b',');
2221 }
2222 buf.push(b'{');
2223 for (i, (key, enc)) in keys.iter().zip(encoders.iter()).enumerate() {
2224 if i > 0 {
2225 buf.push(b',');
2226 }
2227 buf.extend_from_slice(key);
2228 enc.write(&mut buf, row, &mut itoa_buf, &mut ryu_buf);
2229 }
2230 buf.push(b'}');
2231 }
2232
2233 buf.push(b']');
2234 Ok(unsafe { String::from_utf8_unchecked(buf) })
2235}
2236
/// Per-column JSON encoder: the column pre-downcast to its concrete Arrow array type.
///
/// Built once per column by `ColEnc::new`, so the per-cell `write` is a plain match with no
/// dynamic downcast.
enum ColEnc<'a> {
    Utf8(&'a StringArray),
    LargeUtf8(&'a LargeStringArray),
    Utf8View(&'a StringViewArray),
    /// Int32-keyed string dictionary: (the dictionary array, its string values).
    DictI32Utf8(
        &'a arrow::array::DictionaryArray<arrow::datatypes::Int32Type>,
        &'a StringArray,
    ),
    Bool(&'a BooleanArray),
    I8(&'a Int8Array),
    I16(&'a Int16Array),
    I32(&'a Int32Array),
    I64(&'a Int64Array),
    U8(&'a UInt8Array),
    U16(&'a UInt16Array),
    U32(&'a UInt32Array),
    U64(&'a UInt64Array),
    /// Written as a JSON string via `value_as_string`.
    Dec128(&'a Decimal128Array),
    /// Written as a JSON string via `value_as_string`.
    Dec256(&'a Decimal256Array),
    F32(&'a Float32Array),
    F64(&'a Float64Array),
    /// Any other type; encoded through the slower `write_value` fallback.
    Other(&'a dyn Array),
}
2267
2268impl<'a> ColEnc<'a> {
2269 fn new(col: &'a dyn Array) -> ColEnc<'a> {
2270 macro_rules! dc {
2271 ($t:ty) => {
2272 col.as_any().downcast_ref::<$t>().unwrap()
2273 };
2274 }
2275 match col.data_type() {
2276 DataType::Utf8 => ColEnc::Utf8(dc!(StringArray)),
2277 DataType::LargeUtf8 => ColEnc::LargeUtf8(dc!(LargeStringArray)),
2278 DataType::Utf8View => ColEnc::Utf8View(dc!(StringViewArray)),
2279 DataType::Dictionary(key, value)
2280 if matches!(key.as_ref(), DataType::Int32)
2281 && matches!(value.as_ref(), DataType::Utf8) =>
2282 {
2283 let dict = dc!(arrow::array::DictionaryArray<arrow::datatypes::Int32Type>);
2284 let values = dict
2285 .values()
2286 .as_any()
2287 .downcast_ref::<StringArray>()
2288 .unwrap();
2289 ColEnc::DictI32Utf8(dict, values)
2290 }
2291 DataType::Boolean => ColEnc::Bool(dc!(BooleanArray)),
2292 DataType::Int8 => ColEnc::I8(dc!(Int8Array)),
2293 DataType::Int16 => ColEnc::I16(dc!(Int16Array)),
2294 DataType::Int32 => ColEnc::I32(dc!(Int32Array)),
2295 DataType::Int64 => ColEnc::I64(dc!(Int64Array)),
2296 DataType::UInt8 => ColEnc::U8(dc!(UInt8Array)),
2297 DataType::UInt16 => ColEnc::U16(dc!(UInt16Array)),
2298 DataType::UInt32 => ColEnc::U32(dc!(UInt32Array)),
2299 DataType::UInt64 => ColEnc::U64(dc!(UInt64Array)),
2300 DataType::Decimal128(_, _) => ColEnc::Dec128(dc!(Decimal128Array)),
2301 DataType::Decimal256(_, _) => ColEnc::Dec256(dc!(Decimal256Array)),
2302 DataType::Float32 => ColEnc::F32(dc!(Float32Array)),
2303 DataType::Float64 => ColEnc::F64(dc!(Float64Array)),
2304 _ => ColEnc::Other(col),
2305 }
2306 }
2307
2308 #[inline]
2309 fn write(
2310 &self,
2311 buf: &mut Vec<u8>,
2312 row: usize,
2313 itoa_buf: &mut itoa::Buffer,
2314 ryu_buf: &mut ryu::Buffer,
2315 ) {
2316 macro_rules! int {
2317 ($arr:expr) => {{
2318 if $arr.is_null(row) {
2319 buf.extend_from_slice(b"null");
2320 } else {
2321 buf.extend_from_slice(itoa_buf.format($arr.value(row)).as_bytes());
2322 }
2323 }};
2324 }
2325 match self {
2326 ColEnc::Utf8(a) => {
2327 if a.is_null(row) {
2328 buf.extend_from_slice(b"null");
2329 } else {
2330 write_str(buf, a.value(row));
2331 }
2332 }
2333 ColEnc::LargeUtf8(a) => {
2334 if a.is_null(row) {
2335 buf.extend_from_slice(b"null");
2336 } else {
2337 write_str(buf, a.value(row));
2338 }
2339 }
2340 ColEnc::Utf8View(a) => {
2341 if a.is_null(row) {
2342 buf.extend_from_slice(b"null");
2343 } else {
2344 write_str(buf, a.value(row));
2345 }
2346 }
2347 ColEnc::DictI32Utf8(keys, values) => {
2348 if keys.is_null(row) {
2349 buf.extend_from_slice(b"null");
2350 } else {
2351 let k = keys.keys().value(row) as usize;
2352 write_str(buf, values.value(k));
2353 }
2354 }
2355 ColEnc::Bool(a) => {
2356 if a.is_null(row) {
2357 buf.extend_from_slice(b"null");
2358 } else {
2359 buf.extend_from_slice(if a.value(row) { b"true" } else { b"false" });
2360 }
2361 }
2362 ColEnc::I8(a) => int!(a),
2363 ColEnc::I16(a) => int!(a),
2364 ColEnc::I32(a) => int!(a),
2365 ColEnc::I64(a) => int!(a),
2366 ColEnc::U8(a) => int!(a),
2367 ColEnc::U16(a) => int!(a),
2368 ColEnc::U32(a) => int!(a),
2369 ColEnc::U64(a) => int!(a),
2370 ColEnc::Dec128(a) => {
2371 if a.is_null(row) {
2372 buf.extend_from_slice(b"null");
2373 } else {
2374 write_str(buf, &a.value_as_string(row));
2375 }
2376 }
2377 ColEnc::Dec256(a) => {
2378 if a.is_null(row) {
2379 buf.extend_from_slice(b"null");
2380 } else {
2381 write_str(buf, &a.value_as_string(row));
2382 }
2383 }
2384 ColEnc::F32(a) => {
2385 if a.is_null(row) {
2386 buf.extend_from_slice(b"null");
2387 } else {
2388 let v = a.value(row);
2389 if v.is_finite() {
2390 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2391 } else {
2392 buf.extend_from_slice(b"null");
2393 }
2394 }
2395 }
2396 ColEnc::F64(a) => {
2397 if a.is_null(row) {
2398 buf.extend_from_slice(b"null");
2399 } else {
2400 let v = a.value(row);
2401 if v.is_finite() {
2402 buf.extend_from_slice(ryu_buf.format_finite(v).as_bytes());
2403 } else {
2404 buf.extend_from_slice(b"null");
2405 }
2406 }
2407 }
2408 ColEnc::Other(col) => {
2409 if col.is_null(row) {
2410 buf.extend_from_slice(b"null");
2411 } else {
2412 write_value(buf, *col, row);
2413 }
2414 }
2415 }
2416 }
2417}
2418
2419#[inline]
2420fn write_value(buf: &mut Vec<u8>, col: &dyn Array, row: usize) {
2421 match col.data_type() {
2422 DataType::Utf8 => write_str(
2423 buf,
2424 col.as_any()
2425 .downcast_ref::<StringArray>()
2426 .unwrap()
2427 .value(row),
2428 ),
2429 DataType::LargeUtf8 => write_str(
2430 buf,
2431 col.as_any()
2432 .downcast_ref::<LargeStringArray>()
2433 .unwrap()
2434 .value(row),
2435 ),
2436 DataType::Utf8View => write_str(
2437 buf,
2438 col.as_any()
2439 .downcast_ref::<StringViewArray>()
2440 .unwrap()
2441 .value(row),
2442 ),
2443 DataType::Dictionary(key, value)
2444 if matches!(key.as_ref(), DataType::Int32)
2445 && matches!(value.as_ref(), DataType::Utf8) =>
2446 {
2447 let dict = col
2448 .as_any()
2449 .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
2450 .unwrap();
2451 let keys = dict.keys();
2452 let values = dict
2453 .values()
2454 .as_any()
2455 .downcast_ref::<StringArray>()
2456 .unwrap();
2457 let k = keys.value(row) as usize;
2458 write_str(buf, values.value(k));
2459 }
2460 DataType::Boolean => {
2461 let v = col
2462 .as_any()
2463 .downcast_ref::<BooleanArray>()
2464 .unwrap()
2465 .value(row);
2466 buf.extend_from_slice(if v { b"true" } else { b"false" });
2467 }
2468 DataType::Int8 => {
2469 let mut b = itoa::Buffer::new();
2470 buf.extend_from_slice(
2471 b.format(col.as_any().downcast_ref::<Int8Array>().unwrap().value(row))
2472 .as_bytes(),
2473 );
2474 }
2475 DataType::Int16 => {
2476 let mut b = itoa::Buffer::new();
2477 buf.extend_from_slice(
2478 b.format(
2479 col.as_any()
2480 .downcast_ref::<Int16Array>()
2481 .unwrap()
2482 .value(row),
2483 )
2484 .as_bytes(),
2485 );
2486 }
2487 DataType::Int32 => {
2488 let mut b = itoa::Buffer::new();
2489 buf.extend_from_slice(
2490 b.format(
2491 col.as_any()
2492 .downcast_ref::<Int32Array>()
2493 .unwrap()
2494 .value(row),
2495 )
2496 .as_bytes(),
2497 );
2498 }
2499 DataType::Int64 => {
2500 let mut b = itoa::Buffer::new();
2501 buf.extend_from_slice(
2502 b.format(
2503 col.as_any()
2504 .downcast_ref::<Int64Array>()
2505 .unwrap()
2506 .value(row),
2507 )
2508 .as_bytes(),
2509 );
2510 }
2511 DataType::UInt8 => {
2512 let mut b = itoa::Buffer::new();
2513 buf.extend_from_slice(
2514 b.format(
2515 col.as_any()
2516 .downcast_ref::<UInt8Array>()
2517 .unwrap()
2518 .value(row),
2519 )
2520 .as_bytes(),
2521 );
2522 }
2523 DataType::UInt16 => {
2524 let mut b = itoa::Buffer::new();
2525 buf.extend_from_slice(
2526 b.format(
2527 col.as_any()
2528 .downcast_ref::<UInt16Array>()
2529 .unwrap()
2530 .value(row),
2531 )
2532 .as_bytes(),
2533 );
2534 }
2535 DataType::UInt32 => {
2536 let mut b = itoa::Buffer::new();
2537 buf.extend_from_slice(
2538 b.format(
2539 col.as_any()
2540 .downcast_ref::<UInt32Array>()
2541 .unwrap()
2542 .value(row),
2543 )
2544 .as_bytes(),
2545 );
2546 }
2547 DataType::UInt64 => {
2548 let mut b = itoa::Buffer::new();
2549 buf.extend_from_slice(
2550 b.format(
2551 col.as_any()
2552 .downcast_ref::<UInt64Array>()
2553 .unwrap()
2554 .value(row),
2555 )
2556 .as_bytes(),
2557 );
2558 }
2559 DataType::Decimal128(_, _) => {
2560 let arr = col.as_any().downcast_ref::<Decimal128Array>().unwrap();
2561 write_str(buf, &arr.value_as_string(row));
2562 }
2563 DataType::Decimal256(_, _) => {
2564 let arr = col.as_any().downcast_ref::<Decimal256Array>().unwrap();
2565 write_str(buf, &arr.value_as_string(row));
2566 }
2567 DataType::Float32 => {
2568 let v = col
2569 .as_any()
2570 .downcast_ref::<Float32Array>()
2571 .unwrap()
2572 .value(row);
2573 if v.is_finite() {
2574 let mut b = ryu::Buffer::new();
2575 buf.extend_from_slice(b.format_finite(v).as_bytes());
2576 } else {
2577 buf.extend_from_slice(b"null");
2578 }
2579 }
2580 DataType::Float64 => {
2581 let v = col
2582 .as_any()
2583 .downcast_ref::<Float64Array>()
2584 .unwrap()
2585 .value(row);
2586 if v.is_finite() {
2587 let mut b = ryu::Buffer::new();
2588 buf.extend_from_slice(b.format_finite(v).as_bytes());
2589 } else {
2590 buf.extend_from_slice(b"null");
2591 }
2592 }
2593 other => write_str(buf, &format!("<unsupported dtype: {other:?}>")),
2598 }
2599}
2600
/// Appends `s` to `buf` as a double-quoted JSON string literal.
///
/// Escaping rules:
/// - `"` and `\` are prefixed with a backslash;
/// - line feed, carriage return and tab become `\n`, `\r` and `\t`;
/// - every other byte in `0x00..=0x1f` becomes `\u00XX` (lowercase hex);
/// - all remaining bytes, including `0x7f` and multi-byte UTF-8 sequences,
///   are copied through unchanged.
#[inline]
fn write_str(buf: &mut Vec<u8>, s: &str) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = s.as_bytes();
    buf.push(b'"');

    // `clean_from` marks the start of the current run of bytes that need no
    // escaping; each such run is copied with a single `extend_from_slice`.
    let mut clean_from = 0;
    for (pos, &byte) in bytes.iter().enumerate() {
        // `Some(c)` => two-character escape `\c`; `None` => `\u00XX` form.
        let short_escape = match byte {
            b'"' => Some(b'"'),
            b'\\' => Some(b'\\'),
            b'\n' => Some(b'n'),
            b'\r' => Some(b'r'),
            b'\t' => Some(b't'),
            0x00..=0x1f => None,
            _ => continue,
        };

        // Flush the pending clean run, then emit the escape for `byte`.
        buf.extend_from_slice(&bytes[clean_from..pos]);
        clean_from = pos + 1;

        buf.push(b'\\');
        match short_escape {
            Some(code) => buf.push(code),
            None => buf.extend_from_slice(&[
                b'u',
                b'0',
                b'0',
                HEX_DIGITS[usize::from(byte >> 4)],
                HEX_DIGITS[usize::from(byte & 0x0f)],
            ]),
        }
    }
    buf.extend_from_slice(&bytes[clean_from..]);

    buf.push(b'"');
}
2622
2623#[async_trait]
2628impl Backend for Store {
2629 fn names(&self) -> Vec<String> {
2630 Store::names(self)
2631 }
2632
2633 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError> {
2634 let st = self.dataset(name)?;
2635 Ok(DatasetSummary {
2636 name: st.schema.name.clone(),
2637 columns: st.schema.columns.len(),
2638 rows: st.num_rows(),
2639 })
2640 }
2641
2642 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError> {
2643 let st = self.dataset(name)?;
2644 Ok(Arc::new(st.schema.clone()))
2645 }
2646
2647 fn indexed_columns(&self, name: &str) -> Result<Vec<String>, AppError> {
2648 let st = self.dataset(name)?;
2649 let mut cols: Vec<String> = st
2652 .schema
2653 .columns
2654 .iter()
2655 .map(|c| c.name.clone())
2656 .filter(|n| st.index.contains_key(n))
2657 .collect();
2658 let mut extras: Vec<String> = st
2661 .index
2662 .keys()
2663 .filter(|n| !cols.iter().any(|c| c == *n))
2664 .cloned()
2665 .collect();
2666 extras.sort();
2667 cols.extend(extras);
2668 Ok(cols)
2669 }
2670
2671 async fn sample(&self, name: &str) -> Result<String, AppError> {
2672 Store::sample(self, name).await
2673 }
2674
2675 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError> {
2676 Store::query(self, name, req).await
2677 }
2678
2679 async fn query_arrow(&self, name: &str, req: &QueryRequest) -> Result<Vec<u8>, AppError> {
2680 Store::query_arrow(self, name, req).await
2681 }
2682
2683 async fn query_arrow_stream(
2684 &self,
2685 name: &str,
2686 req: &QueryRequest,
2687 ) -> Result<ArrowIpcStream, AppError> {
2688 Store::query_arrow_stream(self, name, req).await
2689 }
2690
2691 async fn query_arrow_stream_all(
2692 &self,
2693 name: &str,
2694 req: &QueryRequest,
2695 ) -> Result<ArrowIpcStream, AppError> {
2696 Store::query_arrow_stream_all(self, name, req).await
2697 }
2698
2699 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError> {
2700 Store::count(self, name, req).await
2701 }
2702
2703 async fn parquet(&self, name: &str) -> Result<bytes::Bytes, AppError> {
2704 Store::parquet(self, name).await
2705 }
2706
2707 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError> {
2708 Store::reload(self, name).await
2709 }
2710}