Skip to main content

uni_query/query/df_graph/
ext_id_lookup.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! External ID lookup execution plan for DataFusion.
5//!
6//! This module provides [`GraphExtIdLookupExec`], a DataFusion [`ExecutionPlan`] that
7//! looks up a single vertex by its external ID (`ext_id`) in the main vertices table.
8//!
9//! # Column Naming Convention
10//!
//! Every output column is prefixed with the query variable:
//! - `{variable}._vid` - vertex ID
//! - `{variable}.ext_id` - external ID
//! - `{variable}._label` - vertex label name
//! - `{variable}.{property}` - materialized properties (stored as Utf8 strings)
16
17use crate::query::df_graph::GraphExecutionContext;
18use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
19use arrow_array::builder::StringBuilder;
20use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
21use arrow_schema::{DataType, Field, Schema, SchemaRef};
22use datafusion::common::Result as DFResult;
23use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
24use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
26use futures::Stream;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use uni_common::core::id::Vid;
34
35/// Execution plan for looking up a vertex by external ID.
36///
37/// Queries the main vertices table to find a vertex matching the given `ext_id`,
38/// then materializes the specified properties.
39pub struct GraphExtIdLookupExec {
40    /// Graph execution context for storage access.
41    graph_ctx: Arc<GraphExecutionContext>,
42
43    /// Variable name for column prefixing.
44    variable: String,
45
46    /// External ID to look up.
47    ext_id: String,
48
49    /// Properties to materialize.
50    projected_properties: Vec<String>,
51
52    /// Whether the lookup is optional (OPTIONAL MATCH).
53    optional: bool,
54
55    /// Output schema.
56    schema: SchemaRef,
57
58    /// Plan properties (cached).
59    properties: Arc<PlanProperties>,
60
61    /// Execution metrics.
62    metrics: ExecutionPlanMetricsSet,
63}
64
65impl fmt::Debug for GraphExtIdLookupExec {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        f.debug_struct("GraphExtIdLookupExec")
68            .field("variable", &self.variable)
69            .field("ext_id", &self.ext_id)
70            .field("projected_properties", &self.projected_properties)
71            .field("optional", &self.optional)
72            .finish()
73    }
74}
75
76impl GraphExtIdLookupExec {
77    /// Create a new external ID lookup executor.
78    pub fn new(
79        graph_ctx: Arc<GraphExecutionContext>,
80        variable: impl Into<String>,
81        ext_id: impl Into<String>,
82        projected_properties: Vec<String>,
83        optional: bool,
84    ) -> Self {
85        let variable = variable.into();
86        let ext_id = ext_id.into();
87
88        // Build output schema
89        let schema = Self::build_schema(&variable, &projected_properties);
90        let properties = compute_plan_properties(schema.clone());
91
92        Self {
93            graph_ctx,
94            variable,
95            ext_id,
96            projected_properties,
97            optional,
98            schema,
99            properties,
100            metrics: ExecutionPlanMetricsSet::new(),
101        }
102    }
103
104    /// Build the output schema.
105    fn build_schema(variable: &str, properties: &[String]) -> SchemaRef {
106        let mut fields = vec![
107            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
108            Field::new(format!("{}.ext_id", variable), DataType::Utf8, false),
109            Field::new(format!("{}._label", variable), DataType::Utf8, false),
110        ];
111
112        // Add property columns with variable prefix
113        for prop in properties {
114            let col_name = format!("{}.{}", variable, prop);
115            fields.push(Field::new(&col_name, DataType::Utf8, true));
116        }
117
118        Arc::new(Schema::new(fields))
119    }
120}
121
122impl DisplayAs for GraphExtIdLookupExec {
123    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        write!(
125            f,
126            "GraphExtIdLookupExec: ext_id={}, variable={}, optional={}",
127            self.ext_id, self.variable, self.optional
128        )
129    }
130}
131
132impl ExecutionPlan for GraphExtIdLookupExec {
133    fn name(&self) -> &str {
134        "GraphExtIdLookupExec"
135    }
136
137    fn as_any(&self) -> &dyn Any {
138        self
139    }
140
141    fn schema(&self) -> SchemaRef {
142        self.schema.clone()
143    }
144
145    fn properties(&self) -> &Arc<PlanProperties> {
146        &self.properties
147    }
148
149    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
150        vec![]
151    }
152
153    fn with_new_children(
154        self: Arc<Self>,
155        children: Vec<Arc<dyn ExecutionPlan>>,
156    ) -> DFResult<Arc<dyn ExecutionPlan>> {
157        if !children.is_empty() {
158            return Err(datafusion::error::DataFusionError::Internal(
159                "GraphExtIdLookupExec has no children".to_string(),
160            ));
161        }
162        Ok(self)
163    }
164
165    fn execute(
166        &self,
167        partition: usize,
168        _context: Arc<TaskContext>,
169    ) -> DFResult<SendableRecordBatchStream> {
170        let metrics = BaselineMetrics::new(&self.metrics, partition);
171
172        Ok(Box::pin(ExtIdLookupStream::new(
173            self.graph_ctx.clone(),
174            self.variable.clone(),
175            self.ext_id.clone(),
176            self.projected_properties.clone(),
177            self.optional,
178            self.schema.clone(),
179            metrics,
180        )))
181    }
182
183    fn metrics(&self) -> Option<MetricsSet> {
184        Some(self.metrics.clone_inner())
185    }
186}
187
188/// State machine for ext_id lookup stream.
189enum ExtIdLookupState {
190    /// Initial state, ready to start lookup.
191    Init,
192    /// Executing the async lookup.
193    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
194    /// Stream is done.
195    Done,
196}
197
198/// Stream that looks up a vertex by external ID.
199struct ExtIdLookupStream {
200    /// Graph execution context.
201    graph_ctx: Arc<GraphExecutionContext>,
202
203    /// Variable name for column prefixing.
204    variable: String,
205
206    /// External ID to look up.
207    ext_id: String,
208
209    /// Properties to materialize.
210    properties: Vec<String>,
211
212    /// Whether the lookup is optional.
213    optional: bool,
214
215    /// Output schema.
216    schema: SchemaRef,
217
218    /// Stream state.
219    state: ExtIdLookupState,
220
221    /// Metrics.
222    metrics: BaselineMetrics,
223}
224
225impl ExtIdLookupStream {
226    fn new(
227        graph_ctx: Arc<GraphExecutionContext>,
228        variable: String,
229        ext_id: String,
230        properties: Vec<String>,
231        optional: bool,
232        schema: SchemaRef,
233        metrics: BaselineMetrics,
234    ) -> Self {
235        Self {
236            graph_ctx,
237            variable,
238            ext_id,
239            properties,
240            optional,
241            schema,
242            state: ExtIdLookupState::Init,
243            metrics,
244        }
245    }
246}
247
248impl Stream for ExtIdLookupStream {
249    type Item = DFResult<RecordBatch>;
250
251    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252        let metrics = self.metrics.clone();
253        let _timer = metrics.elapsed_compute().timer();
254        loop {
255            let state = std::mem::replace(&mut self.state, ExtIdLookupState::Done);
256
257            match state {
258                ExtIdLookupState::Init => {
259                    // Clone data for the async block
260                    let graph_ctx = self.graph_ctx.clone();
261                    let variable = self.variable.clone();
262                    let ext_id = self.ext_id.clone();
263                    let properties = self.properties.clone();
264                    let optional = self.optional;
265                    let schema = self.schema.clone();
266
267                    let fut = async move {
268                        // Check timeout
269                        graph_ctx.check_timeout().map_err(|e| {
270                            datafusion::error::DataFusionError::Execution(e.to_string())
271                        })?;
272
273                        execute_lookup(
274                            &graph_ctx,
275                            &variable,
276                            &ext_id,
277                            &properties,
278                            optional,
279                            &schema,
280                        )
281                        .await
282                    };
283
284                    self.state = ExtIdLookupState::Executing(Box::pin(fut));
285                    // Continue loop to poll the future
286                }
287                ExtIdLookupState::Executing(mut fut) => match fut.as_mut().poll(cx) {
288                    Poll::Ready(Ok(batch)) => {
289                        self.state = ExtIdLookupState::Done;
290                        self.metrics
291                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
292                        return Poll::Ready(batch.map(Ok));
293                    }
294                    Poll::Ready(Err(e)) => {
295                        self.state = ExtIdLookupState::Done;
296                        return Poll::Ready(Some(Err(e)));
297                    }
298                    Poll::Pending => {
299                        self.state = ExtIdLookupState::Executing(fut);
300                        return Poll::Pending;
301                    }
302                },
303                ExtIdLookupState::Done => {
304                    return Poll::Ready(None);
305                }
306            }
307        }
308    }
309}
310
311impl RecordBatchStream for ExtIdLookupStream {
312    fn schema(&self) -> SchemaRef {
313        self.schema.clone()
314    }
315}
316
317/// Execute the ext_id lookup and materialize properties.
318async fn execute_lookup(
319    graph_ctx: &GraphExecutionContext,
320    variable: &str,
321    ext_id: &str,
322    properties: &[String],
323    optional: bool,
324    schema: &SchemaRef,
325) -> DFResult<Option<RecordBatch>> {
326    let storage = graph_ctx.storage();
327
328    // Look up vertex by ext_id with snapshot isolation
329    let found_vid = storage
330        .find_vertex_by_ext_id(ext_id)
331        .await
332        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
333
334    let Some(vid) = found_vid else {
335        // No match found
336        if optional {
337            return Ok(Some(build_null_row(variable, properties, schema)?));
338        }
339        return Ok(Some(RecordBatch::new_empty(schema.clone())));
340    };
341
342    // Load properties
343    let property_manager = graph_ctx.property_manager();
344    let query_ctx = graph_ctx.query_context();
345
346    let props_opt = property_manager
347        .get_all_vertex_props_with_ctx(vid, Some(&query_ctx))
348        .await
349        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
350
351    let Some(props) = props_opt else {
352        // Vertex was deleted
353        if optional {
354            return Ok(Some(build_null_row(variable, properties, schema)?));
355        }
356        return Ok(Some(RecordBatch::new_empty(schema.clone())));
357    };
358
359    // Get labels for the vertex
360    let labels = storage
361        .find_vertex_labels_by_vid(vid)
362        .await
363        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
364        .unwrap_or_default();
365
366    let label_name = labels
367        .first()
368        .cloned()
369        .unwrap_or_else(|| "Unknown".to_string());
370
371    // Build the result batch
372    let batch = build_result_row(
373        vid,
374        ext_id,
375        &label_name,
376        &props,
377        variable,
378        properties,
379        schema,
380    )?;
381    Ok(Some(batch))
382}
383
384/// Build a null row for optional matches with no result.
385fn build_null_row(
386    _variable: &str,
387    _properties: &[String],
388    schema: &SchemaRef,
389) -> DFResult<RecordBatch> {
390    // For optional match with no result, we return a row with nulls
391    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
392
393    // All columns are null for the optional case
394    for field in schema.fields().iter() {
395        match field.data_type() {
396            DataType::UInt64 => {
397                columns.push(Arc::new(arrow_array::UInt64Array::from(vec![
398                    None as Option<u64>,
399                ])));
400            }
401            DataType::Utf8 => {
402                let mut builder = StringBuilder::new();
403                builder.append_null();
404                columns.push(Arc::new(builder.finish()));
405            }
406            _ => {
407                let mut builder = StringBuilder::new();
408                builder.append_null();
409                columns.push(Arc::new(builder.finish()));
410            }
411        }
412    }
413
414    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
415}
416
417/// Build a result row with the found vertex data.
418fn build_result_row(
419    vid: Vid,
420    ext_id: &str,
421    label: &str,
422    props: &HashMap<String, uni_common::Value>,
423    _variable: &str,
424    properties: &[String],
425    schema: &SchemaRef,
426) -> DFResult<RecordBatch> {
427    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
428
429    // _vid column
430    columns.push(Arc::new(UInt64Array::from(vec![vid.as_u64()])));
431
432    // ext_id column
433    let mut ext_id_builder = StringBuilder::new();
434    ext_id_builder.append_value(ext_id);
435    columns.push(Arc::new(ext_id_builder.finish()));
436
437    // _label column
438    let mut label_builder = StringBuilder::new();
439    label_builder.append_value(label);
440    columns.push(Arc::new(label_builder.finish()));
441
442    // Property columns
443    for prop in properties {
444        let mut builder = StringBuilder::new();
445        if let Some(val) = props.get(prop) {
446            match val {
447                uni_common::Value::String(s) => builder.append_value(s),
448                uni_common::Value::Null => builder.append_null(),
449                other => builder.append_value(other.to_string()),
450            }
451        } else {
452            builder.append_null();
453        }
454        columns.push(Arc::new(builder.finish()));
455    }
456
457    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// Identity columns come first, then one column per projected property, all
    /// prefixed with the query variable.
    #[test]
    fn test_build_schema() {
        let props = ["name".to_string(), "age".to_string()];
        let schema = GraphExtIdLookupExec::build_schema("n", &props);

        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(names, ["n._vid", "n.ext_id", "n._label", "n.name", "n.age"]);
    }
}