use arrow::{
array::{Array, AsArray},
compute::{FilterBuilder, interleave_record_batch, prep_null_mask_filter},
row::{RowConverter, Rows, SortField},
};
use datafusion_expr::{ColumnarValue, Operator};
use std::mem::size_of;
use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
use super::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, RecordOutput,
};
use crate::spill::get_record_batch_memory_size;
use crate::{SendableRecordBatchStream, stream::RecordBatchStreamAdapter};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
use datafusion_common::{
HashMap, Result, ScalarValue, internal_datafusion_err, internal_err,
};
use datafusion_execution::{
memory_pool::{MemoryConsumer, MemoryReservation},
runtime_env::RuntimeEnv,
};
use datafusion_physical_expr::{
PhysicalExpr,
expressions::{BinaryExpr, DynamicFilterPhysicalExpr, is_not_null, is_null, lit},
};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use parking_lot::RwLock;
/// Partial TopK computation: tracks the top `k` rows seen so far according to
/// `expr` in a bounded heap, so `ORDER BY ... LIMIT k` does not require a
/// full sort of the input.
pub struct TopK {
    /// Schema of the incoming (and emitted) record batches
    schema: SchemaRef,
    /// Runtime metrics (elapsed compute, output rows, row replacements)
    metrics: TopKMetrics,
    /// Memory-pool reservation tracking this operator's allocations
    reservation: MemoryReservation,
    /// Target number of rows per emitted output batch
    batch_size: usize,
    /// The full sort ordering that defines "top k"
    expr: LexOrdering,
    /// Converts sort keys into byte-comparable row format
    row_converter: RowConverter,
    /// Reusable buffer for row-format conversion of each input batch
    scratch_rows: Rows,
    /// Bounded heap holding the current top-`k` candidate rows
    heap: TopKHeap,
    /// Row converter for the prefix of `expr` the input is already sorted by
    /// (`None` when there is no common sorted prefix)
    common_sort_prefix_converter: Option<RowConverter>,
    /// The sort-prefix expressions themselves (used for early termination)
    common_sort_prefix: Arc<[PhysicalSortExpr]>,
    /// Shared dynamic filter pushed down to the input, tightened as the
    /// current k-th row improves
    filter: Arc<RwLock<TopKDynamicFilters>>,
    /// Set to `true` once the sorted-prefix check proves no later input row
    /// can enter the top k
    pub(crate) finished: bool,
}
/// Shared state backing the dynamic filter pushed down from a `TopK`:
/// the latest threshold (row-format bytes of the current k-th row) plus the
/// physical predicate derived from it.
#[derive(Debug, Clone)]
pub struct TopKDynamicFilters {
    /// Row-format encoding of the current threshold; `None` until some
    /// partition's heap first holds `k` rows
    threshold_row: Option<Vec<u8>>,
    /// The dynamic filter expression updated as the threshold tightens
    expr: Arc<DynamicFilterPhysicalExpr>,
}
impl TopKDynamicFilters {
    /// Wraps `expr` with no threshold recorded yet.
    pub fn new(expr: Arc<DynamicFilterPhysicalExpr>) -> Self {
        Self {
            expr,
            threshold_row: None,
        }
    }

    /// Returns a cloned handle to the dynamic filter expression.
    pub fn expr(&self) -> Arc<DynamicFilterPhysicalExpr> {
        Arc::clone(&self.expr)
    }
}
/// Rough estimate of the row-format size of one sort key, used to pre-size
/// scratch buffers before the real row widths are known.
const ESTIMATED_BYTES_PER_ROW: usize = 20;

