use std::{
collections::HashMap,
fmt::{self, Formatter},
sync::{Arc, Mutex, OnceLock},
time::Duration,
};
use chrono::{DateTime, Utc};
use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::physical_plan::metrics::MetricType;
use datafusion::{
catalog::streaming::StreamingTable,
dataframe::DataFrame,
execution::{
TaskContext,
context::{SessionConfig, SessionContext},
disk_manager::DiskManagerBuilder,
memory_pool::FairSpillPool,
runtime_env::RuntimeEnvBuilder,
},
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
analyze::AnalyzeExec,
display::DisplayableExecutionPlan,
execution_plan::{Boundedness, CardinalityEffect, EmissionType},
metrics::MetricValue,
stream::RecordBatchStreamAdapter,
streaming::PartitionStream,
},
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::{StreamExt, stream};
use lance_arrow::SchemaExt;
use lance_core::{
Error, Result,
utils::{
futures::FinallyStreamExt,
tracing::{EXECUTION_PLAN_RUN, StreamTracingExt, TRACE_EXECUTION},
},
};
use log::{debug, info, warn};
use tracing::Span;
use crate::udf::register_functions;
use crate::{
chunker::StrictBatchSizeStream,
utils::{
BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
MetricsExt, PARTS_LOADED_METRIC, REQUESTS_METRIC,
},
};
/// An `ExecutionPlan` leaf node that yields a pre-existing stream exactly once.
///
/// Useful for feeding an already-created `SendableRecordBatchStream` into a
/// DataFusion plan.  The stream can only be consumed once; afterwards the node
/// is "exhausted" and executing it again returns an error.
pub struct OneShotExec {
    // The wrapped stream; becomes `None` once `execute` has taken it.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // Schema captured from the stream at construction time.
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}
impl OneShotExec {
    /// Wrap an existing stream so it can be executed (once) as a plan node.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        // Single partition, bounded, incremental emission: this node simply
        // forwards whatever the wrapped stream produces.
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::RoundRobinBatch(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Self {
            stream: Mutex::new(Some(stream)),
            schema,
            properties,
        }
    }

