Skip to main content

hirn_exec/operators/
context_assembly.rs

1//! `ContextAssemblyExec` — Arrow-native THINK context assembly operator.
2//!
3//! This operator makes THINK context assembly a visible DataFusion physical
4//! plan node.  It takes an upstream `ExecutionPlan` input (pipeline mode),
5//! materialises the batches, and calls the per-query [`ContextAssemblyRuntime`]
6//! registered in [`HirnSessionExt`].
7//!
8//! The operator emits exactly one output row: `{ assembly_json: LargeBinary }`
9//! containing the JSON-serialised `ThinkAssemblyOutput`.  This makes context
10//! assembly a true DataFusion pipeline step visible in EXPLAIN ANALYZE output.
11
12use std::any::Any;
13use std::fmt;
14use std::sync::Arc;
15
16use arrow_array::{LargeBinaryArray, RecordBatch};
17use arrow_schema::SchemaRef;
18use datafusion_common::{DataFusionError, Result};
19use datafusion_execution::{SendableRecordBatchStream, TaskContext};
20use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
21use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
22use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
23use futures::StreamExt as _;
24
25use crate::extensions::HirnSessionExt;
26
27// ── Operator ────────────────────────────────────────────────────────────────
// Only the pipeline mode remains: the former `AssemblyMode::Payload` variant
// (a pre-assembled payload roundtrip) was removed because it was pure overhead.
29
30/// DataFusion physical operator that emits a THINK context assembly result.
31///
32/// Pipeline-breaking: produces exactly one output row
33/// `{ assembly_json: LargeBinary }`.  Parallelism is always 1.
34#[derive(Debug, Clone)]
35pub struct ContextAssemblyExec {
36    input: Arc<dyn ExecutionPlan>,
37    /// Output schema: single `assembly_json LargeBinary` column.
38    schema: SchemaRef,
39    properties: PlanProperties,
40}
41
42impl ContextAssemblyExec {
43    /// Create a `ContextAssemblyExec` that materialises `input` batches and
44    /// calls the [`ContextAssemblyRuntime`] registered in [`HirnSessionExt`].
45    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
46        let schema = output_schema();
47        let properties = Self::make_properties(schema.clone());
48        Self {
49            input,
50            schema,
51            properties,
52        }
53    }
54
55    fn make_properties(schema: SchemaRef) -> PlanProperties {
56        PlanProperties::new(
57            datafusion_physical_expr::EquivalenceProperties::new(schema),
58            datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
59            EmissionType::Final,
60            Boundedness::Bounded,
61        )
62    }
63}
64
65impl DisplayAs for ContextAssemblyExec {
66    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        write!(f, "ContextAssemblyExec mode=pipeline")
68    }
69}
70
71impl ExecutionPlan for ContextAssemblyExec {
72    fn name(&self) -> &str {
73        "ContextAssemblyExec"
74    }
75
76    fn as_any(&self) -> &dyn Any {
77        self
78    }
79
80    fn schema(&self) -> SchemaRef {
81        self.schema.clone()
82    }
83
84    fn properties(&self) -> &PlanProperties {
85        &self.properties
86    }
87
88    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
89        vec![&self.input]
90    }
91
92    fn with_new_children(
93        self: Arc<Self>,
94        mut children: Vec<Arc<dyn ExecutionPlan>>,
95    ) -> Result<Arc<dyn ExecutionPlan>> {
96        if children.len() != 1 {
97            return Err(DataFusionError::Plan(format!(
98                "ContextAssemblyExec requires exactly 1 child, got {}",
99                children.len()
100            )));
101        }
102        Ok(Arc::new(Self::new(children.swap_remove(0))))
103    }
104
105    fn execute(
106        &self,
107        partition: usize,
108        context: Arc<TaskContext>,
109    ) -> Result<SendableRecordBatchStream> {
110        let schema = self.schema.clone();
111        let mut input_stream = self.input.execute(partition, context.clone())?;
112
113        let fut = async move {
114            let mut candidate_batches: Vec<RecordBatch> = Vec::new();
115            while let Some(batch_result) = input_stream.next().await {
116                candidate_batches.push(batch_result.map_err(|e| {
117                    DataFusionError::Execution(format!(
118                        "ContextAssemblyExec: input batch error: {e}"
119                    ))
120                })?);
121            }
122
123            let ext = context
124                .session_config()
125                .options()
126                .extensions
127                .get::<HirnSessionExt>()
128                .cloned()
129                .ok_or_else(|| {
130                    DataFusionError::Execution(
131                        "ContextAssemblyExec requires HirnSessionExt".to_string(),
132                    )
133                })?;
134
135            let runtime = ext.context_assembly_runtime().ok_or_else(|| {
136                DataFusionError::Execution(
137                    "ContextAssemblyExec requires a context assembly runtime".to_string(),
138                )
139            })?;
140
141            let payload: Vec<u8> = runtime
142                .assemble_from_batches(candidate_batches)
143                .await
144                .map_err(|e| DataFusionError::Execution(e.to_string()))?;
145
146            let output = RecordBatch::try_new(
147                schema.clone(),
148                vec![Arc::new(LargeBinaryArray::from(vec![payload.as_slice()]))],
149            )?;
150            Ok(output)
151        };
152
153        let stream = futures::stream::once(fut);
154        Ok(Box::pin(RecordBatchStreamAdapter::new(
155            self.schema.clone(),
156            stream,
157        )))
158    }
159}
160
161/// Output schema for `ContextAssemblyExec`.
162///
163/// A single `assembly_json LargeBinary` column containing the JSON-encoded
164/// `ThinkAssemblyOutput`.
165pub fn output_schema() -> SchemaRef {
166    use arrow_schema::{DataType, Field, Schema};
167    Arc::new(Schema::new(vec![Field::new(
168        "assembly_json",
169        DataType::LargeBinary,
170        false,
171    )]))
172}