/// Resolves each sort expression's data type against `schema` and pairs it
/// with its sort options, producing the [`SortField`]s needed to build a
/// `RowConverter`.
///
/// # Errors
/// Returns an error if any expression's data type cannot be determined.
fn build_sort_fields(
    ordering: &[PhysicalSortExpr],
    schema: &SchemaRef,
) -> Result<Vec<SortField>> {
    let mut fields = Vec::with_capacity(ordering.len());
    for sort_expr in ordering {
        let data_type = sort_expr.expr.data_type(schema)?;
        fields.push(SortField::new_with_options(data_type, sort_expr.options));
    }
    Ok(fields)
}
impl TopK {
    /// Creates a `TopK` for `partition_id` that keeps the top `k` rows
    /// ordered by `expr`, registering its memory use with `runtime`'s pool.
    ///
    /// `common_sort_prefix` is the (possibly empty) prefix of `expr` the
    /// input is already sorted by; when non-empty it enables early
    /// termination via [`Self::attempt_early_completion`].
    ///
    /// # Errors
    /// Fails if the sort expressions cannot be resolved against `schema` or
    /// the row converters cannot be constructed.
    #[expect(clippy::too_many_arguments)]
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new(
        partition_id: usize,
        schema: SchemaRef,
        common_sort_prefix: Vec<PhysicalSortExpr>,
        expr: LexOrdering,
        k: usize,
        batch_size: usize,
        runtime: Arc<RuntimeEnv>,
        metrics: &ExecutionPlanMetricsSet,
        filter: Arc<RwLock<TopKDynamicFilters>>,
    ) -> Result<Self> {
        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
            .register(&runtime.memory_pool);
        let sort_fields = build_sort_fields(&expr, &schema)?;
        let row_converter = RowConverter::new(sort_fields)?;
        // Pre-size the scratch buffer using a rough per-row byte estimate
        let scratch_rows =
            row_converter.empty_rows(batch_size, ESTIMATED_BYTES_PER_ROW * batch_size);
        let prefix_row_converter = if common_sort_prefix.is_empty() {
            None
        } else {
            let input_sort_fields = build_sort_fields(&common_sort_prefix, &schema)?;
            Some(RowConverter::new(input_sort_fields)?)
        };
        Ok(Self {
            schema: Arc::clone(&schema),
            metrics: TopKMetrics::new(metrics, partition_id),
            reservation,
            batch_size,
            expr,
            row_converter,
            scratch_rows,
            heap: TopKHeap::new(k, batch_size),
            common_sort_prefix_converter: prefix_row_converter,
            common_sort_prefix: Arc::from(common_sort_prefix),
            finished: false,
            filter,
        })
    }

    /// Feeds `batch` into the TopK: evaluates the sort keys, applies the
    /// current dynamic filter to discard rows that cannot qualify, then
    /// inserts any surviving rows that beat the current k-th row.
    #[expect(clippy::needless_pass_by_value)]
    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
        // Timer records elapsed compute when dropped
        let baseline = self.metrics.baseline.clone();
        let _timer = baseline.elapsed_compute().timer();
        // Evaluate the sort key expressions against the whole batch
        let mut sort_keys: Vec<ArrayRef> = self
            .expr
            .iter()
            .map(|expr| {
                let value = expr.expr.evaluate(&batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<Result<Vec<_>>>()?;
        let mut selected_rows = None;
        // Snapshot the pushed-down threshold predicate; the read lock is
        // released at the end of this statement
        let filter = self.filter.read().expr.current()?;
        let filtered = filter.evaluate(&batch)?;
        let num_rows = batch.num_rows();
        let array = filtered.into_array(num_rows)?;
        let mut filter = array.as_boolean().clone();
        let true_count = filter.true_count();
        if true_count == 0 {
            // Nothing in this batch can possibly be in the top k
            return Ok(());
        }
        if true_count < num_rows {
            // Filter the sort keys before row-format conversion; NULLs in
            // the mask count as "do not keep"
            if filter.nulls().is_some() {
                filter = prep_null_mask_filter(&filter);
            }
            let filter_predicate = FilterBuilder::new(&filter);
            // Optimizing the predicate only pays off when it is applied to
            // more than one array
            let filter_predicate = if sort_keys.len() > 1 {
                filter_predicate.optimize().build()
            } else {
                filter_predicate.build()
            };
            selected_rows = Some(filter);
            sort_keys = sort_keys
                .iter()
                .map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
                .collect::<Result<Vec<_>>>()?;
        }
        // Reuse the scratch buffer for row-format conversion
        let rows = &mut self.scratch_rows;
        rows.clear();
        self.row_converter.append(rows, &sort_keys)?;
        let mut batch_entry = self.heap.register_batch(batch.clone());
        let replacements = match selected_rows {
            Some(filter) => {
                // Only the original indices of rows that passed the filter
                self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry)
            }
            None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry),
        };
        if replacements > 0 {
            self.metrics.row_replacements.add(replacements);
            self.heap.insert_batch_entry(batch_entry);
            // Reclaim memory held by batches with few remaining references
            self.heap.maybe_compact()?;
            // Keep the pool reservation in sync with our footprint
            self.reservation.try_resize(self.size())?;
            // With a sorted prefix, we may be able to stop reading input
            self.attempt_early_completion(&batch)?;
            // The k-th row may have tightened: publish a new threshold
            self.update_filter()?;
        }
        Ok(())
    }

