#![cfg(feature = "tokio")]
use std::{fs, path::PathBuf, sync::Arc};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field};
use fusio::{disk::LocalFs, executor::tokio::TokioExecutor, mem::fs::InMemoryFs};
use futures::TryStreamExt;
use crate::{
db::{BatchesThreshold, DB, Expr, NeverSeal, ScalarValue},
test_support::{execute_scan_plan, plan_scan_snapshot},
tests_internal::common::config_with_pk,
};
fn workspace_temp_dir(prefix: &str) -> PathBuf {
let base = std::env::current_dir().expect("cwd");
let dir = base.join("target").join("tmp").join(format!(
"{prefix}-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos()
));
fs::create_dir_all(&dir).expect("create workspace temp dir");
dir
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_execute_scan_merges_layers_with_residuals() -> Result<(), Box<dyn std::error::Error>>
{
let temp_root = workspace_temp_dir("plan-execute-scan");
let root_str = temp_root.to_string_lossy().into_owned();
let config = config_with_pk(
vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
],
&["id"],
);
let schema = config.schema();
let executor = Arc::new(TokioExecutor::default());
let mut db = DB::<LocalFs, TokioExecutor>::builder(config)
.on_disk(root_str.clone())?
.with_minor_compaction(1, 0)
.open_with_executor(Arc::clone(&executor))
.await?
.into_inner();
db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));
let immutable_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "z"])) as _,
Arc::new(Int32Array::from(vec![1, 9])) as _,
],
)?;
db.ingest(immutable_batch).await?;
db.set_seal_policy(Arc::new(NeverSeal));
let mutable_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["m-live", "m-drop", "z"])) as _,
Arc::new(Int32Array::from(vec![3, 4, 0])) as _,
],
)?;
let tombstones = vec![false, true, true];
db.ingest_with_tombstones(mutable_batch, tombstones).await?;
let snapshot = db.begin_snapshot().await?;
let predicate = Expr::gt("v", ScalarValue::from(0i64));
let plan = plan_scan_snapshot(&snapshot, &db, &predicate, None, Some(2)).await?;
let stream = execute_scan_plan(&db, plan).await?;
let batches = stream.try_collect::<Vec<_>>().await?;
let mut rows: Vec<(String, i32)> = Vec::new();
for batch in batches {
let ids = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("id column");
let values = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.expect("value column");
for (id, v) in ids.iter().zip(values.iter()) {
if let (Some(id), Some(v)) = (id, v) {
rows.push((id.to_string(), v));
}
}
}
assert_eq!(rows.len(), 2, "limit should apply after residual filtering");
assert_eq!(rows, vec![("a".into(), 1), ("m-live".into(), 3)]);
if let Err(err) = fs::remove_dir_all(&temp_root) {
eprintln!("failed to clean temp dir {:?}: {err}", &temp_root);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn plan_execute_applies_limit_after_merge_ordering() -> Result<(), Box<dyn std::error::Error>>
{
let config = config_with_pk(
vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
],
&["id"],
);
let schema = config.schema();
let noop_exec = Arc::new(fusio::executor::NoopExecutor);
let mut db: crate::db::DbInner<InMemoryFs, fusio::executor::NoopExecutor> =
DB::<InMemoryFs, fusio::executor::NoopExecutor>::builder(config)
.in_memory("plan-execute-limit")?
.open_with_executor(noop_exec)
.await?
.into_inner();
db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));
let mut ids = Vec::new();
let mut values = Vec::new();
for idx in 0..1200 {
ids.push(format!("k{idx:04}"));
values.push(idx);
}
let immutable_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(ids)) as _,
Arc::new(Int32Array::from(values)) as _,
],
)?;
db.ingest(immutable_batch).await?;
db.set_seal_policy(Arc::new(NeverSeal));
let mutable_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["k2000", "k2001"])) as _,
Arc::new(Int32Array::from(vec![2000, 2001])) as _,
],
)?;
db.ingest(mutable_batch).await?;
let snapshot = db.begin_snapshot().await?;
let predicate = Expr::is_not_null("id");
let plan = plan_scan_snapshot(&snapshot, &db, &predicate, None, Some(100)).await?;
let stream = execute_scan_plan(&db, plan).await?;
let batches = stream.try_collect::<Vec<_>>().await?;
let mut seen = Vec::new();
for batch in batches {
let ids = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("id column");
for id in ids.iter().flatten() {
seen.push(id.to_string());
}
}
assert_eq!(
seen.len(),
100,
"limit should cap results after merge ordering"
);
assert_eq!(seen.first().map(String::as_str), Some("k0000"));
assert_eq!(seen.last().map(String::as_str), Some("k0099"));
Ok(())
}