use anyhow::Result;
use datafusion::catalog::TableProvider;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::context::SessionContext;
use futures::StreamExt;
use object_store::ObjectStore;
use std::sync::Arc;
async fn verify_files_exist(
object_store: &Arc<dyn ObjectStore>,
prefix: &object_store::path::Path,
url: &str,
) -> Result<()> {
let head_result = object_store.head(prefix).await;
if head_result.is_err() {
let mut list_stream = object_store.list(Some(prefix));
let first_file = list_stream.next().await;
if first_file.is_none() {
anyhow::bail!("No files found at URL: {}", url);
}
}
Ok(())
}
/// Builds a [`TableProvider`] over JSON data located at `url`.
///
/// `url` is parsed as a [`ListingTableUrl`] and may name a single object or a prefix.
/// Before building the table, the function checks that at least one object exists
/// there. The files are read with DataFusion's default [`JsonFormat`], and the table
/// schema is inferred from the data.
///
/// # Errors
///
/// Returns an error if:
/// - `url` cannot be parsed;
/// - no object store is registered for it;
/// - no files exist at the location, or listing them fails;
/// - schema inference fails;
/// - the [`ListingTable`] cannot be constructed.
pub async fn json_table_provider(
    ctx: &SessionContext,
    url: &str,
) -> Result<Arc<dyn TableProvider>> {
    let table_url = ListingTableUrl::parse(url)?;

    // `ctx.state()` returns an owned snapshot of the session state. Take it once so the
    // object-store lookup and schema inference share the same state, without cloning twice.
    let state = ctx.state();
    let object_store = state.runtime_env().object_store(&table_url)?;

    // Fail early with a clear message instead of attempting to infer a schema from nothing.
    verify_files_exist(&object_store, table_url.prefix(), url).await?;

    let listing_options = ListingOptions::new(Arc::new(JsonFormat::default()));
    let config = ListingTableConfig::new(table_url)
        .with_listing_options(listing_options)
        .infer_schema(&state)
        .await?;

    Ok(Arc::new(ListingTable::try_new(config)?))
}