    /// Compares each candidate (by row-format bytes) against the current
    /// heap maximum and inserts those that qualify, returning the number of
    /// heap insertions/replacements performed.
    fn find_new_topk_items(
        &mut self,
        items: impl Iterator<Item = usize>,
        batch_entry: &mut RecordBatchEntry,
    ) -> usize {
        let mut replacements = 0;
        let rows = &mut self.scratch_rows;
        for (index, row) in items.zip(rows.iter()) {
            match self.heap.max() {
                // Heap is full and candidate does not beat the k-th row
                Some(max_row) if row.as_ref() >= max_row.row() => {}
                // Heap not yet full, or candidate beats the current maximum
                None | Some(_) => {
                    self.heap.add(batch_entry, row, index);
                    replacements += 1;
                }
            }
        }
        replacements
    }

    /// Publishes a tighter predicate into the shared dynamic filter if the
    /// current k-th row improved on the previously published threshold.
    fn update_filter(&mut self) -> Result<()> {
        // No threshold exists until the heap holds k rows
        let Some(max_row) = self.heap.max() else {
            return Ok(());
        };
        let new_threshold_row = &max_row.row;
        // Cheap check under the read lock first: row-format bytes compare in
        // sort order, so lexicographically smaller means strictly tighter
        let needs_update = self
            .filter
            .read()
            .threshold_row
            .as_ref()
            .map(|current_row| {
                new_threshold_row < current_row
            })
            .unwrap_or(true);
        if !needs_update {
            return Ok(());
        }
        // Build the replacement predicate before taking the write lock
        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
            Some(t) => t,
            None => return Ok(()),
        };
        let predicate = Self::build_filter_expression(&self.expr, &thresholds)?;
        let new_threshold = new_threshold_row.to_vec();
        let mut filter = self.filter.write();
        // Re-check under the write lock: another partition may have
        // published an even tighter threshold in the meantime
        let old_threshold = filter.threshold_row.take();
        match old_threshold {
            Some(old_threshold) => {
                if new_threshold.as_slice() < old_threshold.as_slice() {
                    filter.threshold_row = Some(new_threshold);
                } else {
                    // Ours is not tighter: restore the previous threshold
                    filter.threshold_row = Some(old_threshold);
                    return Ok(());
                }
            }
            None => {
                filter.threshold_row = Some(new_threshold);
            }
        };
        // Skip trivially-true predicates (nothing would be filtered)
        if let Some(pred) = predicate
            && !pred.eq(&lit(true))
        {
            filter.expr.update(pred)?;
        }
        Ok(())
    }

    /// Builds a lexicographic "row sorts before threshold" predicate:
    ///
    /// `(c1 < t1) OR (c1 = t1 AND c2 < t2) OR ...`
    ///
    /// with the comparison direction flipped for descending columns and NULL
    /// handling adjusted per column's `nulls_first` option. Returns `None`
    /// when no comparison terms could be built.
    fn build_filter_expression(
        sort_exprs: &[PhysicalSortExpr],
        thresholds: &[ScalarValue],
    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
        let mut filters: Vec<Arc<dyn PhysicalExpr>> =
            Vec::with_capacity(thresholds.len());
        // Conjunction of equalities over all previous sort columns
        let mut prev_sort_expr: Option<Arc<dyn PhysicalExpr>> = None;
        for (sort_expr, value) in sort_exprs.iter().zip(thresholds.iter()) {
            // "Sorts before the threshold" direction for this column
            let op = if sort_expr.options.descending {
                Operator::Gt
            } else {
                Operator::Lt
            };
            let value_null = value.is_null();
            let comparison = Arc::new(BinaryExpr::new(
                Arc::clone(&sort_expr.expr),
                op,
                lit(value.clone()),
            ));
            // Account for where NULLs sort relative to the threshold
            let comparison_with_null = match (sort_expr.options.nulls_first, value_null) {
                // NULLs sort first and threshold is NULL: nothing precedes it
                (true, true) => lit(false),
                // NULLs sort first: NULL values also precede any threshold
                (true, false) => Arc::new(BinaryExpr::new(
                    is_null(Arc::clone(&sort_expr.expr))?,
                    Operator::Or,
                    comparison,
                )),
                // NULLs sort last and threshold is NULL: any non-NULL wins
                (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?,
                (false, false) => comparison,
            };
            // Equality term used to tie-break into the next column
            let mut eq_expr = Arc::new(BinaryExpr::new(
                Arc::clone(&sort_expr.expr),
                Operator::Eq,
                lit(value.clone()),
            ));
            if value_null {
                // SQL `NULL = NULL` is NULL, so treat IS NULL as equality
                // with a NULL threshold
                eq_expr = Arc::new(BinaryExpr::new(
                    is_null(Arc::clone(&sort_expr.expr))?,
                    Operator::Or,
                    eq_expr,
                ));
            }
            match prev_sort_expr.take() {
                None => {
                    // First column: its comparison stands alone
                    prev_sort_expr = Some(eq_expr);
                    filters.push(comparison_with_null);
                }
                Some(p) => {
                    // Later columns apply only when all previous columns
                    // equal their thresholds
                    filters.push(Arc::new(BinaryExpr::new(
                        Arc::clone(&p),
                        Operator::And,
                        comparison_with_null,
                    )));
                    prev_sort_expr =
                        Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr)));
                }
            }
        }
        // OR all per-column terms together
        let dynamic_predicate = filters
            .into_iter()
            .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b)));
        Ok(dynamic_predicate)
    }

    /// When the input is sorted by a prefix of `expr`, checks whether the
    /// last row of `batch` already sorts strictly after the current k-th row
    /// on that prefix; if so, no later input row can enter the top k, so
    /// `finished` is set.
    fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }
        // Only applicable when the input has a sorted common prefix
        let Some(prefix_converter) = &self.common_sort_prefix_converter else {
            return Ok(());
        };
        // Need a full heap before any row can be ruled out
        let Some(max_topk_row) = self.heap.max() else {
            return Ok(());
        };
        // The input is sorted on the prefix, so its last row is its largest
        let last_row_idx = batch.num_rows() - 1;
        let mut batch_prefix_scratch =
            prefix_converter.empty_rows(1, ESTIMATED_BYTES_PER_ROW);
        self.compute_common_sort_prefix(batch, last_row_idx, &mut batch_prefix_scratch)?;
        // Locate the stored batch holding the current k-th (worst) row
        let store_entry = self
            .heap
            .store
            .get(max_topk_row.batch_id)
            .ok_or(internal_datafusion_err!("Invalid batch id in topK heap"))?;
        let max_batch = &store_entry.batch;
        let mut heap_prefix_scratch =
            prefix_converter.empty_rows(1, ESTIMATED_BYTES_PER_ROW);
        self.compute_common_sort_prefix(
            max_batch,
            max_topk_row.index,
            &mut heap_prefix_scratch,
        )?;
        // Strictly greater prefix implies every future row is also greater
        if batch_prefix_scratch.row(0).as_ref() > heap_prefix_scratch.row(0).as_ref() {
            self.finished = true;
        }
        Ok(())
    }

    /// Evaluates the common sort-prefix expressions on row `last_row_idx` of
    /// `batch` and appends the row-format encoding to `scratch`.
    fn compute_common_sort_prefix(
        &self,
        batch: &RecordBatch,
        last_row_idx: usize,
        scratch: &mut Rows,
    ) -> Result<()> {
        let last_row: Vec<ArrayRef> = self
            .common_sort_prefix
            .iter()
            .map(|expr| {
                expr.expr
                    .evaluate(&batch.slice(last_row_idx, 1))?
                    .into_array(1)
            })
            .collect::<Result<_>>()?;
        // Callers only reach here when the prefix converter exists
        self.common_sort_prefix_converter
            .as_ref()
            .unwrap()
            .append(scratch, &last_row)?;
        Ok(())
    }

    /// Consumes the TopK and returns the final sorted results as a stream of
    /// record batches of at most `batch_size` rows each.
    pub fn emit(self) -> Result<SendableRecordBatchStream> {
        let Self {
            schema,
            metrics,
            reservation: _,
            batch_size,
            expr: _,
            row_converter: _,
            scratch_rows: _,
            mut heap,
            common_sort_prefix_converter: _,
            common_sort_prefix: _,
            finished: _,
            filter,
        } = self;
        let _timer = metrics.baseline.elapsed_compute().timer();
        // No further threshold updates will arrive for this filter
        filter.read().expr().mark_complete();
        let mut batches = vec![];
        if let Some(mut batch) = heap.emit()? {
            (&batch).record_output(&metrics.baseline);
            // Slice the single sorted batch into batch_size-row chunks
            loop {
                if batch.num_rows() <= batch_size {
                    batches.push(Ok(batch));
                    break;
                } else {
                    batches.push(Ok(batch.slice(0, batch_size)));
                    let remaining_length = batch.num_rows() - batch_size;
                    batch = batch.slice(batch_size, remaining_length);
                }
            }
        };
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            schema,
            futures::stream::iter(batches),
        )))
    }

    /// Current estimated memory footprint in bytes (self + converters +
    /// heap), reported to the memory pool.
    fn size(&self) -> usize {
        size_of::<Self>()
            + self.row_converter.size()
            + self.scratch_rows.size()
            + self.heap.size()
    }
}
/// Metrics reported by a `TopK` operator instance.
struct TopKMetrics {
    /// Standard operator metrics (elapsed compute, output rows, etc.)
    pub baseline: BaselineMetrics,
    /// Number of times a heap row was inserted or replaced
    pub row_replacements: Count,
}
impl TopKMetrics {
    /// Registers this operator's metrics for `partition` in `metrics`.
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let baseline = BaselineMetrics::new(metrics, partition);
        let row_replacements =
            MetricBuilder::new(metrics).counter("row_replacements", partition);
        Self {
            baseline,
            row_replacements,
        }
    }
}
/// Bounded max-heap of the current top-`k` rows, keyed by row-format bytes,
/// with the backing `RecordBatch`es held in a reference-counted store.
struct TopKHeap {
    /// Maximum number of rows to retain
    k: usize,
    /// Output batch size; also drives the compaction heuristic
    batch_size: usize,
    /// Max-heap of rows: the root is the current k-th (worst) row
    inner: BinaryHeap<TopKRow>,
    /// Stores the batches referenced by heap rows, with use counts
    store: RecordBatchStore,
    /// Total heap-allocated bytes owned by the `TopKRow`s themselves
    owned_bytes: usize,
}
impl TopKHeap {
    /// Creates an empty heap for the top `k` rows; `batch_size` feeds the
    /// compaction heuristic. Panics if `k` is zero.
    fn new(k: usize, batch_size: usize) -> Self {
        assert!(k > 0);
        Self {
            k,
            batch_size,
            inner: BinaryHeap::new(),
            store: RecordBatchStore::new(),
            owned_bytes: 0,
        }
    }