    /// Convenience constructor: wrap a single in-memory batch as a one-shot stream.
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let batches = stream::iter([Ok(batch)]);
        Self::new(Box::pin(RecordBatchStreamAdapter::new(schema, batches)))
    }
}
impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A taken (None) stream means the node has already been executed.
        let exhausted = self.stream.lock().unwrap().is_none();
        f.debug_struct("OneShotExec")
            .field("exhausted", &exhausted)
            .field("schema", self.schema.as_ref())
            .finish()
    }
}
impl DisplayAs for OneShotExec {
    /// Render this node for `EXPLAIN`-style output, showing whether the stream
    /// has already been consumed and the column names of its schema.
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
        // NOTE(review): the double `.cloned()` suggests `field_names()` yields
        // references to references (e.g. `Vec<&String>`) — confirm against
        // lance_arrow's `SchemaExt::field_names` signature.
        let columns = self
            .schema
            .field_names()
            .iter()
            .cloned()
            .cloned()
            .collect::<Vec<_>>();
        match t {
            // Compact single-line form for normal/verbose explain output.
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            // Multi-line form for the tree renderer.
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}
impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // This is always a leaf node; reject any attempt to attach children.
        if children.is_empty() {
            Ok(self)
        } else {
            Err(datafusion_common::DataFusionError::Internal(
                "OneShotExec does not support children".to_string(),
            ))
        }
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        // Take the stream out of the mutex; a second call finds `None` and errors.
        self.stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take()
            .ok_or_else(|| {
                DataFusionError::Execution("OneShotExec has already been executed".to_string())
            })
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        // Nothing is known about the wrapped stream's contents.
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}
/// Wraps an execution plan so that executing it — and polling the resulting
/// stream — happens inside a given tracing `Span`.
struct TracedExec {
    input: Arc<dyn ExecutionPlan>,
    // Copied from `input` at construction time.
    properties: PlanProperties,
    span: Span,
}
impl TracedExec {
    /// Wrap `input` so its execution is attributed to `span`.
    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        let properties = input.properties().clone();
        Self {
            input,
            properties,
            span,
        }
    }
}
impl DisplayAs for TracedExec {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        // Every display format renders the same fixed label.
        write!(f, "TracedExec")
    }
}
impl std::fmt::Debug for TracedExec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("TracedExec")
    }
}
impl ExecutionPlan for TracedExec {
fn name(&self) -> &str {
"TracedExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
input: children[0].clone(),
properties: self.properties.clone(),
span: self.span.clone(),
}))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let _guard = self.span.enter();
let stream = self.input.execute(partition, context)?;
let schema = stream.schema();
let stream = stream.stream_in_span(self.span.clone());
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
/// Callback invoked with the final execution metrics once a plan's stream finishes.
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;
/// Options controlling how Lance executes DataFusion plans.
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    // Allow operators to spill intermediate results to disk (subject to the
    // LANCE_BYPASS_SPILLING env-var override; see `use_spilling()`).
    pub use_spilling: bool,
    // Memory pool size in bytes; falls back to LANCE_MEM_POOL_SIZE, then a default.
    pub mem_pool_size: Option<u64>,
    // Cap on spill temp-directory size in bytes; falls back to
    // LANCE_MAX_TEMP_DIRECTORY_SIZE, then a default.
    pub max_temp_directory_size: Option<u64>,
    // Per-query override of DataFusion's execution batch size.
    pub batch_size: Option<usize>,
    // Override of the session's target partition count.
    pub target_partition: Option<usize>,
    // Invoked with summary metric counts after an executed stream finishes.
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
    // Suppress plan logging and summary-metric reporting in `execute_plan`.
    pub skip_logging: bool,
}
impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut s = f.debug_struct("LanceExecutionOptions");
        s.field("use_spilling", &self.use_spilling);
        s.field("mem_pool_size", &self.mem_pool_size);
        s.field("max_temp_directory_size", &self.max_temp_directory_size);
        s.field("batch_size", &self.batch_size);
        s.field("target_partition", &self.target_partition);
        s.field("skip_logging", &self.skip_logging);
        // The callback is not Debug; record only whether one is present.
        s.field(
            "execution_stats_callback",
            &self.execution_stats_callback.is_some(),
        );
        s.finish()
    }
}
/// Default memory pool size (100 MiB) when neither the option nor the env var is set.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;
/// Default cap on spill-to-disk temp space (100 GiB).
const DEFAULT_LANCE_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;
impl LanceExecutionOptions {
    /// Shared helper: read `var_name` from the environment, parse it as `u64`,
    /// and fall back to `default` when the variable is absent or unparseable.
    /// A parse failure logs a warning (message identical to the previous
    /// per-variable copies of this logic).
    fn u64_from_env(var_name: &str, default: u64) -> u64 {
        std::env::var(var_name)
            .map(|s| match s.parse::<u64>() {
                Ok(v) => v,
                Err(e) => {
                    warn!("Failed to parse {}: {}, using default", var_name, e);
                    default
                }
            })
            .unwrap_or(default)
    }

    /// Memory pool size in bytes: the explicit option, else the
    /// `LANCE_MEM_POOL_SIZE` env var, else 100 MiB.
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            Self::u64_from_env("LANCE_MEM_POOL_SIZE", DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Max spill temp-directory size in bytes: the explicit option, else the
    /// `LANCE_MAX_TEMP_DIRECTORY_SIZE` env var, else 100 GiB.
    pub fn max_temp_directory_size(&self) -> u64 {
        self.max_temp_directory_size.unwrap_or_else(|| {
            Self::u64_from_env(
                "LANCE_MAX_TEMP_DIRECTORY_SIZE",
                DEFAULT_LANCE_MAX_TEMP_DIRECTORY_SIZE,
            )
        })
    }

    /// Whether spilling should actually be used: requires the `use_spilling`
    /// option to be set AND the `LANCE_BYPASS_SPILLING` env var to be absent.
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}
/// Build a fresh `SessionContext` configured per `options`.
///
/// When spilling is enabled, the runtime gets a disk manager capped at
/// `max_temp_directory_size()` and a `FairSpillPool` sized to `mem_pool_size()`.
/// Lance's UDFs are registered on the returned context.
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        let disk_manager_builder = DiskManagerBuilder::default()
            .with_max_temp_directory_size(options.max_temp_directory_size());
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(disk_manager_builder)
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    let ctx = SessionContext::new_with_config_rt(session_config, runtime_env);
    // Make Lance's UDFs available in every context we hand out.
    register_functions(&ctx);
    ctx
}
/// Cache key: the resolved settings (options + env vars) that determine how a
/// `SessionContext` is constructed.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SessionContextCacheKey {
    mem_pool_size: u64,
    max_temp_directory_size: u64,
    target_partition: Option<usize>,
    use_spilling: bool,
}
impl SessionContextCacheKey {
    /// Snapshot the resolved settings that affect context construction.
    fn from_options(options: &LanceExecutionOptions) -> Self {
        // Resolve in the same order as the original field initializers so any
        // env-var lookups / log output happen in the same sequence.
        let mem_pool_size = options.mem_pool_size();
        let max_temp_directory_size = options.max_temp_directory_size();
        let target_partition = options.target_partition;
        let use_spilling = options.use_spilling();
        Self {
            mem_pool_size,
            max_temp_directory_size,
            target_partition,
            use_spilling,
        }
    }
}
/// A cached context plus the last time it was handed out (for LRU eviction).
struct CachedSessionContext {
    context: SessionContext,
    last_access: std::time::Instant,
}
/// Process-global cache of session contexts, keyed by resolved options.
fn get_session_cache() -> &'static Mutex<HashMap<SessionContextCacheKey, CachedSessionContext>> {
    static SESSION_CACHE: OnceLock<Mutex<HashMap<SessionContextCacheKey, CachedSessionContext>>> =
        OnceLock::new();
    SESSION_CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
