Skip to main content

uni_query/query/df_graph/
ext_id_lookup.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! External ID lookup execution plan for DataFusion.
5//!
6//! This module provides [`GraphExtIdLookupExec`], a DataFusion [`ExecutionPlan`] that
7//! looks up a single vertex by its external ID (`ext_id`) in the main vertices table.
8//!
//! # Column Naming Convention
//!
//! Every output column is prefixed with the query variable name:
//! - `{variable}._vid` - vertex ID
//! - `{variable}.ext_id` - external ID
//! - `{variable}._label` - vertex label name
//! - `{variable}.{property}` - materialized properties (rendered as Utf8 strings)
16
17use crate::query::df_graph::GraphExecutionContext;
18use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
19use arrow_array::builder::StringBuilder;
20use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
21use arrow_schema::{DataType, Field, Schema, SchemaRef};
22use datafusion::common::Result as DFResult;
23use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
24use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
26use futures::Stream;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use uni_common::core::id::Vid;
34
35/// Execution plan for looking up a vertex by external ID.
36///
37/// Queries the main vertices table to find a vertex matching the given `ext_id`,
38/// then materializes the specified properties.
39pub struct GraphExtIdLookupExec {
40    /// Graph execution context for storage access.
41    graph_ctx: Arc<GraphExecutionContext>,
42
43    /// Variable name for column prefixing.
44    variable: String,
45
46    /// External ID to look up.
47    ext_id: String,
48
49    /// Properties to materialize.
50    projected_properties: Vec<String>,
51
52    /// Whether the lookup is optional (OPTIONAL MATCH).
53    optional: bool,
54
55    /// Output schema.
56    schema: SchemaRef,
57
58    /// Plan properties (cached).
59    properties: PlanProperties,
60
61    /// Execution metrics.
62    metrics: ExecutionPlanMetricsSet,
63}
64
65impl fmt::Debug for GraphExtIdLookupExec {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        f.debug_struct("GraphExtIdLookupExec")
68            .field("variable", &self.variable)
69            .field("ext_id", &self.ext_id)
70            .field("projected_properties", &self.projected_properties)
71            .field("optional", &self.optional)
72            .finish()
73    }
74}
75
76impl GraphExtIdLookupExec {
77    /// Create a new external ID lookup executor.
78    pub fn new(
79        graph_ctx: Arc<GraphExecutionContext>,
80        variable: impl Into<String>,
81        ext_id: impl Into<String>,
82        projected_properties: Vec<String>,
83        optional: bool,
84    ) -> Self {
85        let variable = variable.into();
86        let ext_id = ext_id.into();
87
88        // Build output schema
89        let schema = Self::build_schema(&variable, &projected_properties);
90        let properties = compute_plan_properties(schema.clone());
91
92        Self {
93            graph_ctx,
94            variable,
95            ext_id,
96            projected_properties,
97            optional,
98            schema,
99            properties,
100            metrics: ExecutionPlanMetricsSet::new(),
101        }
102    }
103
104    /// Build the output schema.
105    fn build_schema(variable: &str, properties: &[String]) -> SchemaRef {
106        let mut fields = vec![
107            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
108            Field::new(format!("{}.ext_id", variable), DataType::Utf8, false),
109            Field::new(format!("{}._label", variable), DataType::Utf8, false),
110        ];
111
112        // Add property columns with variable prefix
113        for prop in properties {
114            let col_name = format!("{}.{}", variable, prop);
115            fields.push(Field::new(&col_name, DataType::Utf8, true));
116        }
117
118        Arc::new(Schema::new(fields))
119    }
120}
121
122impl DisplayAs for GraphExtIdLookupExec {
123    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        write!(
125            f,
126            "GraphExtIdLookupExec: ext_id={}, variable={}, optional={}",
127            self.ext_id, self.variable, self.optional
128        )
129    }
130}
131
132impl ExecutionPlan for GraphExtIdLookupExec {
133    fn name(&self) -> &str {
134        "GraphExtIdLookupExec"
135    }
136
137    fn as_any(&self) -> &dyn Any {
138        self
139    }
140
141    fn schema(&self) -> SchemaRef {
142        self.schema.clone()
143    }
144
145    fn properties(&self) -> &PlanProperties {
146        &self.properties
147    }
148
149    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
150        vec![]
151    }
152
153    fn with_new_children(
154        self: Arc<Self>,
155        children: Vec<Arc<dyn ExecutionPlan>>,
156    ) -> DFResult<Arc<dyn ExecutionPlan>> {
157        if !children.is_empty() {
158            return Err(datafusion::error::DataFusionError::Internal(
159                "GraphExtIdLookupExec has no children".to_string(),
160            ));
161        }
162        Ok(self)
163    }
164
165    fn execute(
166        &self,
167        partition: usize,
168        _context: Arc<TaskContext>,
169    ) -> DFResult<SendableRecordBatchStream> {
170        let metrics = BaselineMetrics::new(&self.metrics, partition);
171
172        Ok(Box::pin(ExtIdLookupStream::new(
173            self.graph_ctx.clone(),
174            self.variable.clone(),
175            self.ext_id.clone(),
176            self.projected_properties.clone(),
177            self.optional,
178            self.schema.clone(),
179            metrics,
180        )))
181    }
182
183    fn metrics(&self) -> Option<MetricsSet> {
184        Some(self.metrics.clone_inner())
185    }
186}
187
188/// State machine for ext_id lookup stream.
189enum ExtIdLookupState {
190    /// Initial state, ready to start lookup.
191    Init,
192    /// Executing the async lookup.
193    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
194    /// Stream is done.
195    Done,
196}
197
198/// Stream that looks up a vertex by external ID.
199struct ExtIdLookupStream {
200    /// Graph execution context.
201    graph_ctx: Arc<GraphExecutionContext>,
202
203    /// Variable name for column prefixing.
204    variable: String,
205
206    /// External ID to look up.
207    ext_id: String,
208
209    /// Properties to materialize.
210    properties: Vec<String>,
211
212    /// Whether the lookup is optional.
213    optional: bool,
214
215    /// Output schema.
216    schema: SchemaRef,
217
218    /// Stream state.
219    state: ExtIdLookupState,
220
221    /// Metrics.
222    metrics: BaselineMetrics,
223}
224
225impl ExtIdLookupStream {
226    fn new(
227        graph_ctx: Arc<GraphExecutionContext>,
228        variable: String,
229        ext_id: String,
230        properties: Vec<String>,
231        optional: bool,
232        schema: SchemaRef,
233        metrics: BaselineMetrics,
234    ) -> Self {
235        Self {
236            graph_ctx,
237            variable,
238            ext_id,
239            properties,
240            optional,
241            schema,
242            state: ExtIdLookupState::Init,
243            metrics,
244        }
245    }
246}
247
248impl Stream for ExtIdLookupStream {
249    type Item = DFResult<RecordBatch>;
250
251    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252        loop {
253            let state = std::mem::replace(&mut self.state, ExtIdLookupState::Done);
254
255            match state {
256                ExtIdLookupState::Init => {
257                    // Clone data for the async block
258                    let graph_ctx = self.graph_ctx.clone();
259                    let variable = self.variable.clone();
260                    let ext_id = self.ext_id.clone();
261                    let properties = self.properties.clone();
262                    let optional = self.optional;
263                    let schema = self.schema.clone();
264
265                    let fut = async move {
266                        // Check timeout
267                        graph_ctx.check_timeout().map_err(|e| {
268                            datafusion::error::DataFusionError::Execution(e.to_string())
269                        })?;
270
271                        execute_lookup(
272                            &graph_ctx,
273                            &variable,
274                            &ext_id,
275                            &properties,
276                            optional,
277                            &schema,
278                        )
279                        .await
280                    };
281
282                    self.state = ExtIdLookupState::Executing(Box::pin(fut));
283                    // Continue loop to poll the future
284                }
285                ExtIdLookupState::Executing(mut fut) => match fut.as_mut().poll(cx) {
286                    Poll::Ready(Ok(batch)) => {
287                        self.state = ExtIdLookupState::Done;
288                        self.metrics
289                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
290                        return Poll::Ready(batch.map(Ok));
291                    }
292                    Poll::Ready(Err(e)) => {
293                        self.state = ExtIdLookupState::Done;
294                        return Poll::Ready(Some(Err(e)));
295                    }
296                    Poll::Pending => {
297                        self.state = ExtIdLookupState::Executing(fut);
298                        return Poll::Pending;
299                    }
300                },
301                ExtIdLookupState::Done => {
302                    return Poll::Ready(None);
303                }
304            }
305        }
306    }
307}
308
309impl RecordBatchStream for ExtIdLookupStream {
310    fn schema(&self) -> SchemaRef {
311        self.schema.clone()
312    }
313}
314
315/// Execute the ext_id lookup and materialize properties.
316async fn execute_lookup(
317    graph_ctx: &GraphExecutionContext,
318    variable: &str,
319    ext_id: &str,
320    properties: &[String],
321    optional: bool,
322    schema: &SchemaRef,
323) -> DFResult<Option<RecordBatch>> {
324    let storage = graph_ctx.storage();
325
326    // Look up vertex by ext_id with snapshot isolation
327    let found_vid = storage
328        .find_vertex_by_ext_id(ext_id)
329        .await
330        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
331
332    let Some(vid) = found_vid else {
333        // No match found
334        if optional {
335            return Ok(Some(build_null_row(variable, properties, schema)?));
336        }
337        return Ok(Some(RecordBatch::new_empty(schema.clone())));
338    };
339
340    // Load properties
341    let property_manager = graph_ctx.property_manager();
342    let query_ctx = graph_ctx.query_context();
343
344    let props_opt = property_manager
345        .get_all_vertex_props_with_ctx(vid, Some(&query_ctx))
346        .await
347        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
348
349    let Some(props) = props_opt else {
350        // Vertex was deleted
351        if optional {
352            return Ok(Some(build_null_row(variable, properties, schema)?));
353        }
354        return Ok(Some(RecordBatch::new_empty(schema.clone())));
355    };
356
357    // Get labels for the vertex
358    let labels = storage
359        .find_vertex_labels_by_vid(vid)
360        .await
361        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
362        .unwrap_or_default();
363
364    let label_name = labels
365        .first()
366        .cloned()
367        .unwrap_or_else(|| "Unknown".to_string());
368
369    // Build the result batch
370    let batch = build_result_row(
371        vid,
372        ext_id,
373        &label_name,
374        &props,
375        variable,
376        properties,
377        schema,
378    )?;
379    Ok(Some(batch))
380}
381
382/// Build a null row for optional matches with no result.
383fn build_null_row(
384    _variable: &str,
385    _properties: &[String],
386    schema: &SchemaRef,
387) -> DFResult<RecordBatch> {
388    // For optional match with no result, we return a row with nulls
389    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
390
391    // All columns are null for the optional case
392    for field in schema.fields().iter() {
393        match field.data_type() {
394            DataType::UInt64 => {
395                columns.push(Arc::new(arrow_array::UInt64Array::from(vec![
396                    None as Option<u64>,
397                ])));
398            }
399            DataType::Utf8 => {
400                let mut builder = StringBuilder::new();
401                builder.append_null();
402                columns.push(Arc::new(builder.finish()));
403            }
404            _ => {
405                let mut builder = StringBuilder::new();
406                builder.append_null();
407                columns.push(Arc::new(builder.finish()));
408            }
409        }
410    }
411
412    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
413}
414
415/// Build a result row with the found vertex data.
416fn build_result_row(
417    vid: Vid,
418    ext_id: &str,
419    label: &str,
420    props: &HashMap<String, uni_common::Value>,
421    _variable: &str,
422    properties: &[String],
423    schema: &SchemaRef,
424) -> DFResult<RecordBatch> {
425    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
426
427    // _vid column
428    columns.push(Arc::new(UInt64Array::from(vec![vid.as_u64()])));
429
430    // ext_id column
431    let mut ext_id_builder = StringBuilder::new();
432    ext_id_builder.append_value(ext_id);
433    columns.push(Arc::new(ext_id_builder.finish()));
434
435    // _label column
436    let mut label_builder = StringBuilder::new();
437    label_builder.append_value(label);
438    columns.push(Arc::new(label_builder.finish()));
439
440    // Property columns
441    for prop in properties {
442        let mut builder = StringBuilder::new();
443        if let Some(val) = props.get(prop) {
444            match val {
445                uni_common::Value::String(s) => builder.append_value(s),
446                uni_common::Value::Null => builder.append_null(),
447                other => builder.append_value(other.to_string()),
448            }
449        } else {
450            builder.append_null();
451        }
452        columns.push(Arc::new(builder.finish()));
453    }
454
455    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_schema() {
        let props = vec!["name".to_string(), "age".to_string()];
        let schema = GraphExtIdLookupExec::build_schema("n", &props);

        // Key columns come first, followed by the projected properties in order.
        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(names, ["n._vid", "n.ext_id", "n._label", "n.name", "n.age"]);
    }
}