use crate::DistributedTaskContext;
use crate::common::require_one_child;
use datafusion::execution::TaskContext;
use datafusion::physical_expr_common::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::{
error::Result,
execution::SendableRecordBatchStream,
physical_plan::{
DisplayAs, DisplayFormatType, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
PlanProperties,
},
};
use futures::TryStreamExt;
use std::{fmt::Formatter, sync::Arc};
/// Execution node that restricts each distributed task to a contiguous subset of its
/// input's partitions.
///
/// Output partition `i` of a task maps to the `i`-th input partition in that task's
/// partition group; output partitions past the end of the group yield empty streams.
#[derive(Debug)]
pub struct PartitionIsolatorExec {
    /// The single child plan whose partitions are split across tasks.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// The input's properties with the partitioning replaced by
    /// `UnknownPartitioning(<size of the largest partition group>)`.
    pub(crate) properties: Arc<PlanProperties>,
    /// Number of tasks assumed when sizing the output partitioning and when displaying
    /// the plan. Note: `execute` splits using the task count from the runtime
    /// `DistributedTaskContext`, not this field.
    pub(crate) n_tasks: usize,
    /// Per-partition `output_rows` metrics recorded while executing.
    pub(crate) metrics: ExecutionPlanMetricsSet,
}
impl PartitionIsolatorExec {
    /// Wraps `input` so that each of `n_tasks` distributed tasks only sees its own
    /// contiguous slice of the input partitions.
    ///
    /// The node advertises `UnknownPartitioning(k)`, where `k` is the size of the
    /// largest partition group. The largest group is always task 0's, because the
    /// remainder partitions go to the lowest task indices. Tasks with smaller groups
    /// return empty streams for their trailing output partitions. With `n_tasks == 0`
    /// no partitions are exposed (instead of panicking).
    pub fn new(input: Arc<dyn ExecutionPlan>, n_tasks: usize) -> Self {
        let input_partitions = input.properties().partitioning.partition_count();
        let partition_count = Self::partition_group(input_partitions, 0, n_tasks).len();
        let properties = <PlanProperties as Clone>::clone(input.properties())
            .with_partitioning(Partitioning::UnknownPartitioning(partition_count));
        Self {
            input,
            properties: Arc::new(properties),
            n_tasks,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Splits `input_partitions` into `n_tasks` contiguous, balanced groups.
    ///
    /// Every task gets `input_partitions / n_tasks` partitions, and the first
    /// `input_partitions % n_tasks` tasks get one extra. Group sizes therefore differ
    /// by at most one, and group 0 is always the largest. Returns an empty `Vec` when
    /// `n_tasks == 0` instead of dividing by zero.
    fn partition_groups(input_partitions: usize, n_tasks: usize) -> Vec<Vec<usize>> {
        (0..n_tasks)
            .map(|task_i| Self::partition_group(input_partitions, task_i, n_tasks))
            .collect()
    }

    /// Returns the input partition indices owned by task `task_i` out of `n_tasks`.
    ///
    /// The group is computed directly, without materialising the other tasks' groups.
    /// A `task_i` outside `0..n_tasks` (including any index when `n_tasks == 0`) owns
    /// no partitions, so an empty `Vec` is returned rather than panicking on an
    /// out-of-bounds index.
    pub(crate) fn partition_group(
        input_partitions: usize,
        task_i: usize,
        n_tasks: usize,
    ) -> Vec<usize> {
        if task_i >= n_tasks {
            return Vec::new();
        }
        let base = input_partitions / n_tasks;
        let extra = input_partitions % n_tasks;
        // Tasks before `task_i` each took `base` partitions, plus one more for each of
        // them that falls within the first `extra` tasks.
        let start = task_i * base + task_i.min(extra);
        let len = base + usize::from(task_i < extra);
        (start..start + len).collect()
    }
}
impl DisplayAs for PartitionIsolatorExec {
    /// Renders the node.
    ///
    /// Verbose mode prints one `tN:[...]` row per task. Each row has one slot per input
    /// partition: the slot reads `pK` when that input partition is the task's K-th
    /// output partition, and `__` otherwise. Every other format prints a compact
    /// `tasks=.. partitions=..` summary.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        let input_partitions = self.input.output_partitioning().partition_count();
        write!(f, "PartitionIsolatorExec: ")?;

        if matches!(t, DisplayFormatType::Verbose) {
            let groups = Self::partition_groups(input_partitions, self.n_tasks);
            let width: usize = groups.iter().map(Vec::len).sum();
            for (task, group) in groups.iter().enumerate() {
                let mut row = vec!["__".to_string(); width];
                for (local, &input_p) in group.iter().enumerate() {
                    row[input_p] = format!("p{local}");
                }
                write!(f, "t{task}:[{}] ", row.join(","))?;
            }
        } else {
            write!(f, "tasks={} partitions={}", self.n_tasks, input_partitions)?;
        }

        Ok(())
    }
}
impl ExecutionPlan for PartitionIsolatorExec {
    fn name(&self) -> &str {
        "PartitionIsolatorExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds the node around a new single child, keeping the configured task count.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let child = require_one_child(children)?;
        let n_tasks = self.n_tasks;
        Ok(Arc::new(Self::new(child, n_tasks)))
    }