/// Maximum number of cached session contexts.
///
/// Read once from `LANCE_SESSION_CACHE_SIZE` (defaulting to 4 when the
/// variable is absent or unparseable) and memoized for the process lifetime.
fn get_max_cache_size() -> usize {
    const DEFAULT_CACHE_SIZE: usize = 4;
    static MAX_CACHE_SIZE: OnceLock<usize> = OnceLock::new();
    *MAX_CACHE_SIZE.get_or_init(|| match std::env::var("LANCE_SESSION_CACHE_SIZE") {
        Ok(v) => v.parse().unwrap_or(DEFAULT_CACHE_SIZE),
        Err(_) => DEFAULT_CACHE_SIZE,
    })
}
/// Get (or lazily create) a cached `SessionContext` matching `options`.
///
/// Contexts are keyed by the resolved settings that affect construction; the
/// cache is bounded by `get_max_cache_size()` with least-recently-used eviction.
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let key = SessionContextCacheKey::from_options(options);
    // Recover from a poisoned lock — the cache contents are still usable.
    let mut cache = get_session_cache()
        .lock()
        .unwrap_or_else(|e| e.into_inner());
    if let Some(entry) = cache.get_mut(&key) {
        // Cache hit: refresh the LRU timestamp and reuse the context.
        entry.last_access = std::time::Instant::now();
        return entry.context.clone();
    }
    // Cache full: evict the least-recently-used entry before inserting.
    if cache.len() >= get_max_cache_size()
        && let Some(lru_key) = cache
            .iter()
            .min_by_key(|(_, v)| v.last_access)
            .map(|(k, _)| k.clone())
    {
        cache.remove(&lru_key);
    }
    let context = new_session_context(options);
    cache.insert(
        key,
        CachedSessionContext {
            context: context.clone(),
            last_access: std::time::Instant::now(),
        },
    );
    context
}
/// Derive a `TaskContext` from the session, applying any per-query batch-size
/// override from `options`.
fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    // `batch_size` is Copy, so pattern-match it by value.
    if let Some(batch_size) = options.batch_size {
        state.config_mut().options_mut().execution.batch_size = batch_size;
    }
    state.task_ctx()
}
/// Aggregated I/O and index metric counts collected from an executed plan tree.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    pub iops: usize,
    pub requests: usize,
    pub bytes_read: usize,
    pub indices_loaded: usize,
    pub parts_loaded: usize,
    pub index_comparisons: usize,
    // Any other named counters found in the plan's metrics, keyed by name.
    pub all_counts: HashMap<String, usize>,
}
/// Recursively accumulate metrics from `node` and all of its descendants into
/// `counts`.
///
/// Known Lance metric names are folded into dedicated fields; any other
/// counter is tallied under its own name in `all_counts`.  Of the gauges, only
/// the three I/O metrics are accumulated.
pub fn collect_execution_metrics(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            let value = count.value();
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += value,
                REQUESTS_METRIC => counts.requests += value,
                BYTES_READ_METRIC => counts.bytes_read += value,
                INDICES_LOADED_METRIC => counts.indices_loaded += value,
                PARTS_LOADED_METRIC => counts.parts_loaded += value,
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += value,
                other => {
                    *counts.all_counts.entry(other.to_string()).or_default() += value;
                }
            }
        }
        for (metric_name, gauge) in metrics.iter_gauges() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += gauge.value(),
                REQUESTS_METRIC => counts.requests += gauge.value(),
                BYTES_READ_METRIC => counts.bytes_read += gauge.value(),
                _ => {}
            }
        }
    }
    for child in node.children() {
        collect_execution_metrics(child.as_ref(), counts);
    }
}
/// Emit a one-line tracing event summarizing an executed plan's metrics, and
/// invoke the user's stats callback if one was provided.
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    // Missing metrics (or a missing output_rows counter) are reported as 0.
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    collect_execution_metrics(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        plan_summary = display_plan_one_liner(plan),
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}
/// Render a plan tree as a compact single-line summary, e.g. `Proj(Scan)`.
fn display_plan_one_liner(plan: &dyn ExecutionPlan) -> String {
    let mut rendered = String::new();
    display_plan_one_liner_impl(plan, &mut rendered);
    rendered
}
/// Recursive worker for `display_plan_one_liner`: appends this node's name
/// (with the conventional `Exec` suffix stripped) and, when it has children,
/// a parenthesized comma-separated list of their renderings.
fn display_plan_one_liner_impl(plan: &dyn ExecutionPlan, output: &mut String) {
    output.push_str(plan.name().trim_end_matches("Exec"));
    let children = plan.children();
    if children.is_empty() {
        return;
    }
    output.push('(');
    let mut first = true;
    for child in children {
        if !first {
            output.push(',');
        }
        first = false;
        display_plan_one_liner_impl(child.as_ref(), output);
    }
    output.push(')');
}
/// Execute a single-partition physical plan and return its record batch stream.
///
/// Unless `options.skip_logging` is set, the plan is logged at debug level up
/// front and summary metrics are reported once the stream finishes.
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    if !options.skip_logging {
        debug!(
            "Executing plan:\n{}",
            DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
        );
    }
    let session_ctx = get_session_context(&options);
    // NOTE(review): multi-partition plans panic here rather than returning an
    // error — callers are presumably expected to have coalesced to a single
    // partition already.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;
    let schema = stream.schema();
    // Defer metric reporting until the stream is done so counters are complete.
    let stream = stream.finally(move || {
        if !options.skip_logging {
            report_plan_summary_metrics(plan.as_ref(), &options);
        }
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
/// Run `plan` to completion under an `AnalyzeExec` wrapper and return the
/// formatted plan annotated with runtime metrics (like `EXPLAIN ANALYZE`).
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    // Propagate the caller's current tracing span into the executed plan.
    let plan = Arc::new(TracedExec::new(plan, Span::current()));
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(
        true,
        true,
        vec![MetricType::SUMMARY],
        plan,
        schema,
    ));
    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| Error::io(format!("Failed to execute analyze plan: {}", err)))?;
    // Drain the stream; we only want the metrics populated as a side effect.
    while (stream.next().await).is_some() {}
    let result = format_plan(analyze);
    Ok(result)
}
/// Format an executed plan as an indented tree with per-node elapsed times and
/// metrics.
///
/// Two visitors walk the tree in the same pre-order, so the node index
/// assigned in the first (timing) pass lines up with the node rendered in the
/// second (printing) pass.
pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String {
    // Pass 1: assign each node a pre-order index and derive an elapsed time
    // from the start/end timestamps found in its subtree's metrics.
    struct CalculateVisitor {
        highest_index: usize,
        index_to_elapsed: HashMap<usize, Duration>,
    }
    // Earliest start / latest end timestamp seen anywhere in a subtree.
    struct SubtreeMetrics {
        min_start: Option<DateTime<Utc>>,
        max_end: Option<DateTime<Utc>>,
    }
    impl CalculateVisitor {
        fn calculate_metrics(&mut self, plan: &Arc<dyn ExecutionPlan>) -> SubtreeMetrics {
            self.highest_index += 1;
            let plan_index = self.highest_index;
            let (mut min_start, mut max_end) = Self::node_timerange(plan);
            // Widen this node's time range with each child's subtree range.
            for child in plan.children() {
                let child_metrics = self.calculate_metrics(child);
                min_start = Self::min_option(min_start, child_metrics.min_start);
                max_end = Self::max_option(max_end, child_metrics.max_end);
            }
            // Elapsed is only defined when both endpoints were recorded.
            let elapsed = match (min_start, max_end) {
                (Some(start), Some(end)) => Some((end - start).to_std().unwrap_or_default()),
                _ => None,
            };
            if let Some(e) = elapsed {
                self.index_to_elapsed.insert(plan_index, e);
            }
            SubtreeMetrics { min_start, max_end }
        }
        // Start/end timestamps recorded on this node's own metric set.
        fn node_timerange(
            plan: &Arc<dyn ExecutionPlan>,
        ) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
            let Some(metrics) = plan.metrics() else {
                return (None, None);
            };
            let min_start = metrics
                .iter()
                .filter_map(|m| match m.value() {
                    MetricValue::StartTimestamp(ts) => ts.value(),
                    _ => None,
                })
                .min();
            let max_end = metrics
                .iter()
                .filter_map(|m| match m.value() {
                    MetricValue::EndTimestamp(ts) => ts.value(),
                    _ => None,
                })
                .max();
            (min_start, max_end)
        }
        // Combine optional timestamps, ignoring `None`s.
        fn min_option(a: Option<DateTime<Utc>>, b: Option<DateTime<Utc>>) -> Option<DateTime<Utc>> {
            [a, b].into_iter().flatten().min()
        }
        fn max_option(a: Option<DateTime<Utc>>, b: Option<DateTime<Utc>>) -> Option<DateTime<Utc>> {
            [a, b].into_iter().flatten().max()
        }
    }
    // Pass 2: render each node, looking up its elapsed time by the same
    // pre-order index used in pass 1.
    struct PrintVisitor {
        highest_index: usize,
        indent: usize,
    }
    impl PrintVisitor {
        fn write_output(
            &mut self,
            plan: &Arc<dyn ExecutionPlan>,
            f: &mut Formatter,
            calcs: &CalculateVisitor,
        ) -> std::fmt::Result {
            self.highest_index += 1;
            write!(f, "{:indent$}", "", indent = self.indent * 2)?;
            let displayable =
                datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref());
            let plan_str = displayable.one_line().to_string();
            let plan_str = plan_str.trim();
            match calcs.index_to_elapsed.get(&self.highest_index) {
                // Splice `elapsed=` right after the node name (before the ": ")
                // when the one-line form has parameters; otherwise append it.
                Some(elapsed) => match plan_str.find(": ") {
                    Some(i) => write!(
                        f,
                        "{}: elapsed={elapsed:?}, {}",
                        &plan_str[..i],
                        &plan_str[i + 2..]
                    )?,
                    None => write!(f, "{plan_str}, elapsed={elapsed:?}")?,
                },
                None => write!(f, "{plan_str}")?,
            }
            if let Some(metrics) = plan.metrics() {
                // Timestamps were already folded into `elapsed`; drop them here.
                let metrics = metrics
                    .aggregate_by_name()
                    .sorted_for_display()
                    .timestamps_removed();
                write!(f, ", metrics=[{metrics}]")?;
            } else {
                write!(f, ", metrics=[]")?;
            }
            writeln!(f)?;
            self.indent += 1;
            for child in plan.children() {
                self.write_output(child, f, calcs)?;
            }
            self.indent -= 1;
            std::fmt::Result::Ok(())
        }
    }
    // Adapter so the visitors can write through the `Display` machinery.
    struct PrintWrapper {
        plan: Arc<dyn ExecutionPlan>,
    }
    impl fmt::Display for PrintWrapper {
        fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
            let mut calcs = CalculateVisitor {
                highest_index: 0,
                index_to_elapsed: HashMap::new(),
            };
            calcs.calculate_metrics(&self.plan);
            let mut prints = PrintVisitor {
                highest_index: 0,
                indent: 0,
            };
            prints.write_output(&self.plan, f, &calcs)
        }
    }
    let wrapper = PrintWrapper { plan };
    format!("{}", wrapper)
}
/// Extension methods for DataFusion's `SessionContext`.
pub trait SessionContextExt {
    /// Create a `DataFrame` backed by a stream that can be consumed only once.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}
/// A `PartitionStream` that serves a pre-existing stream exactly once
/// (panics if executed a second time).
pub struct OneShotPartitionStream {
    // The wrapped stream; becomes `None` once `execute` has taken it.
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    // Schema captured from the stream at construction time.
    schema: Arc<ArrowSchema>,
}
impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A taken (None) stream means this partition was already executed.
        let exhausted = self.data.lock().unwrap().is_none();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &exhausted)
            .field("schema", self.schema.as_ref())
            .finish()
    }
}
impl OneShotPartitionStream {
    /// Wrap `data` so it can serve as the single partition of a streaming table.
    pub fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        let data = Arc::new(Mutex::new(Some(data)));
        Self { data, schema }
    }
}
impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Take the stream out; executing twice is a programmer error.
        self.data
            .lock()
            .unwrap()
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}
impl SessionContextExt for SessionContext {
    /// Expose `data` as a single-partition streaming table and read it as a
    /// `DataFrame`.  The underlying stream can only be consumed once.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let partition = Arc::new(OneShotPartitionStream::new(data));
        let table = StreamingTable::try_new(schema, vec![partition])?;
        self.read_table(Arc::new(table))
    }
}
/// Wraps a plan and re-chunks its output through `StrictBatchSizeStream` so
/// batches conform to the configured `batch_size`.
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    input: Arc<dyn ExecutionPlan>,
    batch_size: usize,
}
impl StrictBatchSizeExec {
    /// Wrap `input` so its output is re-chunked to `batch_size` rows
    /// (see `StrictBatchSizeStream`).
    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
        Self { batch_size, input }
    }
}
impl DisplayAs for StrictBatchSizeExec {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        // Same fixed label for every display format.
        f.write_str("StrictBatchSizeExec")
    }
}
impl ExecutionPlan for StrictBatchSizeExec {
fn name(&self) -> &str {
"StrictBatchSizeExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &PlanProperties {
self.input.properties()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
input: children[0].clone(),
batch_size: self.batch_size,
}))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let stream = self.input.execute(partition, context)?;
let schema = stream.schema();
let stream = StrictBatchSizeStream::new(stream, self.batch_size);
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}
fn partition_statistics(
&self,
partition: Option<usize>,
) -> datafusion_common::Result<Statistics> {
self.input.partition_statistics(partition)
}
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::Equal
}
fn supports_limit_pushdown(&self) -> bool {
true
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // The session-context cache is process-global state; serialize the tests
    // that clear and repopulate it so they don't interfere with each other.
    static CACHE_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
    #[test]
    fn test_session_context_cache() {
        let _lock = CACHE_TEST_LOCK.lock().unwrap();
        let cache = get_session_cache();
        cache.lock().unwrap().clear();
        let opts1 = LanceExecutionOptions::default();
        let _ctx1 = get_session_context(&opts1);
        {
            let cache_guard = cache.lock().unwrap();
            assert_eq!(cache_guard.len(), 1);
        }
        // Requesting the same options again must reuse the cached entry.
        let _ctx1_again = get_session_context(&opts1);
        {
            let cache_guard = cache.lock().unwrap();
            assert_eq!(cache_guard.len(), 1);
        }
        // Options with a different resolved key get their own cache entry.
        let opts2 = LanceExecutionOptions {
            use_spilling: true,
            ..Default::default()
        };
        let _ctx2 = get_session_context(&opts2);
        {
            let cache_guard = cache.lock().unwrap();
            assert_eq!(cache_guard.len(), 2);
        }
    }
    #[test]
    fn test_session_context_cache_lru_eviction() {
        let _lock = CACHE_TEST_LOCK.lock().unwrap();
        let cache = get_session_cache();
        cache.lock().unwrap().clear();
        // Fill the cache to its default capacity (4) with distinct configs.
        let configs: Vec<LanceExecutionOptions> = (0..4)
            .map(|i| LanceExecutionOptions {
                mem_pool_size: Some((i + 1) as u64 * 1024 * 1024),
                ..Default::default()
            })
            .collect();
        for config in &configs {
            let _ctx = get_session_context(config);
        }
        {
            let cache_guard = cache.lock().unwrap();
            assert_eq!(cache_guard.len(), 4);
        }
        // Touch config[0] (after a tick so Instant ordering is unambiguous),
        // making config[1] the least-recently-used entry.
        std::thread::sleep(std::time::Duration::from_millis(1));
        let _ctx = get_session_context(&configs[0]);
        // Inserting a fifth config must evict exactly the LRU entry.
        let opts5 = LanceExecutionOptions {
            mem_pool_size: Some(5 * 1024 * 1024),
            ..Default::default()
        };
        let _ctx5 = get_session_context(&opts5);
        {
            let cache_guard = cache.lock().unwrap();
            assert_eq!(cache_guard.len(), 4);
            let key0 = SessionContextCacheKey::from_options(&configs[0]);
            assert!(
                cache_guard.contains_key(&key0),
                "config[0] should still be cached after recent access"
            );
            let key1 = SessionContextCacheKey::from_options(&configs[1]);
            assert!(
                !cache_guard.contains_key(&key1),
                "config[1] should have been evicted"
            );
            let key5 = SessionContextCacheKey::from_options(&opts5);
            assert!(
                cache_guard.contains_key(&key5),
                "new config should be cached"
            );
        }
    }
}