    /// Assigns an id to `batch` without storing it yet; the entry is only
    /// retained once [`Self::insert_batch_entry`] is called.
    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
        self.store.register(batch)
    }

    /// Stores `entry` (the store drops entries whose use count is zero).
    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
        self.store.insert(entry)
    }

    /// The current k-th (largest retained) row, or `None` while the heap
    /// still holds fewer than `k` entries.
    fn max(&self) -> Option<&TopKRow> {
        if self.inner.len() < self.k {
            None
        } else {
            self.inner.peek()
        }
    }

    /// Inserts the row at `index` of `batch_entry`, evicting the current
    /// maximum when the heap is at capacity, while keeping batch use counts
    /// and owned-byte accounting consistent.
    fn add(
        &mut self,
        batch_entry: &mut RecordBatchEntry,
        row: impl AsRef<[u8]>,
        index: usize,
    ) {
        let batch_id = batch_entry.id;
        batch_entry.uses += 1;
        assert!(self.inner.len() <= self.k);
        let row = row.as_ref();
        if self.inner.len() == self.k {
            // Full: replace the current maximum in place via PeekMut
            let mut prev_min = self.inner.peek_mut().unwrap();
            if prev_min.batch_id == batch_entry.id {
                // Evicted row is from the same, not-yet-stored batch: just
                // cancel the count on the entry itself
                batch_entry.uses -= 1;
            } else {
                // Release the evicted row's reference on its stored batch
                self.store.unuse(prev_min.batch_id);
            }
            self.owned_bytes -= prev_min.owned_size();
            prev_min.replace_with(row, batch_id, index);
            self.owned_bytes += prev_min.owned_size();
        } else {
            let new_row = TopKRow::new(row, batch_id, index);
            self.owned_bytes += new_row.owned_size();
            self.inner.push(new_row);
        };
    }

    /// Drains the heap and returns the top-k rows as a single sorted
    /// `RecordBatch`, or `None` when nothing is stored.
    pub fn emit(&mut self) -> Result<Option<RecordBatch>> {
        Ok(self.emit_with_state()?.0)
    }

    /// Drains the heap, returning the sorted output batch together with the
    /// sorted `TopKRow`s so callers (e.g. compaction) can rebuild the heap.
    pub fn emit_with_state(&mut self) -> Result<(Option<RecordBatch>, Vec<TopKRow>)> {
        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
        if self.store.is_empty() {
            return Ok((None, topk_rows));
        }
        // Map each stored batch id to its position in the interleave input
        let mut record_batches = Vec::new();
        let mut batch_id_array_pos = HashMap::new();
        for (array_pos, (batch_id, batch)) in self.store.batches.iter().enumerate() {
            record_batches.push(&batch.batch);
            batch_id_array_pos.insert(*batch_id, array_pos);
        }
        // Gather each row from its (batch position, row index) in order
        let indices: Vec<_> = topk_rows
            .iter()
            .map(|k| (batch_id_array_pos[&k.batch_id], k.index))
            .collect();
        let new_batch = interleave_record_batch(&record_batches, &indices)?;
        Ok((Some(new_batch), topk_rows))
    }

    /// Compacts the stored batches into a single batch once enough unused
    /// rows have accumulated, bounding memory held by stale batches.
    pub fn maybe_compact(&mut self) -> Result<()> {
        // Compaction copies data, so only do it once the waste clearly
        // exceeds the cost of one compaction pass
        let max_unused_rows = (20 * self.batch_size) + self.k;
        let unused_rows = self.store.unused_rows();
        if self.store.len() <= 2 || unused_rows < max_unused_rows {
            return Ok(());
        }
        let num_rows = self.inner.len();
        // Materialize the current top-k into one batch...
        let (new_batch, mut topk_rows) = self.emit_with_state()?;
        let Some(new_batch) = new_batch else {
            return Ok(());
        };
        // ...then rebuild the store and heap pointing at that single batch
        self.store.clear();
        let mut batch_entry = self.register_batch(new_batch);
        batch_entry.uses = num_rows;
        for (i, topk_row) in topk_rows.iter_mut().enumerate() {
            topk_row.batch_id = batch_entry.id;
            topk_row.index = i;
        }
        self.insert_batch_entry(batch_entry);
        self.inner = BinaryHeap::from(topk_rows);
        Ok(())
    }

    /// Estimated memory footprint of the heap in bytes.
    fn size(&self) -> usize {
        size_of::<Self>()
            + (self.inner.capacity() * size_of::<TopKRow>())
            + self.store.size()
            + self.owned_bytes
    }

    /// Evaluates each sort expression at the current k-th row and returns
    /// the per-column threshold scalars (`None` while the heap is not full).
    fn get_threshold_values(
        &self,
        sort_exprs: &[PhysicalSortExpr],
    ) -> Result<Option<Vec<ScalarValue>>> {
        let max_row = match self.max() {
            Some(row) => row,
            None => return Ok(None),
        };
        let batch_entry = match self.store.get(max_row.batch_id) {
            Some(entry) => entry,
            None => return internal_err!("Invalid batch ID in TopKRow"),
        };
        let mut scalar_values = Vec::with_capacity(sort_exprs.len());
        for sort_expr in sort_exprs {
            // Evaluate on a one-row slice so the result is a single value
            let expr = Arc::clone(&sort_expr.expr);
            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 1))?;
            let scalar = match value {
                ColumnarValue::Scalar(scalar) => scalar,
                ColumnarValue::Array(array) if array.len() == 1 => {
                    ScalarValue::try_from_array(&array, 0)?
                }
                array => {
                    return internal_err!("Expected a scalar value, got {:?}", array);
                }
            };
            scalar_values.push(scalar);
        }
        Ok(Some(scalar_values))
    }
}
/// One retained top-k row: its row-format sort key plus the location
/// (batch id, row index) of the full row in the heap's batch store.
///
/// NOTE(review): the derived `PartialEq` compares all three fields while
/// `Ord` (below) compares only `row`, so two rows with equal sort keys can
/// be `!=` yet compare `Equal` — confirm no code relies on `==` agreeing
/// with `Ordering::Equal`.
#[derive(Debug, PartialEq)]
struct TopKRow {
    // Row-format bytes; byte-wise order equals the sort order
    row: Vec<u8>,
    // Id of the batch in the RecordBatchStore holding this row
    batch_id: u32,
    // Row index within that batch
    index: usize,
}
impl TopKRow {
    /// Copies `row` into a freshly owned buffer.
    fn new(row: impl AsRef<[u8]>, batch_id: u32, index: usize) -> Self {
        let row = row.as_ref().to_vec();
        Self {
            row,
            batch_id,
            index,
        }
    }

