vegafusion_runtime/data/
tasks.rs

1use crate::expression::compiler::compile;
2use crate::expression::compiler::config::CompilationConfig;
3use crate::expression::compiler::utils::ExprHelpers;
4use crate::task_graph::task::TaskCall;
5use std::borrow::Cow;
6
7use async_trait::async_trait;
8
9use datafusion_expr::{expr, lit, Expr};
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use vegafusion_core::data::dataset::VegaFusionDataset;
13
14use crate::task_graph::timezone::RuntimeTzConfig;
15use crate::transform::pipeline::TransformPipelineUtils;
16use cfg_if::cfg_if;
17use datafusion::datasource::listing::ListingTableUrl;
18use datafusion::datasource::object_store::ObjectStoreUrl;
19use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
20use datafusion::prelude::{CsvReadOptions, DataFrame, SessionContext};
21use datafusion_common::config::TableOptions;
22use datafusion_functions::expr_fn::make_date;
23use std::sync::Arc;
24
25use vegafusion_common::data::scalar::{ScalarValue, ScalarValueHelpers};
26use vegafusion_common::error::{Result, ResultWithContext, VegaFusionError};
27
28use vegafusion_core::proto::gen::tasks::data_url_task::Url;
29use vegafusion_core::proto::gen::tasks::scan_url_format;
30use vegafusion_core::proto::gen::tasks::scan_url_format::Parse;
31use vegafusion_core::proto::gen::tasks::{DataSourceTask, DataUrlTask, DataValuesTask};
32use vegafusion_core::proto::gen::transforms::TransformPipeline;
33use vegafusion_core::task_graph::task::{InputVariable, TaskDependencies};
34use vegafusion_core::task_graph::task_value::TaskValue;
35
36use crate::data::util::{DataFrameUtils, SessionContextUtils};
37use crate::transform::utils::str_to_timestamp;
38
39use object_store::ObjectStore;
40use vegafusion_common::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
41use vegafusion_common::column::flat_col;
42use vegafusion_common::data::table::VegaFusionTable;
43use vegafusion_common::data::ORDER_COL;
44use vegafusion_common::datatypes::{is_integer_datatype, is_string_datatype};
45use vegafusion_core::proto::gen::transforms::transform::TransformKind;
46use vegafusion_core::spec::visitors::extract_inline_dataset;
47
48#[cfg(feature = "s3")]
49use object_store::aws::AmazonS3Builder;
50
51#[cfg(feature = "http")]
52use object_store::{http::HttpBuilder, ClientOptions};
53
54#[cfg(feature = "fs")]
55use tokio::io::AsyncReadExt;
56
57#[cfg(feature = "parquet")]
58use {datafusion::prelude::ParquetReadOptions, vegafusion_common::error::ToExternalError};
59
60#[cfg(target_arch = "wasm32")]
61use object_store_wasm::HttpStore;
62
63pub fn build_compilation_config(
64    input_vars: &[InputVariable],
65    values: &[TaskValue],
66    tz_config: &Option<RuntimeTzConfig>,
67) -> CompilationConfig {
68    // Build compilation config from input_vals
69    let mut signal_scope: HashMap<String, ScalarValue> = HashMap::new();
70    let mut data_scope: HashMap<String, VegaFusionTable> = HashMap::new();
71
72    for (input_var, input_val) in input_vars.iter().zip(values) {
73        match input_val {
74            TaskValue::Scalar(value) => {
75                signal_scope.insert(input_var.var.name.clone(), value.clone());
76            }
77            TaskValue::Table(table) => {
78                data_scope.insert(input_var.var.name.clone(), table.clone());
79            }
80        }
81    }
82
83    // CompilationConfig is not Send, so use local scope here to make sure it's dropped
84    // before the call to await below.
85    CompilationConfig {
86        signal_scope,
87        data_scope,
88        tz_config: *tz_config,
89        ..Default::default()
90    }
91}
92
93#[async_trait]
94impl TaskCall for DataUrlTask {
95    async fn eval(
96        &self,
97        values: &[TaskValue],
98        tz_config: &Option<RuntimeTzConfig>,
99        inline_datasets: HashMap<String, VegaFusionDataset>,
100        ctx: Arc<SessionContext>,
101    ) -> Result<(TaskValue, Vec<TaskValue>)> {
102        // Build compilation config for url signal (if any) and transforms (if any)
103        let config = build_compilation_config(&self.input_vars(), values, tz_config);
104
105        // Build url string
106        let url = match self.url.as_ref().unwrap() {
107            Url::String(url) => url.clone(),
108            Url::Expr(expr) => {
109                let compiled = compile(expr, &config, None)?;
110                let url_scalar = compiled.eval_to_scalar()?;
111                url_scalar.to_scalar_string()?
112            }
113        };
114
115        // Strip trailing Hash, e.g. https://foo.csv#1234 -> https://foo.csv
116        let url_parts: Vec<&str> = url.splitn(2, '#').collect();
117        let url = url_parts.first().cloned().unwrap_or(&url).to_string();
118
119        // Handle references to vega default datasets (e.g. "data/us-10m.json")
120        let url = check_builtin_dataset(url);
121
122        // Load data from URL
123        let parse = self.format_type.as_ref().and_then(|fmt| fmt.parse.clone());
124        let file_type = self.format_type.as_ref().and_then(|fmt| fmt.r#type.clone());
125
126        // Vega-Lite sets unspecified file types to "json", so we don't want this to take
127        // precedence over file extension
128        let file_type = if file_type == Some("json".to_string()) {
129            None
130        } else {
131            file_type.as_deref()
132        };
133
134        let df = if let Some(inline_name) = extract_inline_dataset(&url) {
135            let inline_name = inline_name.trim().to_string();
136            if let Some(inline_dataset) = inline_datasets.get(&inline_name) {
137                match inline_dataset {
138                    VegaFusionDataset::Table { table, .. } => {
139                        let table = table.clone().with_ordering()?;
140                        ctx.vegafusion_table(table).await?
141                    }
142                    VegaFusionDataset::Plan { plan } => {
143                        ctx.execute_logical_plan(plan.clone()).await?
144                    }
145                }
146            } else if let Ok(df) = ctx.table(&inline_name).await {
147                df
148            } else {
149                return Err(VegaFusionError::internal(format!(
150                    "No inline dataset named {inline_name}"
151                )));
152            }
153        } else if file_type == Some("csv") || (file_type.is_none() && url.ends_with(".csv")) {
154            read_csv(&url, &parse, ctx, false).await?
155        } else if file_type == Some("tsv") || (file_type.is_none() && url.ends_with(".tsv")) {
156            read_csv(&url, &parse, ctx, true).await?
157        } else if file_type == Some("json") || (file_type.is_none() && url.ends_with(".json")) {
158            read_json(&url, ctx).await?
159        } else if file_type == Some("arrow")
160            || (file_type.is_none() && (url.ends_with(".arrow") || url.ends_with(".feather")))
161        {
162            read_arrow(&url, ctx).await?
163        } else if file_type == Some("parquet")
164            || (file_type.is_none() && (url.ends_with(".parquet")))
165        {
166            cfg_if! {
167                if #[cfg(any(feature = "parquet"))] {
168                    read_parquet(&url, ctx).await?
169                } else {
170                    return Err(VegaFusionError::internal(format!(
171                        "Enable parquet support by enabling the `parquet` feature flag"
172                    )))
173                }
174            }
175        } else {
176            return Err(VegaFusionError::internal(format!(
177                "Invalid url file extension {url}"
178            )));
179        };
180
181        // Ensure there is an ordering column present
182        let df = if df.schema().inner().column_with_name(ORDER_COL).is_none() {
183            df.with_index(ORDER_COL).await?
184        } else {
185            df
186        };
187
188        // Perform any up-front type conversions
189        let df = pre_process_column_types(df).await?;
190
191        // Process datetime columns
192        let df = process_datetimes(&parse, df, &config.tz_config).await?;
193
194        eval_sql_df(df, &self.pipeline, &config).await
195    }
196}
197
198async fn eval_sql_df(
199    sql_df: DataFrame,
200    pipeline: &Option<TransformPipeline>,
201    config: &CompilationConfig,
202) -> Result<(TaskValue, Vec<TaskValue>)> {
203    // Apply transforms (if any)
204    let (transformed_df, output_values) = if pipeline
205        .as_ref()
206        .map(|p| !p.transforms.is_empty())
207        .unwrap_or(false)
208    {
209        let pipeline = pipeline.as_ref().unwrap();
210        pipeline.eval_sql(sql_df, config).await?
211    } else {
212        // No transforms, just remove any ordering column
213        (
214            sql_df.collect_to_table().await?.without_ordering()?,
215            Vec::new(),
216        )
217    };
218
219    let table_value = TaskValue::Table(transformed_df.without_ordering()?);
220
221    Ok((table_value, output_values))
222}
223
224lazy_static! {
225    static ref BUILT_IN_DATASETS: HashSet<&'static str> = vec![
226        "7zip.png",
227        "airports.csv",
228        "annual-precip.json",
229        "anscombe.json",
230        "barley.json",
231        "birdstrikes.csv",
232        "budget.json",
233        "budgets.json",
234        "burtin.json",
235        "cars.json",
236        "co2-concentration.csv",
237        "countries.json",
238        "crimea.json",
239        "disasters.csv",
240        "driving.json",
241        "earthquakes.json",
242        "ffox.png",
243        "flare-dependencies.json",
244        "flare.json",
245        "flights-10k.json",
246        "flights-200k.arrow",
247        "flights-200k.json",
248        "flights-20k.json",
249        "flights-2k.json",
250        "flights-3m.csv",
251        "flights-5k.json",
252        "flights-airport.csv",
253        "football.json",
254        "gapminder-health-income.csv",
255        "gapminder.json",
256        "gimp.png",
257        "github.csv",
258        "income.json",
259        "iowa-electricity.csv",
260        "jobs.json",
261        "la-riots.csv",
262        "londonBoroughs.json",
263        "londonCentroids.json",
264        "londonTubeLines.json",
265        "lookup_groups.csv",
266        "lookup_people.csv",
267        "miserables.json",
268        "monarchs.json",
269        "movies.json",
270        "normal-2d.json",
271        "obesity.json",
272        "ohlc.json",
273        "penguins.json",
274        "platformer-terrain.json",
275        "points.json",
276        "political-contributions.json",
277        "population_engineers_hurricanes.csv",
278        "population.json",
279        "seattle-weather.csv",
280        "seattle-weather-hourly-normals.csv",
281        "sp500-2000.csv",
282        "sp500.csv",
283        "stocks.csv",
284        "udistrict.json",
285        "unemployment-across-industries.json",
286        "unemployment.tsv",
287        "uniform-2d.json",
288        "us-10m.json",
289        "us-employment.csv",
290        "us-state-capitals.json",
291        "volcano.json",
292        "weather.csv",
293        "weather.json",
294        "wheat.json",
295        "windvectors.csv",
296        "world-110m.json",
297        "zipcodes.csv",
298    ]
299    .into_iter()
300    .collect();
301}
302
303const DATASET_BASE: &str = "https://raw.githubusercontent.com/vega/vega-datasets";
304const DATASET_TAG: &str = "v2.3.0";
305
306fn check_builtin_dataset(url: String) -> String {
307    if let Some(dataset) = url.strip_prefix("data/") {
308        let path = std::path::Path::new(&url);
309        if !path.exists() && BUILT_IN_DATASETS.contains(dataset) {
310            format!("{DATASET_BASE}/{DATASET_TAG}/data/{dataset}")
311        } else {
312            url
313        }
314    } else {
315        url
316    }
317}
318
319async fn pre_process_column_types(df: DataFrame) -> Result<DataFrame> {
320    let mut selections: Vec<Expr> = Vec::new();
321    let mut pre_proc_needed = false;
322    for field in df.schema().fields().iter() {
323        if field.data_type() == &DataType::LargeUtf8 {
324            // Work around https://github.com/apache/arrow-rs/issues/2654 by converting
325            // LargeUtf8 to Utf8
326            selections.push(
327                Expr::Cast(expr::Cast {
328                    expr: Box::new(flat_col(field.name())),
329                    data_type: DataType::Utf8,
330                })
331                .alias(field.name()),
332            );
333            pre_proc_needed = true;
334        } else {
335            selections.push(flat_col(field.name()))
336        }
337    }
338    if pre_proc_needed {
339        Ok(df.select(selections)?)
340    } else {
341        Ok(df)
342    }
343}
344
345/// After processing, all datetime columns are converted to Timestamptz and Date32
346async fn process_datetimes(
347    parse: &Option<Parse>,
348    sql_df: DataFrame,
349    tz_config: &Option<RuntimeTzConfig>,
350) -> Result<DataFrame> {
351    // Perform specialized date parsing
352    let mut date_fields: Vec<String> = Vec::new();
353    let mut df = sql_df;
354    if let Some(scan_url_format::Parse::Object(formats)) = parse {
355        for spec in &formats.specs {
356            let datatype = &spec.datatype;
357            if datatype.starts_with("date") || datatype.starts_with("utc") {
358                // look for format string
359                let (typ, fmt) = if let Some((typ, fmt)) = datatype.split_once(':') {
360                    if fmt.starts_with("'") && fmt.ends_with("'") {
361                        (typ.to_lowercase(), Some(fmt[1..fmt.len() - 1].to_string()))
362                    } else {
363                        (typ.to_lowercase(), Some(fmt.to_string()))
364                    }
365                } else {
366                    (datatype.to_lowercase(), None)
367                };
368
369                let schema = df.schema();
370                if let Ok(date_field) = schema.field_with_unqualified_name(&spec.name) {
371                    let dtype = date_field.data_type();
372                    let date_expr = if is_string_datatype(dtype) {
373                        // Compute default timezone
374                        let default_input_tz_str = if typ == "utc" || tz_config.is_none() {
375                            "UTC".to_string()
376                        } else {
377                            tz_config.unwrap().default_input_tz.to_string()
378                        };
379
380                        if let Some(fmt) = fmt {
381                            // Parse with single explicit format
382                            str_to_timestamp(
383                                flat_col(&spec.name),
384                                &default_input_tz_str,
385                                schema,
386                                Some(fmt.as_str()),
387                            )?
388                        } else {
389                            // Parse with auto formats, then localize to default_input_tz
390                            str_to_timestamp(
391                                flat_col(&spec.name),
392                                &default_input_tz_str,
393                                schema,
394                                None,
395                            )?
396                        }
397                    } else if is_integer_datatype(dtype) {
398                        // Assume Year was parsed numerically, return Date32
399                        make_date(flat_col(&spec.name), lit(1), lit(1))
400                    } else {
401                        continue;
402                    };
403
404                    // Add to date_fields if special date processing was performed
405                    date_fields.push(date_field.name().clone());
406
407                    let mut columns: Vec<_> = schema
408                        .fields()
409                        .iter()
410                        .filter_map(|field| {
411                            let name = field.name();
412                            if name == &spec.name {
413                                None
414                            } else {
415                                Some(flat_col(name))
416                            }
417                        })
418                        .collect();
419                    columns.push(date_expr.alias(&spec.name));
420                    df = df.select(columns)?
421                }
422            }
423        }
424    }
425
426    // Standardize other Timestamp columns (those that weren't created above) to integer
427    // milliseconds
428    let schema = df.schema();
429    let selection: Vec<_> = schema
430        .fields()
431        .iter()
432        .map(|field| {
433            if !date_fields.contains(field.name()) {
434                let expr = match field.data_type() {
435                    DataType::Timestamp(_, tz) => match tz {
436                        Some(_) => {
437                            // Timestamp has explicit timezone, all good
438                            flat_col(field.name())
439                        }
440                        _ => {
441                            // Naive timestamp, localize to default_input_tz
442                            let tz_config =
443                                tz_config.with_context(|| "No local timezone info provided")?;
444
445                            flat_col(field.name()).try_cast_to(
446                                &DataType::Timestamp(
447                                    TimeUnit::Millisecond,
448                                    Some(tz_config.default_input_tz.to_string().into()),
449                                ),
450                                schema,
451                            )?
452                        }
453                    },
454                    DataType::Date64 => {
455                        let tz_config =
456                            tz_config.with_context(|| "No local timezone info provided")?;
457
458                        // Cast to naive timestamp, then localize to timestamp with timezone
459                        flat_col(field.name())
460                            .try_cast_to(&DataType::Timestamp(TimeUnit::Millisecond, None), schema)?
461                            .try_cast_to(
462                                &DataType::Timestamp(
463                                    TimeUnit::Millisecond,
464                                    Some(tz_config.default_input_tz.to_string().into()),
465                                ),
466                                schema,
467                            )?
468                    }
469                    _ => flat_col(field.name()),
470                };
471
472                Ok(if matches!(expr, Expr::Alias(_)) {
473                    expr
474                } else {
475                    expr.alias(field.name())
476                })
477            } else {
478                Ok(flat_col(field.name()))
479            }
480        })
481        .collect::<Result<Vec<_>>>()?;
482
483    Ok(df.select(selection)?)
484}
485
486#[async_trait]
487impl TaskCall for DataValuesTask {
488    async fn eval(
489        &self,
490        values: &[TaskValue],
491        tz_config: &Option<RuntimeTzConfig>,
492        _inline_datasets: HashMap<String, VegaFusionDataset>,
493        ctx: Arc<SessionContext>,
494    ) -> Result<(TaskValue, Vec<TaskValue>)> {
495        // Deserialize data into table
496        let values_table = VegaFusionTable::from_ipc_bytes(&self.values)?;
497        if values_table.schema.fields.is_empty() {
498            return Ok((TaskValue::Table(values_table), Default::default()));
499        }
500
501        // Return early for empty input data unless first transform is a sequence
502        // (which generates its own data)
503        if values_table.num_rows() == 0 {
504            if let Some(pipeline) = &self.pipeline {
505                if let Some(first_tx) = pipeline.transforms.first() {
506                    if !matches!(first_tx.transform_kind(), TransformKind::Sequence(_)) {
507                        return Ok((TaskValue::Table(values_table), Default::default()));
508                    }
509                }
510            }
511        }
512
513        // Add ordering column
514        let values_table = values_table.with_ordering()?;
515
516        // Get parse format for date processing
517        let parse = self.format_type.as_ref().and_then(|fmt| fmt.parse.clone());
518
519        // Apply transforms (if any)
520        let (transformed_table, output_values) = if self
521            .pipeline
522            .as_ref()
523            .map(|p| !p.transforms.is_empty())
524            .unwrap_or(false)
525        {
526            let pipeline = self.pipeline.as_ref().unwrap();
527
528            let config = build_compilation_config(&self.input_vars(), values, tz_config);
529
530            // Process datetime columns
531            let df = ctx.vegafusion_table(values_table).await?;
532            let sql_df = process_datetimes(&parse, df, &config.tz_config).await?;
533
534            let (table, output_values) = pipeline.eval_sql(sql_df, &config).await?;
535
536            (table, output_values)
537        } else {
538            // No transforms
539            let values_df = ctx.vegafusion_table(values_table).await?;
540            let values_df = process_datetimes(&parse, values_df, tz_config).await?;
541            (values_df.collect_to_table().await?, Vec::new())
542        };
543
544        let table_value = TaskValue::Table(transformed_table.without_ordering()?);
545
546        Ok((table_value, output_values))
547    }
548}
549
550#[async_trait]
551impl TaskCall for DataSourceTask {
552    async fn eval(
553        &self,
554        values: &[TaskValue],
555        tz_config: &Option<RuntimeTzConfig>,
556        _inline_datasets: HashMap<String, VegaFusionDataset>,
557        ctx: Arc<SessionContext>,
558    ) -> Result<(TaskValue, Vec<TaskValue>)> {
559        let input_vars = self.input_vars();
560        let mut config = build_compilation_config(&input_vars, values, tz_config);
561
562        // Remove source table from config
563        let source_table = config.data_scope.remove(&self.source).with_context(|| {
564            format!(
565                "Missing source {} for task with input variables\n{:#?}",
566                self.source, input_vars
567            )
568        })?;
569
570        // Add ordering column
571        let source_table = source_table.with_ordering()?;
572
573        // Apply transforms (if any)
574        let (transformed_table, output_values) = if self
575            .pipeline
576            .as_ref()
577            .map(|p| !p.transforms.is_empty())
578            .unwrap_or(false)
579        {
580            let pipeline = self.pipeline.as_ref().unwrap();
581            let sql_df = ctx.vegafusion_table(source_table).await?;
582            let (table, output_values) = pipeline.eval_sql(sql_df, &config).await?;
583
584            (table, output_values)
585        } else {
586            // No transforms
587            (source_table, Vec::new())
588        };
589
590        let table_value = TaskValue::Table(transformed_table.without_ordering()?);
591        Ok((table_value, output_values))
592    }
593}
594
595// Try to read CSV using object_store
596async fn read_csv_with_object_store(
597    url: &str,
598    parse: &Option<Parse>,
599    ctx: &SessionContext,
600    is_tsv: bool,
601    ext: &str,
602) -> Result<DataFrame> {
603    // Build CSV options
604    let mut csv_opts = if is_tsv {
605        CsvReadOptions {
606            delimiter: b'\t',
607            ..Default::default()
608        }
609    } else {
610        Default::default()
611    };
612    csv_opts.file_extension = ext;
613
614    // Build schema from Vega parse options
615    let schema = build_csv_schema(&csv_opts, parse, url, ctx).await?;
616    csv_opts.schema = Some(&schema);
617
618    // Read the CSV
619    Ok(ctx.read_csv(url, csv_opts).await?)
620}
621
/// Read a CSV (or TSV when `is_tsv` is set) by downloading it with reqwest.
///
/// This is the fallback used when the object-store based reader fails for an HTTP(S)
/// URL. The response body is written to a temporary file, read through the session
/// context, and fully collected before the temporary directory is removed.
///
/// `ext` is the extension derived from the remote URL. It is only reused for the
/// temporary file when it is a plain alphanumeric extension; otherwise `csv` is used.
/// The temporary file name and the reader's expected extension are always kept identical,
/// because the reader is expected to reject or skip a file whose path does not end with
/// the configured extension (previously a `.tsv` URL was stored as `temp.csv` while the
/// reader was told to expect `tsv`).
///
/// # Errors
///
/// Fails when the download, the temporary file handling, or the CSV read fails, and
/// with "No data in CSV file" when the read produces no record batches.
#[cfg(feature = "http")]
async fn read_csv_with_reqwest(
    url: &str,
    parse: &Option<Parse>,
    ctx: &SessionContext,
    is_tsv: bool,
    ext: &str,
) -> Result<DataFrame> {
    // Fetch CSV content using reqwest
    let client = reqwest::Client::new();
    let response = client
        .get(url)
        .send()
        .await
        .external(format!("Failed to fetch URL: {url}"))?;

    let text = response
        .text()
        .await
        .external("Failed to read response as text")?;

    // Pick an extension that is safe to use in a file name and a file:// URL. The
    // content format does not depend on it (the delimiter is set from `is_tsv`).
    let temp_ext = if !ext.is_empty() && ext.chars().all(|c| c.is_ascii_alphanumeric()) {
        ext
    } else {
        "csv"
    };

    // Create a temporary file to store the CSV content
    use std::io::Write;
    let temp_dir = tempfile::tempdir()?;
    let temp_path = temp_dir.path().join(format!("temp.{temp_ext}"));
    let mut temp_file = std::fs::File::create(&temp_path)?;
    temp_file.write_all(text.as_bytes())?;
    temp_file.sync_all()?;

    // Read the CSV from the temporary file
    let temp_url = format!("file://{}", temp_path.display());

    // Build CSV options
    let mut csv_opts = if is_tsv {
        CsvReadOptions {
            delimiter: b'\t',
            ..Default::default()
        }
    } else {
        Default::default()
    };
    // Must match the temporary file's name, not the remote URL
    csv_opts.file_extension = temp_ext;

    // Build schema from the temporary file
    let schema = build_csv_schema(&csv_opts, parse, &temp_url, ctx).await?;
    csv_opts.schema = Some(&schema);

    // Read the CSV and collect it immediately to ensure the data is loaded
    // before the temporary file is deleted
    let df = ctx.read_csv(&temp_url, csv_opts).await?;
    let batches = df.collect().await?;

    // Create a VegaFusionTable from the collected batches and convert back to DataFrame
    let schema = if let Some(batch) = batches.first() {
        batch.schema()
    } else {
        return Err(VegaFusionError::internal("No data in CSV file"));
    };
    let table = VegaFusionTable::try_new(schema, batches)?;
    ctx.vegafusion_table(table).await
}
684
685async fn read_csv(
686    url: &str,
687    parse: &Option<Parse>,
688    ctx: Arc<SessionContext>,
689    is_tsv: bool,
690) -> Result<DataFrame> {
691    // Add file extension based on URL
692    let ext = if let Some(ext) = Path::new(url).extension().and_then(|ext| ext.to_str()) {
693        ext.to_string()
694    } else {
695        "".to_string()
696    };
697
698    maybe_register_object_stores_for_url(&ctx, url)?;
699
700    #[cfg(feature = "http")]
701    {
702        // For HTTP URLs, try object_store first, fall back to reqwest on any error
703        if url.starts_with("http://") || url.starts_with("https://") {
704            match read_csv_with_object_store(url, parse, &ctx, is_tsv, &ext).await {
705                Ok(df) => Ok(df),
706                Err(_) => {
707                    // Any error, fall back to reqwest
708                    read_csv_with_reqwest(url, parse, &ctx, is_tsv, &ext).await
709                }
710            }
711        } else {
712            // Non-HTTP URL, use object_store
713            read_csv_with_object_store(url, parse, &ctx, is_tsv, &ext).await
714        }
715    }
716
717    #[cfg(not(feature = "http"))]
718    {
719        // HTTP feature not enabled (e.g., WASM), use object_store only
720        read_csv_with_object_store(url, parse, &ctx, is_tsv, &ext).await
721    }
722}
723
724/// Build final schema by combining the input and inferred schemas
725async fn build_csv_schema(
726    csv_opts: &CsvReadOptions<'_>,
727    parse: &Option<Parse>,
728    uri: impl Into<String>,
729    ctx: &SessionContext,
730) -> Result<Schema> {
731    // Get HashMap of provided columns formats
732    let format_specs = if let Some(parse) = parse {
733        match parse {
734            Parse::String(_) => {
735                // auto, use inferred schema
736                HashMap::new()
737            }
738            Parse::Object(field_specs) => field_specs
739                .specs
740                .iter()
741                .map(|spec| (spec.name.clone(), spec.datatype.clone()))
742                .collect(),
743        }
744    } else {
745        HashMap::new()
746    };
747
748    // Map formats to fields
749    let field_types: HashMap<_, _> = format_specs
750        .iter()
751        .map(|(name, vega_type)| {
752            let dtype = match vega_type.as_str() {
753                "number" => DataType::Float64,
754                "boolean" => DataType::Boolean,
755                "date" => DataType::Utf8, // Parse as string, convert to date later
756                "string" => DataType::Utf8,
757                _ => DataType::Utf8,
758            };
759            (name.clone(), dtype)
760        })
761        .collect();
762
763    // Get inferred schema
764    let table_path = ListingTableUrl::parse(uri.into().as_str())?;
765    let listing_options =
766        csv_opts.to_listing_options(&ctx.copied_config(), TableOptions::default());
767    let inferred_schema = listing_options
768        .infer_schema(&ctx.state(), &table_path)
769        .await?;
770
771    // Override inferred schema based on parse options
772    let new_fields: Vec<_> = inferred_schema
773        .fields()
774        .iter()
775        .map(|field| {
776            // Use provided field type, but fall back to string for unprovided columns
777            let dtype = field_types
778                .get(field.name())
779                .cloned()
780                .unwrap_or(DataType::Utf8);
781            Field::new(field.name(), dtype, true)
782        })
783        .collect();
784    Ok(Schema::new(new_fields))
785}
786
787async fn read_json(url: &str, ctx: Arc<SessionContext>) -> Result<DataFrame> {
788    let value: serde_json::Value =
789        if let Some(base_url) = maybe_register_object_stores_for_url(&ctx, url)? {
790            // Create single use object store that points directly to file
791            let store = ctx.runtime_env().object_store(&base_url)?;
792            let child_url = url.strip_prefix(&base_url.to_string()).unwrap();
793            match store.get(&child_url.into()).await {
794                Ok(get_res) => {
795                    let bytes = get_res.bytes().await?.to_vec();
796                    let text: Cow<str> = String::from_utf8_lossy(&bytes);
797                    serde_json::from_str(text.as_ref())?
798                }
799                Err(e) => {
800                    cfg_if::cfg_if! {
801                        if #[cfg(feature="http")] {
802                            if url.starts_with("http://") || url.starts_with("https://") {
803                                // Fallback to direct reqwest implementation. This is needed in some cases because
804                                // the object-store http implementation has stricter requirements on what the
805                                // server provides. For example the content-length header is required.
806                                let client = reqwest::Client::new();
807                                let response = client
808                                    .get(url)
809                                    .send()
810                                    .await
811                                    .external(format!("Failed to fetch URL: {url}"))?;
812
813                                let text = response
814                                    .text()
815                                    .await
816                                    .external("Failed to read response as text")?;
817                                serde_json::from_str(&text)?
818                            } else {
819                                return Err(VegaFusionError::from(e));
820                            }
821                        } else {
822                            return Err(VegaFusionError::from(e));
823                        }
824                    }
825                }
826            }
827        } else {
828            cfg_if::cfg_if! {
829                if #[cfg(feature="fs")] {
830                    // Assume local file
831                    let mut file = tokio::fs::File::open(url)
832                        .await
833                        .external(format!("Failed to open as local file: {url}"))?;
834
835                    let mut json_str = String::new();
836                    file.read_to_string(&mut json_str)
837                        .await
838                        .external("Failed to read file contents to string")?;
839
840                    serde_json::from_str(&json_str)?
841                } else {
842                    return Err(VegaFusionError::internal(
843                        "The `fs` feature flag must be enabled for file system support"
844                    ));
845                }
846            }
847        };
848
849    let table = VegaFusionTable::from_json(&value)?.with_ordering()?;
850    ctx.vegafusion_table(table).await
851}
852
853async fn read_arrow(url: &str, ctx: Arc<SessionContext>) -> Result<DataFrame> {
854    maybe_register_object_stores_for_url(&ctx, url)?;
855    Ok(ctx.read_arrow(url, ArrowReadOptions::default()).await?)
856}
857
/// Read the Parquet file at `url` into a `DataFrame` using default read options.
///
/// Only compiled when the `parquet` feature is enabled. Any object store required by
/// the URL is registered on `ctx` first; the returned base URL is discarded.
#[cfg(feature = "parquet")]
async fn read_parquet(url: &str, ctx: Arc<SessionContext>) -> Result<DataFrame> {
    maybe_register_object_stores_for_url(&ctx, url)?;

    let options = ParquetReadOptions::default();
    ctx.read_parquet(url, options)
        .await
        .map_err(VegaFusionError::from)
}
863
864fn maybe_register_object_stores_for_url(
865    ctx: &SessionContext,
866    url: &str,
867) -> Result<Option<ObjectStoreUrl>> {
868    // Handle object store registration for non-local sources
869    #[cfg(any(feature = "http", feature = "http-wasm"))]
870    {
871        let maybe_register_http_store = |prefix: &str| -> Result<Option<ObjectStoreUrl>> {
872            if let Some(path) = url.strip_prefix(prefix) {
873                let Some((root, _)) = path.split_once('/') else {
874                    return Err(VegaFusionError::specification(format!(
875                        "Invalid https URL: {url}"
876                    )));
877                };
878                let base_url_str = format!("https://{root}");
879                let base_url = url::Url::parse(&base_url_str)?;
880
881                // Register store for url if not already registered
882                let object_store_url = ObjectStoreUrl::parse(&base_url_str)?;
883                if ctx
884                    .runtime_env()
885                    .object_store(object_store_url.clone())
886                    .is_err()
887                {
888                    cfg_if! {
889                        if #[cfg(feature="http")] {
890                            let client_options = ClientOptions::new().with_allow_http(true);
891                            let http_store = HttpBuilder::new()
892                                .with_url(base_url.clone())
893                                .with_client_options(client_options)
894                                .build()?;
895                            ctx.register_object_store(&base_url, Arc::new(http_store));
896                        } else if #[cfg(target_arch = "wasm32")] {
897                            let http_store = HttpStore::new(base_url.clone());
898                            ctx.register_object_store(&base_url, Arc::new(http_store));
899                        } else {
900                            return Err(VegaFusionError::internal("HTTP support not available"));
901                        }
902                    }
903                }
904                return Ok(Some(object_store_url));
905            }
906            Ok(None)
907        };
908
909        // Register https://
910        if let Some(url) = maybe_register_http_store("https://")? {
911            return Ok(Some(url));
912        }
913
914        // Register http://
915        if let Some(url) = maybe_register_http_store("http://")? {
916            return Ok(Some(url));
917        }
918    }
919
920    // Register s3://
921    #[cfg(feature = "s3")]
922    if let Some(bucket_path) = url.strip_prefix("s3://") {
923        let Some((bucket, _)) = bucket_path.split_once('/') else {
924            return Err(VegaFusionError::specification(format!(
925                "Invalid s3 URL: {url}"
926            )));
927        };
928        // Register store for url if not already registered
929        let base_url_str = format!("s3://{bucket}/");
930        let object_store_url = ObjectStoreUrl::parse(&base_url_str)?;
931        if ctx
932            .runtime_env()
933            .object_store(object_store_url.clone())
934            .is_err()
935        {
936            let base_url = url::Url::parse(&base_url_str)?;
937            let s3 = AmazonS3Builder::from_env().with_url(base_url.clone()).build().with_context(||
938            "Failed to initialize s3 connection from environment variables.\n\
939                See https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.from_env".to_string()
940            )?;
941            ctx.register_object_store(&base_url, Arc::new(s3));
942        }
943        return Ok(Some(object_store_url));
944    }
945
946    Ok(None)
947}