1use crate::expression::compiler::compile;
2use crate::expression::compiler::config::CompilationConfig;
3use crate::expression::compiler::utils::ExprHelpers;
4use crate::task_graph::task::TaskCall;
5use std::borrow::Cow;
6
7use async_trait::async_trait;
8
9use datafusion_expr::{expr, lit, Expr};
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use vegafusion_core::data::dataset::VegaFusionDataset;
13
14use crate::task_graph::timezone::RuntimeTzConfig;
15use crate::transform::pipeline::TransformPipelineUtils;
16use cfg_if::cfg_if;
17use datafusion::datasource::listing::ListingTableUrl;
18use datafusion::datasource::object_store::ObjectStoreUrl;
19use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
20use datafusion::prelude::{CsvReadOptions, DataFrame, SessionContext};
21use datafusion_common::config::TableOptions;
22use datafusion_functions::expr_fn::make_date;
23use std::sync::Arc;
24
25use vegafusion_common::data::scalar::{ScalarValue, ScalarValueHelpers};
26use vegafusion_common::error::{Result, ResultWithContext, VegaFusionError};
27
28use vegafusion_core::proto::gen::tasks::data_url_task::Url;
29use vegafusion_core::proto::gen::tasks::scan_url_format;
30use vegafusion_core::proto::gen::tasks::scan_url_format::Parse;
31use vegafusion_core::proto::gen::tasks::{DataSourceTask, DataUrlTask, DataValuesTask};
32use vegafusion_core::proto::gen::transforms::TransformPipeline;
33use vegafusion_core::task_graph::task::{InputVariable, TaskDependencies};
34use vegafusion_core::task_graph::task_value::TaskValue;
35
36use crate::data::util::{DataFrameUtils, SessionContextUtils};
37use crate::transform::utils::str_to_timestamp;
38
39use object_store::ObjectStore;
40use vegafusion_common::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
41use vegafusion_common::column::flat_col;
42use vegafusion_common::data::table::VegaFusionTable;
43use vegafusion_common::data::ORDER_COL;
44use vegafusion_common::datatypes::{is_integer_datatype, is_string_datatype};
45use vegafusion_core::proto::gen::transforms::transform::TransformKind;
46use vegafusion_core::spec::visitors::extract_inline_dataset;
47
48#[cfg(feature = "s3")]
49use object_store::aws::AmazonS3Builder;
50
51#[cfg(feature = "http")]
52use object_store::{http::HttpBuilder, ClientOptions};
53
54#[cfg(feature = "fs")]
55use tokio::io::AsyncReadExt;
56
57#[cfg(feature = "parquet")]
58use {datafusion::prelude::ParquetReadOptions, vegafusion_common::error::ToExternalError};
59
60#[cfg(target_arch = "wasm32")]
61use object_store_wasm::HttpStore;
62
63pub fn build_compilation_config(
64 input_vars: &[InputVariable],
65 values: &[TaskValue],
66 tz_config: &Option<RuntimeTzConfig>,
67) -> CompilationConfig {
68 let mut signal_scope: HashMap<String, ScalarValue> = HashMap::new();
70 let mut data_scope: HashMap<String, VegaFusionTable> = HashMap::new();
71
72 for (input_var, input_val) in input_vars.iter().zip(values) {
73 match input_val {
74 TaskValue::Scalar(value) => {
75 signal_scope.insert(input_var.var.name.clone(), value.clone());
76 }
77 TaskValue::Table(table) => {
78 data_scope.insert(input_var.var.name.clone(), table.clone());
79 }
80 }
81 }
82
83 CompilationConfig {
86 signal_scope,
87 data_scope,
88 tz_config: *tz_config,
89 ..Default::default()
90 }
91}
92
#[async_trait]
impl TaskCall for DataUrlTask {
    /// Evaluate a `data.url` task: resolve the URL (which may itself be a
    /// signal expression), load the referenced dataset, normalize column
    /// types and datetimes, then run this task's transform pipeline.
    async fn eval(
        &self,
        values: &[TaskValue],
        tz_config: &Option<RuntimeTzConfig>,
        inline_datasets: HashMap<String, VegaFusionDataset>,
        ctx: Arc<SessionContext>,
    ) -> Result<(TaskValue, Vec<TaskValue>)> {
        let config = build_compilation_config(&self.input_vars(), values, tz_config);

        // The URL is either a literal string or an expression that must be
        // compiled and evaluated down to a scalar string.
        let url = match self.url.as_ref().unwrap() {
            Url::String(url) => url.clone(),
            Url::Expr(expr) => {
                let compiled = compile(expr, &config, None)?;
                let url_scalar = compiled.eval_to_scalar()?;
                url_scalar.to_scalar_string()?
            }
        };

        // Strip the URL fragment (everything from the first '#'), if any.
        let url_parts: Vec<&str> = url.splitn(2, '#').collect();
        let url = url_parts.first().cloned().unwrap_or(&url).to_string();

        // Rewrite relative "data/<name>" paths for well-known vega datasets
        // to absolute GitHub URLs when no local file exists.
        let url = check_builtin_dataset(url);

        let parse = self.format_type.as_ref().and_then(|fmt| fmt.parse.clone());
        let file_type = self.format_type.as_ref().and_then(|fmt| fmt.r#type.clone());

        // NOTE(review): an explicit "json" format type is cleared here, so
        // JSON detection below relies on the ".json" extension check —
        // confirm this is intended for JSON URLs without that extension.
        let file_type = if file_type == Some("json".to_string()) {
            None
        } else {
            file_type.as_deref()
        };

        // Dispatch: inline datasets take precedence, then the explicit file
        // type, then extension-based inference.
        let df = if let Some(inline_name) = extract_inline_dataset(&url) {
            let inline_name = inline_name.trim().to_string();
            if let Some(inline_dataset) = inline_datasets.get(&inline_name) {
                match inline_dataset {
                    VegaFusionDataset::Table { table, .. } => {
                        let table = table.clone().with_ordering()?;
                        ctx.vegafusion_table(table).await?
                    }
                    VegaFusionDataset::Plan { plan } => {
                        ctx.execute_logical_plan(plan.clone()).await?
                    }
                }
            } else if let Ok(df) = ctx.table(&inline_name).await {
                // Fall back to a table already registered with the context.
                df
            } else {
                return Err(VegaFusionError::internal(format!(
                    "No inline dataset named {inline_name}"
                )));
            }
        } else if file_type == Some("csv") || (file_type.is_none() && url.ends_with(".csv")) {
            read_csv(&url, &parse, ctx, false).await?
        } else if file_type == Some("tsv") || (file_type.is_none() && url.ends_with(".tsv")) {
            read_csv(&url, &parse, ctx, true).await?
        } else if file_type == Some("json") || (file_type.is_none() && url.ends_with(".json")) {
            read_json(&url, ctx).await?
        } else if file_type == Some("arrow")
            || (file_type.is_none() && (url.ends_with(".arrow") || url.ends_with(".feather")))
        {
            read_arrow(&url, ctx).await?
        } else if file_type == Some("parquet")
            || (file_type.is_none() && (url.ends_with(".parquet")))
        {
            // Parquet support is feature-gated.
            cfg_if! {
                if #[cfg(any(feature = "parquet"))] {
                    read_parquet(&url, ctx).await?
                } else {
                    return Err(VegaFusionError::internal(format!(
                        "Enable parquet support by enabling the `parquet` feature flag"
                    )))
                }
            }
        } else {
            return Err(VegaFusionError::internal(format!(
                "Invalid url file extension {url}"
            )));
        };

        // Ensure the internal ordering column exists so row order is stable
        // through subsequent transforms.
        let df = if df.schema().inner().column_with_name(ORDER_COL).is_none() {
            df.with_index(ORDER_COL).await?
        } else {
            df
        };

        // Normalize LargeUtf8 columns to Utf8.
        let df = pre_process_column_types(df).await?;

        // Apply Vega date parsing and timezone normalization.
        let df = process_datetimes(&parse, df, &config.tz_config).await?;

        eval_sql_df(df, &self.pipeline, &config).await
    }
}
197
198async fn eval_sql_df(
199 sql_df: DataFrame,
200 pipeline: &Option<TransformPipeline>,
201 config: &CompilationConfig,
202) -> Result<(TaskValue, Vec<TaskValue>)> {
203 let (transformed_df, output_values) = if pipeline
205 .as_ref()
206 .map(|p| !p.transforms.is_empty())
207 .unwrap_or(false)
208 {
209 let pipeline = pipeline.as_ref().unwrap();
210 pipeline.eval_sql(sql_df, config).await?
211 } else {
212 (
214 sql_df.collect_to_table().await?.without_ordering()?,
215 Vec::new(),
216 )
217 };
218
219 let table_value = TaskValue::Table(transformed_df.without_ordering()?);
220
221 Ok((table_value, output_values))
222}
223
224lazy_static! {
225 static ref BUILT_IN_DATASETS: HashSet<&'static str> = vec![
226 "7zip.png",
227 "airports.csv",
228 "annual-precip.json",
229 "anscombe.json",
230 "barley.json",
231 "birdstrikes.csv",
232 "budget.json",
233 "budgets.json",
234 "burtin.json",
235 "cars.json",
236 "co2-concentration.csv",
237 "countries.json",
238 "crimea.json",
239 "disasters.csv",
240 "driving.json",
241 "earthquakes.json",
242 "ffox.png",
243 "flare-dependencies.json",
244 "flare.json",
245 "flights-10k.json",
246 "flights-200k.arrow",
247 "flights-200k.json",
248 "flights-20k.json",
249 "flights-2k.json",
250 "flights-3m.csv",
251 "flights-5k.json",
252 "flights-airport.csv",
253 "football.json",
254 "gapminder-health-income.csv",
255 "gapminder.json",
256 "gimp.png",
257 "github.csv",
258 "income.json",
259 "iowa-electricity.csv",
260 "jobs.json",
261 "la-riots.csv",
262 "londonBoroughs.json",
263 "londonCentroids.json",
264 "londonTubeLines.json",
265 "lookup_groups.csv",
266 "lookup_people.csv",
267 "miserables.json",
268 "monarchs.json",
269 "movies.json",
270 "normal-2d.json",
271 "obesity.json",
272 "ohlc.json",
273 "penguins.json",
274 "platformer-terrain.json",
275 "points.json",
276 "political-contributions.json",
277 "population_engineers_hurricanes.csv",
278 "population.json",
279 "seattle-weather.csv",
280 "seattle-weather-hourly-normals.csv",
281 "sp500-2000.csv",
282 "sp500.csv",
283 "stocks.csv",
284 "udistrict.json",
285 "unemployment-across-industries.json",
286 "unemployment.tsv",
287 "uniform-2d.json",
288 "us-10m.json",
289 "us-employment.csv",
290 "us-state-capitals.json",
291 "volcano.json",
292 "weather.csv",
293 "weather.json",
294 "wheat.json",
295 "windvectors.csv",
296 "world-110m.json",
297 "zipcodes.csv",
298 ]
299 .into_iter()
300 .collect();
301}
302
303const DATASET_BASE: &str = "https://raw.githubusercontent.com/vega/vega-datasets";
304const DATASET_TAG: &str = "v2.3.0";
305
306fn check_builtin_dataset(url: String) -> String {
307 if let Some(dataset) = url.strip_prefix("data/") {
308 let path = std::path::Path::new(&url);
309 if !path.exists() && BUILT_IN_DATASETS.contains(dataset) {
310 format!("{DATASET_BASE}/{DATASET_TAG}/data/{dataset}")
311 } else {
312 url
313 }
314 } else {
315 url
316 }
317}
318
319async fn pre_process_column_types(df: DataFrame) -> Result<DataFrame> {
320 let mut selections: Vec<Expr> = Vec::new();
321 let mut pre_proc_needed = false;
322 for field in df.schema().fields().iter() {
323 if field.data_type() == &DataType::LargeUtf8 {
324 selections.push(
327 Expr::Cast(expr::Cast {
328 expr: Box::new(flat_col(field.name())),
329 data_type: DataType::Utf8,
330 })
331 .alias(field.name()),
332 );
333 pre_proc_needed = true;
334 } else {
335 selections.push(flat_col(field.name()))
336 }
337 }
338 if pre_proc_needed {
339 Ok(df.select(selections)?)
340 } else {
341 Ok(df)
342 }
343}
344
/// Apply Vega `format.parse` date specifications and normalize timestamp
/// columns in `sql_df`.
///
/// First pass: columns whose parse spec starts with `date`/`utc` are
/// converted from strings (optionally with a format pattern) or integers.
/// Second pass: remaining naive `Timestamp` and `Date64` columns are cast to
/// millisecond timestamps localized to `tz_config`'s default input timezone.
async fn process_datetimes(
    parse: &Option<Parse>,
    sql_df: DataFrame,
    tz_config: &Option<RuntimeTzConfig>,
) -> Result<DataFrame> {
    // Columns parsed as dates here; skipped by the second pass below.
    let mut date_fields: Vec<String> = Vec::new();
    let mut df = sql_df;
    if let Some(scan_url_format::Parse::Object(formats)) = parse {
        for spec in &formats.specs {
            let datatype = &spec.datatype;
            if datatype.starts_with("date") || datatype.starts_with("utc") {
                // Split e.g. "date:'%Y-%m-%d'" into the type and an optional
                // format string, stripping surrounding single quotes.
                let (typ, fmt) = if let Some((typ, fmt)) = datatype.split_once(':') {
                    if fmt.starts_with("'") && fmt.ends_with("'") {
                        (typ.to_lowercase(), Some(fmt[1..fmt.len() - 1].to_string()))
                    } else {
                        (typ.to_lowercase(), Some(fmt.to_string()))
                    }
                } else {
                    (datatype.to_lowercase(), None)
                };

                let schema = df.schema();
                if let Ok(date_field) = schema.field_with_unqualified_name(&spec.name) {
                    let dtype = date_field.data_type();
                    let date_expr = if is_string_datatype(dtype) {
                        // "utc" parses in UTC; otherwise use the configured
                        // default input timezone (UTC when unknown).
                        let default_input_tz_str = if typ == "utc" || tz_config.is_none() {
                            "UTC".to_string()
                        } else {
                            tz_config.unwrap().default_input_tz.to_string()
                        };

                        if let Some(fmt) = fmt {
                            str_to_timestamp(
                                flat_col(&spec.name),
                                &default_input_tz_str,
                                schema,
                                Some(fmt.as_str()),
                            )?
                        } else {
                            str_to_timestamp(
                                flat_col(&spec.name),
                                &default_input_tz_str,
                                schema,
                                None,
                            )?
                        }
                    } else if is_integer_datatype(dtype) {
                        // An integer column is treated as a year: Jan 1 of that year.
                        make_date(flat_col(&spec.name), lit(1), lit(1))
                    } else {
                        // Other dtypes are left untouched.
                        continue;
                    };

                    date_fields.push(date_field.name().clone());

                    // Re-project all columns, replacing the parsed column with
                    // the new expression (re-appended at the end of the list).
                    let mut columns: Vec<_> = schema
                        .fields()
                        .iter()
                        .filter_map(|field| {
                            let name = field.name();
                            if name == &spec.name {
                                None
                            } else {
                                Some(flat_col(name))
                            }
                        })
                        .collect();
                    columns.push(date_expr.alias(&spec.name));
                    df = df.select(columns)?
                }
            }
        }
    }

    // Second pass: give every remaining naive timestamp column an explicit
    // timezone so later timezone-aware operations behave consistently.
    let schema = df.schema();
    let selection: Vec<_> = schema
        .fields()
        .iter()
        .map(|field| {
            if !date_fields.contains(field.name()) {
                let expr = match field.data_type() {
                    DataType::Timestamp(_, tz) => match tz {
                        Some(_) => {
                            // Already timezone-aware; keep as-is.
                            flat_col(field.name())
                        }
                        _ => {
                            // Naive timestamp: localize to the default input timezone.
                            let tz_config =
                                tz_config.with_context(|| "No local timezone info provided")?;

                            flat_col(field.name()).try_cast_to(
                                &DataType::Timestamp(
                                    TimeUnit::Millisecond,
                                    Some(tz_config.default_input_tz.to_string().into()),
                                ),
                                schema,
                            )?
                        }
                    },
                    DataType::Date64 => {
                        let tz_config =
                            tz_config.with_context(|| "No local timezone info provided")?;

                        // Cast through a naive timestamp first, then localize.
                        flat_col(field.name())
                            .try_cast_to(&DataType::Timestamp(TimeUnit::Millisecond, None), schema)?
                            .try_cast_to(
                                &DataType::Timestamp(
                                    TimeUnit::Millisecond,
                                    Some(tz_config.default_input_tz.to_string().into()),
                                ),
                                schema,
                            )?
                    }
                    _ => flat_col(field.name()),
                };

                // Preserve the column name unless the expression already aliases it.
                Ok(if matches!(expr, Expr::Alias(_)) {
                    expr
                } else {
                    expr.alias(field.name())
                })
            } else {
                Ok(flat_col(field.name()))
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(df.select(selection)?)
}
485
#[async_trait]
impl TaskCall for DataValuesTask {
    /// Evaluate a `data.values` task: decode the inline Arrow IPC table,
    /// apply datetime parsing, and optionally run this task's transform
    /// pipeline over it.
    async fn eval(
        &self,
        values: &[TaskValue],
        tz_config: &Option<RuntimeTzConfig>,
        _inline_datasets: HashMap<String, VegaFusionDataset>,
        ctx: Arc<SessionContext>,
    ) -> Result<(TaskValue, Vec<TaskValue>)> {
        // The literal values are serialized on the task as Arrow IPC bytes.
        let values_table = VegaFusionTable::from_ipc_bytes(&self.values)?;
        // A zero-column table cannot be transformed; return it unchanged.
        if values_table.schema.fields.is_empty() {
            return Ok((TaskValue::Table(values_table), Default::default()));
        }

        // An empty (zero-row) table short-circuits unless the first transform
        // is a Sequence, which can generate rows from nothing.
        if values_table.num_rows() == 0 {
            if let Some(pipeline) = &self.pipeline {
                if let Some(first_tx) = pipeline.transforms.first() {
                    if !matches!(first_tx.transform_kind(), TransformKind::Sequence(_)) {
                        return Ok((TaskValue::Table(values_table), Default::default()));
                    }
                }
            }
        }

        // Add the internal ordering column for stable row order.
        let values_table = values_table.with_ordering()?;

        let parse = self.format_type.as_ref().and_then(|fmt| fmt.parse.clone());

        // Run the pipeline only when it exists and is non-empty.
        let (transformed_table, output_values) = if self
            .pipeline
            .as_ref()
            .map(|p| !p.transforms.is_empty())
            .unwrap_or(false)
        {
            let pipeline = self.pipeline.as_ref().unwrap();

            let config = build_compilation_config(&self.input_vars(), values, tz_config);

            let df = ctx.vegafusion_table(values_table).await?;
            let sql_df = process_datetimes(&parse, df, &config.tz_config).await?;

            let (table, output_values) = pipeline.eval_sql(sql_df, &config).await?;

            (table, output_values)
        } else {
            // No transforms: just apply datetime parsing and materialize.
            let values_df = ctx.vegafusion_table(values_table).await?;
            let values_df = process_datetimes(&parse, values_df, tz_config).await?;
            (values_df.collect_to_table().await?, Vec::new())
        };

        // Strip the internal ordering column before returning.
        let table_value = TaskValue::Table(transformed_table.without_ordering()?);

        Ok((table_value, output_values))
    }
}
549
550#[async_trait]
551impl TaskCall for DataSourceTask {
552 async fn eval(
553 &self,
554 values: &[TaskValue],
555 tz_config: &Option<RuntimeTzConfig>,
556 _inline_datasets: HashMap<String, VegaFusionDataset>,
557 ctx: Arc<SessionContext>,
558 ) -> Result<(TaskValue, Vec<TaskValue>)> {
559 let input_vars = self.input_vars();
560 let mut config = build_compilation_config(&input_vars, values, tz_config);
561
562 let source_table = config.data_scope.remove(&self.source).with_context(|| {
564 format!(
565 "Missing source {} for task with input variables\n{:#?}",
566 self.source, input_vars
567 )
568 })?;
569
570 let source_table = source_table.with_ordering()?;
572
573 let (transformed_table, output_values) = if self
575 .pipeline
576 .as_ref()
577 .map(|p| !p.transforms.is_empty())
578 .unwrap_or(false)
579 {
580 let pipeline = self.pipeline.as_ref().unwrap();
581 let sql_df = ctx.vegafusion_table(source_table).await?;
582 let (table, output_values) = pipeline.eval_sql(sql_df, &config).await?;
583
584 (table, output_values)
585 } else {
586 (source_table, Vec::new())
588 };
589
590 let table_value = TaskValue::Table(transformed_table.without_ordering()?);
591 Ok((table_value, output_values))
592 }
593}
594
595async fn read_csv_with_object_store(
597 url: &str,
598 parse: &Option<Parse>,
599 ctx: &SessionContext,
600 is_tsv: bool,
601 ext: &str,
602) -> Result<DataFrame> {
603 let mut csv_opts = if is_tsv {
605 CsvReadOptions {
606 delimiter: b'\t',
607 ..Default::default()
608 }
609 } else {
610 Default::default()
611 };
612 csv_opts.file_extension = ext;
613
614 let schema = build_csv_schema(&csv_opts, parse, url, ctx).await?;
616 csv_opts.schema = Some(&schema);
617
618 Ok(ctx.read_csv(url, csv_opts).await?)
620}
621
#[cfg(feature = "http")]
/// Fallback CSV/TSV reader for http(s) URLs: download the body with reqwest,
/// spill it to a temporary file, read it through DataFusion, and materialize
/// the result as an in-memory table registered with the context.
async fn read_csv_with_reqwest(
    url: &str,
    parse: &Option<Parse>,
    ctx: &SessionContext,
    is_tsv: bool,
    ext: &str,
) -> Result<DataFrame> {
    let client = reqwest::Client::new();
    let response = client
        .get(url)
        .send()
        .await
        .external(format!("Failed to fetch URL: {url}"))?;

    let text = response
        .text()
        .await
        .external("Failed to read response as text")?;

    // Write the downloaded text to a temp file so DataFusion's listing-based
    // CSV reader can open it.
    use std::io::Write;
    let temp_dir = tempfile::tempdir()?;
    let temp_path = temp_dir.path().join("temp.csv");
    let mut temp_file = std::fs::File::create(&temp_path)?;
    temp_file.write_all(text.as_bytes())?;
    temp_file.sync_all()?;

    // NOTE(review): `file://{path}` assumes a unix-style absolute path —
    // confirm behavior on Windows.
    let temp_url = format!("file://{}", temp_path.display());

    let mut csv_opts = if is_tsv {
        CsvReadOptions {
            delimiter: b'\t',
            ..Default::default()
        }
    } else {
        Default::default()
    };
    csv_opts.file_extension = ext;

    let schema = build_csv_schema(&csv_opts, parse, &temp_url, ctx).await?;
    csv_opts.schema = Some(&schema);

    // Collect eagerly while the temp dir (dropped at end of scope) still
    // exists, then re-register the batches as an in-memory table.
    let df = ctx.read_csv(&temp_url, csv_opts).await?;
    let batches = df.collect().await?;

    let schema = if let Some(batch) = batches.first() {
        batch.schema()
    } else {
        return Err(VegaFusionError::internal("No data in CSV file"));
    };
    let table = VegaFusionTable::try_new(schema, batches)?;
    ctx.vegafusion_table(table).await
}
684
685async fn read_csv(
686 url: &str,
687 parse: &Option<Parse>,
688 ctx: Arc<SessionContext>,
689 is_tsv: bool,
690) -> Result<DataFrame> {
691 let ext = if let Some(ext) = Path::new(url).extension().and_then(|ext| ext.to_str()) {
693 ext.to_string()
694 } else {
695 "".to_string()
696 };
697
698 maybe_register_object_stores_for_url(&ctx, url)?;
699
700 #[cfg(feature = "http")]
701 {
702 if url.starts_with("http://") || url.starts_with("https://") {
704 match read_csv_with_object_store(url, parse, &ctx, is_tsv, &ext).await {
705 Ok(df) => Ok(df),
706 Err(_) => {
707 read_csv_with_reqwest(url, parse, &ctx, is_tsv, &ext).await
709 }
710 }
711 } else {
712 read_csv_with_object_store(url, parse, &ctx, is_tsv, &ext).await
714 }
715 }
716
717 #[cfg(not(feature = "http"))]
718 {
719 read_csv_with_object_store(url, parse, &ctx, is_tsv, &ext).await
721 }
722}
723
724async fn build_csv_schema(
726 csv_opts: &CsvReadOptions<'_>,
727 parse: &Option<Parse>,
728 uri: impl Into<String>,
729 ctx: &SessionContext,
730) -> Result<Schema> {
731 let format_specs = if let Some(parse) = parse {
733 match parse {
734 Parse::String(_) => {
735 HashMap::new()
737 }
738 Parse::Object(field_specs) => field_specs
739 .specs
740 .iter()
741 .map(|spec| (spec.name.clone(), spec.datatype.clone()))
742 .collect(),
743 }
744 } else {
745 HashMap::new()
746 };
747
748 let field_types: HashMap<_, _> = format_specs
750 .iter()
751 .map(|(name, vega_type)| {
752 let dtype = match vega_type.as_str() {
753 "number" => DataType::Float64,
754 "boolean" => DataType::Boolean,
755 "date" => DataType::Utf8, "string" => DataType::Utf8,
757 _ => DataType::Utf8,
758 };
759 (name.clone(), dtype)
760 })
761 .collect();
762
763 let table_path = ListingTableUrl::parse(uri.into().as_str())?;
765 let listing_options =
766 csv_opts.to_listing_options(&ctx.copied_config(), TableOptions::default());
767 let inferred_schema = listing_options
768 .infer_schema(&ctx.state(), &table_path)
769 .await?;
770
771 let new_fields: Vec<_> = inferred_schema
773 .fields()
774 .iter()
775 .map(|field| {
776 let dtype = field_types
778 .get(field.name())
779 .cloned()
780 .unwrap_or(DataType::Utf8);
781 Field::new(field.name(), dtype, true)
782 })
783 .collect();
784 Ok(Schema::new(new_fields))
785}
786
/// Load a JSON dataset from `url` — via a registered object store (with a
/// reqwest fallback for http(s) URLs) or, when no store matches, from the
/// local file system — then register it as a table in the session context.
async fn read_json(url: &str, ctx: Arc<SessionContext>) -> Result<DataFrame> {
    let value: serde_json::Value =
        if let Some(base_url) = maybe_register_object_stores_for_url(&ctx, url)? {
            let store = ctx.runtime_env().object_store(&base_url)?;
            // NOTE(review): assumes `url` always begins with the registered
            // base URL string; the unwrap panics otherwise — confirm.
            let child_url = url.strip_prefix(&base_url.to_string()).unwrap();
            match store.get(&child_url.into()).await {
                Ok(get_res) => {
                    // Lossy UTF-8 decode tolerates stray invalid bytes.
                    let bytes = get_res.bytes().await?.to_vec();
                    let text: Cow<str> = String::from_utf8_lossy(&bytes);
                    serde_json::from_str(text.as_ref())?
                }
                Err(e) => {
                    // Object-store fetch failed; for http(s) URLs retry with a
                    // plain reqwest GET before giving up.
                    cfg_if::cfg_if! {
                        if #[cfg(feature="http")] {
                            if url.starts_with("http://") || url.starts_with("https://") {
                                let client = reqwest::Client::new();
                                let response = client
                                    .get(url)
                                    .send()
                                    .await
                                    .external(format!("Failed to fetch URL: {url}"))?;

                                let text = response
                                    .text()
                                    .await
                                    .external("Failed to read response as text")?;
                                serde_json::from_str(&text)?
                            } else {
                                return Err(VegaFusionError::from(e));
                            }
                        } else {
                            return Err(VegaFusionError::from(e));
                        }
                    }
                }
            }
        } else {
            // No object store matched: treat the URL as a local file path
            // (requires the `fs` feature).
            cfg_if::cfg_if! {
                if #[cfg(feature="fs")] {
                    let mut file = tokio::fs::File::open(url)
                        .await
                        .external(format!("Failed to open as local file: {url}"))?;

                    let mut json_str = String::new();
                    file.read_to_string(&mut json_str)
                        .await
                        .external("Failed to read file contents to string")?;

                    serde_json::from_str(&json_str)?
                } else {
                    return Err(VegaFusionError::internal(
                        "The `fs` feature flag must be enabled for file system support"
                    ));
                }
            }
        };

    // Convert the parsed JSON into a table with the internal ordering column.
    let table = VegaFusionTable::from_json(&value)?.with_ordering()?;
    ctx.vegafusion_table(table).await
}
852
/// Load an Arrow IPC file from `url`, registering any object store the URL
/// requires first.
async fn read_arrow(url: &str, ctx: Arc<SessionContext>) -> Result<DataFrame> {
    maybe_register_object_stores_for_url(&ctx, url)?;
    Ok(ctx.read_arrow(url, ArrowReadOptions::default()).await?)
}
857
#[cfg(feature = "parquet")]
/// Load a Parquet file from `url`, registering any object store the URL
/// requires first. Only compiled with the `parquet` feature.
async fn read_parquet(url: &str, ctx: Arc<SessionContext>) -> Result<DataFrame> {
    maybe_register_object_stores_for_url(&ctx, url)?;
    Ok(ctx.read_parquet(url, ParquetReadOptions::default()).await?)
}
863
/// Ensure an object store capable of serving `url` is registered with the
/// session context.
///
/// Returns the matching `ObjectStoreUrl` (scheme + authority) when the URL is
/// handled by an object store (http(s) or s3, depending on enabled features),
/// or `None` for plain local paths.
fn maybe_register_object_stores_for_url(
    ctx: &SessionContext,
    url: &str,
) -> Result<Option<ObjectStoreUrl>> {
    #[cfg(any(feature = "http", feature = "http-wasm"))]
    {
        // Register (once per base URL) an HTTP-backed object store for URLs
        // starting with the given scheme prefix.
        let maybe_register_http_store = |prefix: &str| -> Result<Option<ObjectStoreUrl>> {
            if let Some(path) = url.strip_prefix(prefix) {
                let Some((root, _)) = path.split_once('/') else {
                    return Err(VegaFusionError::specification(format!(
                        "Invalid https URL: {url}"
                    )));
                };
                // NOTE(review): the base URL is rebuilt with an https://
                // scheme even for http:// inputs — confirm intended.
                let base_url_str = format!("https://{root}");
                let base_url = url::Url::parse(&base_url_str)?;

                let object_store_url = ObjectStoreUrl::parse(&base_url_str)?;
                // Only register when no store for this base URL exists yet.
                if ctx
                    .runtime_env()
                    .object_store(object_store_url.clone())
                    .is_err()
                {
                    cfg_if! {
                        if #[cfg(feature="http")] {
                            let client_options = ClientOptions::new().with_allow_http(true);
                            let http_store = HttpBuilder::new()
                                .with_url(base_url.clone())
                                .with_client_options(client_options)
                                .build()?;
                            ctx.register_object_store(&base_url, Arc::new(http_store));
                        } else if #[cfg(target_arch = "wasm32")] {
                            let http_store = HttpStore::new(base_url.clone());
                            ctx.register_object_store(&base_url, Arc::new(http_store));
                        } else {
                            return Err(VegaFusionError::internal("HTTP support not available"));
                        }
                    }
                }
                return Ok(Some(object_store_url));
            }
            Ok(None)
        };

        if let Some(url) = maybe_register_http_store("https://")? {
            return Ok(Some(url));
        }

        if let Some(url) = maybe_register_http_store("http://")? {
            return Ok(Some(url));
        }
    }

    #[cfg(feature = "s3")]
    // Register an S3 object store (configured from environment variables)
    // for s3:// URLs.
    if let Some(bucket_path) = url.strip_prefix("s3://") {
        let Some((bucket, _)) = bucket_path.split_once('/') else {
            return Err(VegaFusionError::specification(format!(
                "Invalid s3 URL: {url}"
            )));
        };
        let base_url_str = format!("s3://{bucket}/");
        let object_store_url = ObjectStoreUrl::parse(&base_url_str)?;
        if ctx
            .runtime_env()
            .object_store(object_store_url.clone())
            .is_err()
        {
            let base_url = url::Url::parse(&base_url_str)?;
            let s3 = AmazonS3Builder::from_env().with_url(base_url.clone()).build().with_context(||
                "Failed to initialize s3 connection from environment variables.\n\
                See https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.from_env".to_string()
            )?;
            ctx.register_object_store(&base_url, Arc::new(s3));
        }
        return Ok(Some(object_store_url));
    }

    Ok(None)
}