hirn_exec/operators/
context_assembly.rs1use std::any::Any;
13use std::fmt;
14use std::sync::Arc;
15
16use arrow_array::{LargeBinaryArray, RecordBatch};
17use arrow_schema::SchemaRef;
18use datafusion_common::{DataFusionError, Result};
19use datafusion_execution::{SendableRecordBatchStream, TaskContext};
20use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
21use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
22use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
23use futures::StreamExt as _;
24
25use crate::extensions::HirnSessionExt;
26
27#[derive(Debug, Clone)]
35pub struct ContextAssemblyExec {
36 input: Arc<dyn ExecutionPlan>,
37 schema: SchemaRef,
39 properties: PlanProperties,
40}
41
42impl ContextAssemblyExec {
43 pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
46 let schema = output_schema();
47 let properties = Self::make_properties(schema.clone());
48 Self {
49 input,
50 schema,
51 properties,
52 }
53 }
54
55 fn make_properties(schema: SchemaRef) -> PlanProperties {
56 PlanProperties::new(
57 datafusion_physical_expr::EquivalenceProperties::new(schema),
58 datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
59 EmissionType::Final,
60 Boundedness::Bounded,
61 )
62 }
63}
64
65impl DisplayAs for ContextAssemblyExec {
66 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 write!(f, "ContextAssemblyExec mode=pipeline")
68 }
69}
70
71impl ExecutionPlan for ContextAssemblyExec {
72 fn name(&self) -> &str {
73 "ContextAssemblyExec"
74 }
75
76 fn as_any(&self) -> &dyn Any {
77 self
78 }
79
80 fn schema(&self) -> SchemaRef {
81 self.schema.clone()
82 }
83
84 fn properties(&self) -> &PlanProperties {
85 &self.properties
86 }
87
88 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
89 vec![&self.input]
90 }
91
92 fn with_new_children(
93 self: Arc<Self>,
94 mut children: Vec<Arc<dyn ExecutionPlan>>,
95 ) -> Result<Arc<dyn ExecutionPlan>> {
96 if children.len() != 1 {
97 return Err(DataFusionError::Plan(format!(
98 "ContextAssemblyExec requires exactly 1 child, got {}",
99 children.len()
100 )));
101 }
102 Ok(Arc::new(Self::new(children.swap_remove(0))))
103 }
104
105 fn execute(
106 &self,
107 partition: usize,
108 context: Arc<TaskContext>,
109 ) -> Result<SendableRecordBatchStream> {
110 let schema = self.schema.clone();
111 let mut input_stream = self.input.execute(partition, context.clone())?;
112
113 let fut = async move {
114 let mut candidate_batches: Vec<RecordBatch> = Vec::new();
115 while let Some(batch_result) = input_stream.next().await {
116 candidate_batches.push(batch_result.map_err(|e| {
117 DataFusionError::Execution(format!(
118 "ContextAssemblyExec: input batch error: {e}"
119 ))
120 })?);
121 }
122
123 let ext = context
124 .session_config()
125 .options()
126 .extensions
127 .get::<HirnSessionExt>()
128 .cloned()
129 .ok_or_else(|| {
130 DataFusionError::Execution(
131 "ContextAssemblyExec requires HirnSessionExt".to_string(),
132 )
133 })?;
134
135 let runtime = ext.context_assembly_runtime().ok_or_else(|| {
136 DataFusionError::Execution(
137 "ContextAssemblyExec requires a context assembly runtime".to_string(),
138 )
139 })?;
140
141 let payload: Vec<u8> = runtime
142 .assemble_from_batches(candidate_batches)
143 .await
144 .map_err(|e| DataFusionError::Execution(e.to_string()))?;
145
146 let output = RecordBatch::try_new(
147 schema.clone(),
148 vec![Arc::new(LargeBinaryArray::from(vec![payload.as_slice()]))],
149 )?;
150 Ok(output)
151 };
152
153 let stream = futures::stream::once(fut);
154 Ok(Box::pin(RecordBatchStreamAdapter::new(
155 self.schema.clone(),
156 stream,
157 )))
158 }
159}
160
161pub fn output_schema() -> SchemaRef {
166 use arrow_schema::{DataType, Field, Schema};
167 Arc::new(Schema::new(vec![Field::new(
168 "assembly_json",
169 DataType::LargeBinary,
170 false,
171 )]))
172}