    /// Overwrites this entry in place, reusing the existing buffer so no
    /// reallocation happens when the new key fits the current capacity.
    fn replace_with(&mut self, new_row: impl AsRef<[u8]>, batch_id: u32, index: usize) {
        let bytes = new_row.as_ref();
        self.row.clear();
        self.row.extend_from_slice(bytes);
        self.batch_id = batch_id;
        self.index = index;
    }

    /// Heap bytes owned by this row (buffer capacity, not length).
    fn owned_size(&self) -> usize {
        self.row.capacity()
    }

    /// The row-format sort key bytes.
    fn row(&self) -> &[u8] {
        self.row.as_slice()
    }
}
impl Eq for TopKRow {}
impl PartialOrd for TopKRow {
    // Delegates to the total order below
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for TopKRow {
    /// Orders by the row-format sort key only; `batch_id` and `index` do
    /// not participate. NOTE(review): the derived `PartialEq` compares all
    /// fields, so `cmp` may return `Equal` for values that are `!=`.
    fn cmp(&self, other: &Self) -> Ordering {
        self.row.cmp(&other.row)
    }
}
/// A batch stored in the `RecordBatchStore`, reference-counted by the number
/// of heap rows that point into it.
#[derive(Debug)]
struct RecordBatchEntry {
    // Store-assigned id referenced by TopKRow::batch_id
    id: u32,
    // The retained batch
    batch: RecordBatch,
    // Number of heap rows currently referencing this batch
    uses: usize,
}
/// Reference-counted storage for the batches backing the top-k heap rows,
/// with aggregate memory accounting.
#[derive(Debug)]
struct RecordBatchStore {
    // Next id to hand out in `register`
    next_id: u32,
    // Stored batches keyed by id
    batches: HashMap<u32, RecordBatchEntry>,
    // Sum of the memory sizes of all stored batches
    batches_size: usize,
}
impl RecordBatchStore {
    /// Creates an empty store.
    fn new() -> Self {
        Self {
            next_id: 0,
            batches: HashMap::new(),
            batches_size: 0,
        }
    }