    /// Streams output `partition` for the current task.
    ///
    /// The task's partition group comes from `task_index` / `task_count` in the
    /// `DistributedTaskContext`. The `partition`-th entry of that group selects which
    /// input partition to execute, and its rows are counted into the `output_rows`
    /// metric. When the group has no such entry, an empty stream with the input schema
    /// is returned.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let dist_ctx = DistributedTaskContext::from_ctx(&context);
        // Registered up front so the metric exists even when this partition is empty.
        let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
        let input_partitions = self.input.output_partitioning().partition_count();
        let owned = Self::partition_group(
            input_partitions,
            dist_ctx.task_index,
            dist_ctx.task_count,
        );

        let target = owned
            .get(partition)
            .copied()
            .filter(|&input_p| input_p < input_partitions);

        let Some(input_partition) = target else {
            let empty: SendableRecordBatchStream =
                Box::pin(EmptyRecordBatchStream::new(self.input.schema()));
            return Ok(empty);
        };

        let schema = self.schema();
        let counted = self
            .input
            .execute(input_partition, context)?
            .inspect_ok(move |batch| output_rows.add(batch.num_rows()));
        let adapted: SendableRecordBatchStream =
            Box::pin(RecordBatchStreamAdapter::new(schema, counted));
        Ok(adapted)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_groups() {
        assert_eq!(
            PartitionIsolatorExec::partition_groups(2, 1),
            vec![vec![0, 1]]
        );
        assert_eq!(
            PartitionIsolatorExec::partition_groups(6, 2),
            vec![vec![0, 1, 2], vec![3, 4, 5]]
        );
        assert_eq!(
            PartitionIsolatorExec::partition_groups(6, 3),
            vec![vec![0, 1], vec![2, 3], vec![4, 5]]
        );
        assert_eq!(
            PartitionIsolatorExec::partition_groups(6, 4),
            vec![vec![0, 1], vec![2, 3], vec![4], vec![5]]
        );
        assert_eq!(
            PartitionIsolatorExec::partition_groups(10, 3),
            vec![vec![0, 1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]
        );
        assert_eq!(
            PartitionIsolatorExec::partition_groups(10, 4),
            vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7], vec![8, 9]]
        );
    }

    /// Boundary cases: more tasks than partitions, and no partitions at all.
    #[test]
    fn test_partition_groups_edge_cases() {
        // Trailing tasks own nothing when there are fewer partitions than tasks.
        assert_eq!(
            PartitionIsolatorExec::partition_groups(2, 4),
            vec![vec![0], vec![1], vec![], vec![]]
        );
        assert_eq!(
            PartitionIsolatorExec::partition_groups(0, 3),
            vec![Vec::<usize>::new(), Vec::new(), Vec::new()]
        );
    }

    /// `partition_group` must agree with `partition_groups`, and the groups must cover
    /// every input partition exactly once, in ascending order.
    #[test]
    fn test_partition_group_matches_partition_groups() {
        for input_partitions in 0..12 {
            for n_tasks in 1..6 {
                let groups = PartitionIsolatorExec::partition_groups(input_partitions, n_tasks);
                assert_eq!(groups.len(), n_tasks);

                let flat: Vec<usize> = groups.iter().flatten().copied().collect();
                assert_eq!(flat, (0..input_partitions).collect::<Vec<_>>());

                for (task_i, group) in groups.iter().enumerate() {
                    assert_eq!(
                        &PartitionIsolatorExec::partition_group(input_partitions, task_i, n_tasks),
                        group
                    );
                }
            }
        }
    }
}