use std::collections::HashMap;
use std::env;
use std::num::NonZeroUsize;
use std::path::Path;
use std::process::ExitCode;
use std::sync::{Arc, LazyLock};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{
ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
};
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
};
use datafusion_cli::{
DATAFUSION_CLI_VERSION, exec,
pool_type::PoolType,
print_format::PrintFormat,
print_options::{MaxRows, PrintOptions},
};
use clap::Parser;
use datafusion::common::config_err;
use datafusion::config::ConfigOptions;
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
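/// Command line arguments accepted by the DataFusion CLI.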
#[derive(Debug, Parser, PartialEq)]
#[clap(author, version, about, long_about = None)]
struct Args {
#[clap(
short = 'p',
long,
help = "Path to your data, default to current directory",
value_parser(parse_valid_data_dir)
)]
data_path: Option<String>,
#[clap(
short = 'b',
long,
help = "The batch size of each query, or use DataFusion default",
value_parser(parse_batch_size)
)]
batch_size: Option<usize>,
#[clap(
short = 'c',
long,
num_args = 0..,
help = "Execute the given command string(s), then exit. Commands are expected to be non empty.",
value_parser(parse_command)
)]
command: Vec<String>,
#[clap(
short = 'm',
long,
help = "The memory pool limitation (e.g. '10g'), default to None (no limit)",
value_parser(extract_memory_pool_size)
)]
memory_limit: Option<usize>,
#[clap(
short,
long,
num_args = 0..,
help = "Execute commands from file(s), then exit",
value_parser(parse_valid_file)
)]
file: Vec<String>,
#[clap(
short = 'r',
long,
num_args = 0..,
help = "Run the provided files on startup instead of ~/.datafusionrc",
value_parser(parse_valid_file),
conflicts_with = "file"
)]
rc: Option<Vec<String>>,
#[clap(long, value_enum, default_value_t = PrintFormat::Automatic)]
format: PrintFormat,
#[clap(
short,
long,
help = "Reduce printing other than the results and work quietly"
)]
quiet: bool,
#[clap(
long,
help = "Specify the memory pool type 'greedy' or 'fair'",
default_value_t = PoolType::Greedy
)]
mem_pool_type: PoolType,
#[clap(
long,
help = "The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0",
default_value = "3"
)]
top_memory_consumers: usize,
#[clap(
long,
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
default_value = "40"
)]
maxrows: MaxRows,
#[clap(long, help = "Enables console syntax highlighting")]
color: bool,
#[clap(
short = 'd',
long,
help = "Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')",
value_parser(extract_disk_limit)
)]
disk_limit: Option<usize>,
#[clap(
long,
help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, summary, trace]",
default_value_t = InstrumentedObjectStoreMode::Disabled
)]
object_store_profiling: InstrumentedObjectStoreMode,
}
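/// Entry point: runs [`main_inner`] and maps any error to a failure exit code.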
#[tokio::main]
pub async fn main() -> ExitCode {
if let Err(e) = main_inner().await {
println!("Error: {e}");
return ExitCode::FAILURE;
}
ExitCode::SUCCESS
}
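/// Parses arguments, builds the session and runtime, registers the CLI table
/// functions, and then executes commands, files, or the interactive REPL.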
async fn main_inner() -> Result<()> {
env_logger::init();
let args = Args::parse();
if !args.quiet {
println!("DataFusion CLI v{DATAFUSION_CLI_VERSION}");
}
if let Some(ref path) = args.data_path {
let p = Path::new(path);
env::set_current_dir(p).unwrap();
};
let session_config = get_session_config(&args)?;
let mut rt_builder = RuntimeEnvBuilder::new();
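// Pick a memory pool implementation based on --mem-pool-type, optionally
// wrapped in a TrackConsumersPool so out-of-memory errors report the largest
// consumers.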
if let Some(memory_limit) = args.memory_limit {
let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
PoolType::Fair if args.top_memory_consumers == 0 => {
Arc::new(FairSpillPool::new(memory_limit))
}
PoolType::Fair => Arc::new(TrackConsumersPool::new(
FairSpillPool::new(memory_limit),
NonZeroUsize::new(args.top_memory_consumers).unwrap(),
)),
PoolType::Greedy if args.top_memory_consumers == 0 => {
Arc::new(GreedyMemoryPool::new(memory_limit))
}
PoolType::Greedy => Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(memory_limit),
NonZeroUsize::new(args.top_memory_consumers).unwrap(),
)),
};
rt_builder = rt_builder.with_memory_pool(pool)
}
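// Cap the disk space available for spilling; otherwise DataFusion's default
// limit applies.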
if let Some(disk_limit) = args.disk_limit {
let builder = DiskManagerBuilder::default()
.with_mode(DiskManagerMode::OsTmpDirectory)
.with_max_temp_directory_size(disk_limit.try_into().unwrap());
rt_builder = rt_builder.with_disk_manager_builder(builder);
}
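// Wrap the object store registry so object store requests can be profiled
// according to --object-store-profiling.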
let instrumented_registry = Arc::new(
InstrumentedObjectStoreRegistry::new()
.with_profile_mode(args.object_store_profiling),
);
rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
let runtime_env = rt_builder.build_arc()?;
let ctx = SessionContext::new_with_config_rt(session_config, runtime_env)
.enable_url_table();
ctx.refresh_catalogs().await?;
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
)));
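// Register the CLI table functions (UDTFs), e.g.
// `SELECT * FROM parquet_metadata('file.parquet')`, plus the cache
// introspection functions backed by the runtime's cache manager.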
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
ctx.register_udtf(
"metadata_cache",
Arc::new(MetadataCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
maxrows: args.maxrows,
color: args.color,
instrumented_registry: Arc::clone(&instrumented_registry),
};
let commands = args.command;
let files = args.file;
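// Determine which startup files to run: an explicit --rc list wins, otherwise
// fall back to ~/.datafusionrc if it exists.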
let rc = match args.rc {
Some(file) => file,
None => {
let mut files = Vec::new();
let home = dirs::home_dir();
if let Some(p) = home {
let home_rc = p.join(".datafusionrc");
if home_rc.exists() {
files.push(home_rc.into_os_string().into_string().unwrap());
}
}
files
}
};
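// With no -c/-f arguments, run any startup files and drop into the REPL.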
if commands.is_empty() && files.is_empty() {
if !rc.is_empty() {
exec::exec_from_files(&ctx, rc, &print_options).await?;
}
return exec::exec_from_repl(&ctx, &mut print_options)
.await
.map_err(|e| DataFusionError::External(Box::new(e)));
}
if !files.is_empty() {
exec::exec_from_files(&ctx, files, &print_options).await?;
}
if !commands.is_empty() {
exec::exec_from_commands(&ctx, commands, &print_options).await?;
}
Ok(())
}
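/// Builds the `SessionConfig` from environment variables and CLI overrides,
/// applying CLI-friendly defaults (tree explain format, literal NULL
/// rendering) when the corresponding environment variables are unset.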
fn get_session_config(args: &Args) -> Result<SessionConfig> {
let mut config_options = ConfigOptions::from_env()?;
if let Some(batch_size) = args.batch_size {
if batch_size == 0 {
return config_err!("batch_size must be greater than 0");
}
config_options.execution.batch_size = batch_size;
};
if env::var_os("DATAFUSION_EXPLAIN_FORMAT").is_none() {
config_options.explain.format = ExplainFormat::Tree;
}
if env::var_os("DATAFUSION_FORMAT_NULL").is_none() {
config_options.format.null = String::from("NULL");
}
let session_config =
SessionConfig::from(config_options).with_information_schema(true);
Ok(session_config)
}
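/// Value parser: accepts the argument only if it refers to an existing file.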
fn parse_valid_file(dir: &str) -> Result<String, String> {
if Path::new(dir).is_file() {
Ok(dir.to_string())
} else {
Err(format!("Invalid file '{dir}'"))
}
}
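/// Value parser: accepts the argument only if it refers to an existing directory.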
fn parse_valid_data_dir(dir: &str) -> Result<String, String> {
if Path::new(dir).is_dir() {
Ok(dir.to_string())
} else {
Err(format!("Invalid data directory '{dir}'"))
}
}
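/// Value parser: accepts only strictly positive batch sizes.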
fn parse_batch_size(size: &str) -> Result<usize, String> {
match size.parse::<usize>() {
Ok(size) if size > 0 => Ok(size),
_ => Err(format!("Invalid batch size '{size}'")),
}
}
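/// Value parser: rejects empty command strings passed to `-c`.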
fn parse_command(command: &str) -> Result<String, String> {
if !command.is_empty() {
Ok(command.to_string())
} else {
Err("-c flag expects only non empty commands".to_string())
}
}
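/// Binary (power-of-two) byte units used when parsing size strings.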
#[derive(Debug, Clone, Copy)]
enum ByteUnit {
Byte,
KiB,
MiB,
GiB,
TiB,
}
impl ByteUnit {
fn multiplier(&self) -> u64 {
match self {
ByteUnit::Byte => 1,
ByteUnit::KiB => 1 << 10,
ByteUnit::MiB => 1 << 20,
ByteUnit::GiB => 1 << 30,
ByteUnit::TiB => 1 << 40,
}
}
}
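/// Parses a human-readable size string such as "100", "4k", "20mb" or "2g"
/// into a number of bytes, using binary (1024-based) units. `label` is used
/// in error messages for negative, non-numeric, unknown-suffix, or
/// overflowing inputs.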
fn parse_size_string(size: &str, label: &str) -> Result<usize, String> {
static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
LazyLock::new(|| {
let mut m = HashMap::new();
m.insert("b", ByteUnit::Byte);
m.insert("k", ByteUnit::KiB);
m.insert("kb", ByteUnit::KiB);
m.insert("m", ByteUnit::MiB);
m.insert("mb", ByteUnit::MiB);
m.insert("g", ByteUnit::GiB);
m.insert("gb", ByteUnit::GiB);
m.insert("t", ByteUnit::TiB);
m.insert("tb", ByteUnit::TiB);
m
});
static SUFFIX_REGEX: LazyLock<regex::Regex> =
LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());
let lower = size.to_lowercase();
if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
let num_str = caps.get(1).unwrap().as_str();
let num = num_str
.parse::<usize>()
.map_err(|_| format!("Invalid numeric value in {label} '{size}'"))?;
let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
let unit = BYTE_SUFFIXES
.get(suffix)
.ok_or_else(|| format!("Invalid {label} '{size}'"))?;
let total_bytes = usize::try_from(unit.multiplier())
.ok()
.and_then(|multiplier| num.checked_mul(multiplier))
.ok_or_else(|| format!("{label} '{size}' is too large"))?;
Ok(total_bytes)
} else {
Err(format!("Invalid {label} '{size}'"))
}
}
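/// Parses the `--memory-limit` value (e.g. '10g') into bytes.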
pub fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
parse_size_string(size, "memory pool size")
}
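/// Parses the `--disk-limit` value (e.g. '10g') into bytes.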
pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
parse_size_string(size, "disk limit")
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
DefaultListFilesCache, cache_manager::CacheManagerConfig,
cache_unit::DefaultFileStatisticsCache,
},
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
use object_store::memory::InMemory;
use url::Url;
fn assert_conversion(input: &str, expected: Result<usize, String>) {
let result = extract_memory_pool_size(input);
match expected {
Ok(v) => assert_eq!(result.unwrap(), v),
Err(e) => assert_eq!(result.unwrap_err(), e),
}
}
#[test]
fn memory_pool_size() -> Result<(), String> {
assert_conversion("5", Ok(5));
assert_conversion("100", Ok(100));
assert_conversion("5b", Ok(5));
assert_conversion("4k", Ok(4 * 1024));
assert_conversion("4kb", Ok(4 * 1024));
assert_conversion("20m", Ok(20 * 1024 * 1024));
assert_conversion("20mb", Ok(20 * 1024 * 1024));
assert_conversion("2g", Ok(2 * 1024 * 1024 * 1024));
assert_conversion("2gb", Ok(2 * 1024 * 1024 * 1024));
assert_conversion("3t", Ok(3 * 1024 * 1024 * 1024 * 1024));
assert_conversion("4tb", Ok(4 * 1024 * 1024 * 1024 * 1024));
assert_conversion("4K", Ok(4 * 1024));
assert_conversion("4KB", Ok(4 * 1024));
assert_conversion("20M", Ok(20 * 1024 * 1024));
assert_conversion("20MB", Ok(20 * 1024 * 1024));
assert_conversion("2G", Ok(2 * 1024 * 1024 * 1024));
assert_conversion("2GB", Ok(2 * 1024 * 1024 * 1024));
assert_conversion("2T", Ok(2 * 1024 * 1024 * 1024 * 1024));
assert_conversion(
"invalid",
Err("Invalid memory pool size 'invalid'".to_string()),
);
assert_conversion("4kbx", Err("Invalid memory pool size '4kbx'".to_string()));
assert_conversion(
"-20mb",
Err("Invalid numeric value in memory pool size '-20mb'".to_string()),
);
assert_conversion(
"-100",
Err("Invalid numeric value in memory pool size '-100'".to_string()),
);
assert_conversion(
"12k12k",
Err("Invalid memory pool size '12k12k'".to_string()),
);
Ok(())
}
#[tokio::test]
async fn test_parquet_metadata_works() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
let sql = "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs), @r#"
+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |
+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0 | 2 | 1 | 123 | 0 | 125 | 4 | "f0.list.item" | INT64 | 1 | 4 | 0 | | 1 | 4 | SNAPPY | [PLAIN, RLE, RLE_DICTIONARY] | | 4 | 46 | 121 | 123 |
+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
"#);
let sql = "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs), @r#"
+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |
+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0 | 2 | 1 | 123 | 0 | 125 | 4 | "f0.list.item" | INT64 | 1 | 4 | 0 | | 1 | 4 | SNAPPY | [PLAIN, RLE, RLE_DICTIONARY] | | 4 | 46 | 121 | 123 |
+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
"#);
Ok(())
}
#[tokio::test]
async fn test_parquet_metadata_works_with_strings() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
let sql = "SELECT * FROM parquet_metadata('../parquet-testing/data/data_index_bloom_encoding_stats.parquet')";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r#"
+-----------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+------------+-----------+-----------+------------------+----------------------+-----------------+-----------------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |
+-----------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+------------+-----------+-----------+------------------+----------------------+-----------------+-----------------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| ../parquet-testing/data/data_index_bloom_encoding_stats.parquet | 0 | 14 | 1 | 163 | 0 | 4 | 14 | "String" | BYTE_ARRAY | Hello | today | 0 | | Hello | today | GZIP(GzipLevel(6)) | [PLAIN, RLE, BIT_PACKED] | | | 4 | 152 | 163 |
+-----------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+------------+-----------+-----------+------------------+----------------------+-----------------+-----------------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
"#);
Ok(())
}
#[tokio::test]
async fn test_metadata_cache() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
ctx.register_udtf(
"metadata_cache",
Arc::new(MetadataCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
ctx.register_parquet(
"alltypes_plain",
"../parquet-testing/data/alltypes_plain.parquet",
ParquetReadOptions::new(),
)
.await?;
ctx.register_parquet(
"alltypes_tiny_pages",
"../parquet-testing/data/alltypes_tiny_pages.parquet",
ParquetReadOptions::new(),
)
.await?;
ctx.register_parquet(
"lz4_raw_compressed_larger",
"../parquet-testing/data/lz4_raw_compressed_larger.parquet",
ParquetReadOptions::new(),
)
.await?;
ctx.sql("select * from alltypes_plain")
.await?
.collect()
.await?;
ctx.sql("select * from alltypes_tiny_pages")
.await?
.collect()
.await?;
ctx.sql("select * from lz4_raw_compressed_larger")
.await?
.collect()
.await?;
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, metadata_size_bytes, hits, extra from metadata_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+---------------------+------+------------------+
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
+-----------------------------------+-----------------+---------------------+------+------------------+
| alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false |
+-----------------------------------+-----------------+---------------------+------+------------------+
");
ctx.sql("select * from alltypes_plain")
.await?
.collect()
.await?;
ctx.sql("select * from alltypes_plain")
.await?
.collect()
.await?;
ctx.sql("select * from alltypes_plain")
.await?
.collect()
.await?;
ctx.sql("select * from lz4_raw_compressed_larger")
.await?
.collect()
.await?;
let sql = "select split_part(path, '/', -1) as filename, file_size_bytes, metadata_size_bytes, hits, extra from metadata_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+---------------------+------+------------------+
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
+-----------------------------------+-----------------+---------------------+------+------------------+
| alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false |
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false |
+-----------------------------------+-----------------+---------------------+------+------------------+
");
Ok(())
}
#[tokio::test]
async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
++
++
");
Ok(())
}
#[tokio::test]
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()?;
let config = SessionConfig::new().with_collect_statistics(true);
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+--------------+-------------+------------------+
| filename | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | 1851 | Exact(8) | 11 | Absent |
| alltypes_tiny_pages.parquet | 454233 | Exact(7300) | 13 | Absent |
| lz4_raw_compressed_larger.parquet | 380836 | Exact(10000) | 1 | Absent |
+-----------------------------------+-----------------+--------------+-------------+------------------+
");
Ok(())
}
#[tokio::test]
async fn test_list_files_cache() -> Result<(), DataFusionError> {
let list_files_cache = Arc::new(DefaultListFilesCache::new(
1024,
Some(Duration::from_secs(1)),
));
let rt = RuntimeEnvBuilder::new()
.with_cache_manager(
CacheManagerConfig::default()
.with_list_files_cache(Some(list_files_cache)),
)
.build_arc()
.unwrap();
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
ctx.register_object_store(
&Url::parse("mem://test_table").unwrap(),
Arc::new(InMemory::new()),
);
ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);
ctx.sql(
"CREATE EXTERNAL TABLE src_table
STORED AS PARQUET
LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
)
.await?
.collect()
.await?;
ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
ctx.sql(
"CREATE EXTERNAL TABLE test_table
STORED AS PARQUET
LOCATION 'mem://test_table/'
",
)
.await?
.collect()
.await?;
let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
let df = ctx
.sql(sql)
.await?
.unnest_columns(&["metadata_list"])?
.with_column_renamed("metadata_list", "metadata")?
.unnest_columns(&["metadata"])?;
assert_eq!(
2,
df.clone()
.filter(col("expires_in").is_not_null())?
.count()
.await?
);
let df = df
.with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
.with_column_renamed(r#""metadata.e_tag""#, "etag")?
.with_column(
"filename",
split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
)?
.select_columns(&[
"metadata_size_bytes",
"filename",
"file_size_bytes",
"etag",
])?
.sort(vec![col("filename").sort(true, false)])?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+---------------------+-----------+-----------------+------+
| metadata_size_bytes | filename | file_size_bytes | etag |
+---------------------+-----------+-----------------+------+
| 212 | 0.parquet | 3645 | 0 |
| 212 | 1.parquet | 3645 | 1 |
+---------------------+-----------+-----------------+------+
");
Ok(())
}
}