    /// Assigns the next id to `batch` and returns an entry with zero uses;
    /// the batch is not retained until [`Self::insert`] is called.
    pub fn register(&mut self, batch: RecordBatch) -> RecordBatchEntry {
        let id = self.next_id;
        self.next_id += 1;
        RecordBatchEntry { id, batch, uses: 0 }
    }

    /// Retains `entry` (and counts its memory) only if at least one heap
    /// row references it; unreferenced entries are dropped immediately.
    pub fn insert(&mut self, entry: RecordBatchEntry) {
        if entry.uses > 0 {
            self.batches_size += get_record_batch_memory_size(&entry.batch);
            self.batches.insert(entry.id, entry);
        }
    }

    /// Drops all stored batches and resets the memory accounting.
    fn clear(&mut self) {
        self.batches.clear();
        self.batches_size = 0;
    }

    /// Looks up the entry for `id`, if still stored.
    fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
        self.batches.get(&id)
    }

    /// Number of batches currently stored.
    fn len(&self) -> usize {
        self.batches.len()
    }

    /// Total rows across stored batches that no heap row references;
    /// input to the compaction heuristic.
    fn unused_rows(&self) -> usize {
        self.batches
            .values()
            .map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
            .sum()
    }

    /// Returns `true` when no batches are stored.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }

    /// Decrements the use count for `id`, removing the batch (and its
    /// accounted memory) once the count reaches zero. Panics on an unknown
    /// id or count underflow — both indicate a bookkeeping bug.
    pub fn unuse(&mut self, id: u32) {
        let remove = if let Some(batch_entry) = self.batches.get_mut(&id) {
            batch_entry.uses = batch_entry.uses.checked_sub(1).expect("underflow");
            batch_entry.uses == 0
        } else {
            panic!("No entry for id {id}");
        };
        if remove {
            let old_entry = self.batches.remove(&id).unwrap();
            self.batches_size = self
                .batches_size
                .checked_sub(get_record_batch_memory_size(&old_entry.batch))
                .unwrap();
        }
    }

    /// Estimated memory footprint: map overhead plus accounted batch bytes.
    pub fn size(&self) -> usize {
        size_of::<Self>()
            + self.batches.capacity() * (size_of::<u32>() + size_of::<RecordBatchEntry>())
            + self.batches_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, Int32Array, RecordBatch};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_schema::SortOptions;
    use datafusion_common::assert_batches_eq;
    use datafusion_physical_expr::expressions::col;
    use futures::TryStreamExt;

    /// Memory accounting of `RecordBatchStore` grows on insert and returns
    /// to zero once the last use is released.
    #[test]
    fn test_record_batch_store_size() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ints", DataType::Int32, true),
            Field::new("float64", DataType::Float64, false),
        ]));
        let mut record_batch_store = RecordBatchStore::new();
        let int_array =
            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let record_batch_entry = RecordBatchEntry {
            id: 0,
            batch: RecordBatch::try_new(
                schema,
                vec![Arc::new(int_array), Arc::new(float64_array)],
            )
            .unwrap(),
            uses: 1,
        };
        record_batch_store.insert(record_batch_entry);
        // 5 rows * (4 bytes Int32 + 8 bytes Float64) = 60 bytes of data
        assert_eq!(record_batch_store.batches_size, 60);
        record_batch_store.unuse(0);
        assert_eq!(record_batch_store.batches_size, 0);
    }

    /// With input sorted on prefix column "a", `TopK` should set `finished`
    /// once an incoming prefix value exceeds the heap's k-th row prefix.
    #[tokio::test]
    async fn test_try_finish_marks_finished_with_prefix() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Float64, false),
        ]));
        let sort_expr_a = PhysicalSortExpr {
            expr: col("a", schema.as_ref())?,
            options: SortOptions::default(),
        };
        let sort_expr_b = PhysicalSortExpr {
            expr: col("b", schema.as_ref())?,
            options: SortOptions::default(),
        };
        // Input is pre-sorted on "a" only; full order is (a, b)
        let prefix = vec![sort_expr_a.clone()];
        let full_expr = LexOrdering::from([sort_expr_a, sort_expr_b]);
        let runtime = Arc::new(RuntimeEnv::default());
        let metrics = ExecutionPlanMetricsSet::new();
        // k = 3, batch_size = 2
        let mut topk = TopK::try_new(
            0,
            Arc::clone(&schema),
            prefix,
            full_expr,
            3,
            2,
            runtime,
            &metrics,
            Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
                DynamicFilterPhysicalExpr::new(vec![], lit(true)),
            )))),
        )?;
        let array_a1: ArrayRef =
            Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)]));
        let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0]));
        let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?;
        topk.insert_batch(batch1)?;
        assert!(
            !topk.finished,
            "Expected 'finished' to be false after the first batch."
        );
        // Second batch's last prefix (a = 3) exceeds the k-th row's prefix,
        // so no further input can change the top k
        let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3)]));
        let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0]));
        let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a2, array_b2])?;
        topk.insert_batch(batch2)?;
        assert!(
            topk.finished,
            "Expected 'finished' to be true after the second batch."
        );
        let results: Vec<_> = topk.emit()?.try_collect().await?;
        assert_batches_eq!(
            &[
                "+---+------+",
                "| a | b    |",
                "+---+------+",
                "| 1 | 15.0 |",
                "| 1 | 20.0 |",
                "| 2 | 10.0 |",
                "+---+------+",
            ],
            &results
        );
        Ok(())
    }

    /// Emitting results should mark the shared dynamic filter complete so
    /// consumers waiting on it are released.
    #[tokio::test]
    async fn test_topk_marks_filter_complete() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let sort_expr = PhysicalSortExpr {
            expr: col("a", schema.as_ref())?,
            options: SortOptions::default(),
        };
        let full_expr = LexOrdering::from([sort_expr.clone()]);
        let prefix = vec![sort_expr];
        let runtime = Arc::new(RuntimeEnv::default());
        let metrics = ExecutionPlanMetricsSet::new();
        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));
        let dynamic_filter_clone = Arc::clone(&dynamic_filter);
        let mut topk = TopK::try_new(
            0,
            Arc::clone(&schema),
            prefix,
            full_expr,
            2,
            10,
            runtime,
            &metrics,
            Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))),
        )?;
        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
        topk.insert_batch(batch)?;
        let _results: Vec<_> = topk.emit()?.try_collect().await?;
        // Must not hang: emit() marks the filter complete
        dynamic_filter_clone.wait_complete().await;
        Ok(())
    }
}