Skip to main content

uni_query/query/df_graph/
ext_id_lookup.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
//! External ID lookup execution plan for DataFusion.
//!
//! This module provides [`GraphExtIdLookupExec`], a DataFusion [`ExecutionPlan`] that
//! looks up a single vertex by its external ID (`ext_id`) in the main vertices table.
//!
//! # Column Naming Convention
//!
//! Every output column is prefixed with the query variable:
//! - `{variable}._vid` - vertex ID
//! - `{variable}.ext_id` - external ID
//! - `{variable}._label` - vertex label name
//! - `{variable}.{property}` - materialized properties
16
17use crate::query::df_graph::GraphExecutionContext;
18use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
19use arrow_array::builder::StringBuilder;
20use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
21use arrow_schema::{DataType, Field, Schema, SchemaRef};
22use datafusion::common::Result as DFResult;
23use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
24use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
26use futures::Stream;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use uni_common::core::id::Vid;
34use uni_store::storage::main_vertex::MainVertexDataset;
35
36/// Execution plan for looking up a vertex by external ID.
37///
38/// Queries the main vertices table to find a vertex matching the given `ext_id`,
39/// then materializes the specified properties.
40pub struct GraphExtIdLookupExec {
41    /// Graph execution context for storage access.
42    graph_ctx: Arc<GraphExecutionContext>,
43
44    /// Variable name for column prefixing.
45    variable: String,
46
47    /// External ID to look up.
48    ext_id: String,
49
50    /// Properties to materialize.
51    projected_properties: Vec<String>,
52
53    /// Whether the lookup is optional (OPTIONAL MATCH).
54    optional: bool,
55
56    /// Output schema.
57    schema: SchemaRef,
58
59    /// Plan properties (cached).
60    properties: PlanProperties,
61
62    /// Execution metrics.
63    metrics: ExecutionPlanMetricsSet,
64}
65
66impl fmt::Debug for GraphExtIdLookupExec {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        f.debug_struct("GraphExtIdLookupExec")
69            .field("variable", &self.variable)
70            .field("ext_id", &self.ext_id)
71            .field("projected_properties", &self.projected_properties)
72            .field("optional", &self.optional)
73            .finish()
74    }
75}
76
77impl GraphExtIdLookupExec {
78    /// Create a new external ID lookup executor.
79    pub fn new(
80        graph_ctx: Arc<GraphExecutionContext>,
81        variable: impl Into<String>,
82        ext_id: impl Into<String>,
83        projected_properties: Vec<String>,
84        optional: bool,
85    ) -> Self {
86        let variable = variable.into();
87        let ext_id = ext_id.into();
88
89        // Build output schema
90        let schema = Self::build_schema(&variable, &projected_properties);
91        let properties = compute_plan_properties(schema.clone());
92
93        Self {
94            graph_ctx,
95            variable,
96            ext_id,
97            projected_properties,
98            optional,
99            schema,
100            properties,
101            metrics: ExecutionPlanMetricsSet::new(),
102        }
103    }
104
105    /// Build the output schema.
106    fn build_schema(variable: &str, properties: &[String]) -> SchemaRef {
107        let mut fields = vec![
108            Field::new(format!("{}._vid", variable), DataType::UInt64, false),
109            Field::new(format!("{}.ext_id", variable), DataType::Utf8, false),
110            Field::new(format!("{}._label", variable), DataType::Utf8, false),
111        ];
112
113        // Add property columns with variable prefix
114        for prop in properties {
115            let col_name = format!("{}.{}", variable, prop);
116            fields.push(Field::new(&col_name, DataType::Utf8, true));
117        }
118
119        Arc::new(Schema::new(fields))
120    }
121}
122
123impl DisplayAs for GraphExtIdLookupExec {
124    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        write!(
126            f,
127            "GraphExtIdLookupExec: ext_id={}, variable={}, optional={}",
128            self.ext_id, self.variable, self.optional
129        )
130    }
131}
132
133impl ExecutionPlan for GraphExtIdLookupExec {
134    fn name(&self) -> &str {
135        "GraphExtIdLookupExec"
136    }
137
138    fn as_any(&self) -> &dyn Any {
139        self
140    }
141
142    fn schema(&self) -> SchemaRef {
143        self.schema.clone()
144    }
145
146    fn properties(&self) -> &PlanProperties {
147        &self.properties
148    }
149
150    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
151        vec![]
152    }
153
154    fn with_new_children(
155        self: Arc<Self>,
156        children: Vec<Arc<dyn ExecutionPlan>>,
157    ) -> DFResult<Arc<dyn ExecutionPlan>> {
158        if !children.is_empty() {
159            return Err(datafusion::error::DataFusionError::Internal(
160                "GraphExtIdLookupExec has no children".to_string(),
161            ));
162        }
163        Ok(self)
164    }
165
166    fn execute(
167        &self,
168        partition: usize,
169        _context: Arc<TaskContext>,
170    ) -> DFResult<SendableRecordBatchStream> {
171        let metrics = BaselineMetrics::new(&self.metrics, partition);
172
173        Ok(Box::pin(ExtIdLookupStream::new(
174            self.graph_ctx.clone(),
175            self.variable.clone(),
176            self.ext_id.clone(),
177            self.projected_properties.clone(),
178            self.optional,
179            self.schema.clone(),
180            metrics,
181        )))
182    }
183
184    fn metrics(&self) -> Option<MetricsSet> {
185        Some(self.metrics.clone_inner())
186    }
187}
188
189/// State machine for ext_id lookup stream.
190enum ExtIdLookupState {
191    /// Initial state, ready to start lookup.
192    Init,
193    /// Executing the async lookup.
194    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
195    /// Stream is done.
196    Done,
197}
198
199/// Stream that looks up a vertex by external ID.
200struct ExtIdLookupStream {
201    /// Graph execution context.
202    graph_ctx: Arc<GraphExecutionContext>,
203
204    /// Variable name for column prefixing.
205    variable: String,
206
207    /// External ID to look up.
208    ext_id: String,
209
210    /// Properties to materialize.
211    properties: Vec<String>,
212
213    /// Whether the lookup is optional.
214    optional: bool,
215
216    /// Output schema.
217    schema: SchemaRef,
218
219    /// Stream state.
220    state: ExtIdLookupState,
221
222    /// Metrics.
223    metrics: BaselineMetrics,
224}
225
226impl ExtIdLookupStream {
227    fn new(
228        graph_ctx: Arc<GraphExecutionContext>,
229        variable: String,
230        ext_id: String,
231        properties: Vec<String>,
232        optional: bool,
233        schema: SchemaRef,
234        metrics: BaselineMetrics,
235    ) -> Self {
236        Self {
237            graph_ctx,
238            variable,
239            ext_id,
240            properties,
241            optional,
242            schema,
243            state: ExtIdLookupState::Init,
244            metrics,
245        }
246    }
247}
248
249impl Stream for ExtIdLookupStream {
250    type Item = DFResult<RecordBatch>;
251
252    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
253        loop {
254            let state = std::mem::replace(&mut self.state, ExtIdLookupState::Done);
255
256            match state {
257                ExtIdLookupState::Init => {
258                    // Clone data for the async block
259                    let graph_ctx = self.graph_ctx.clone();
260                    let variable = self.variable.clone();
261                    let ext_id = self.ext_id.clone();
262                    let properties = self.properties.clone();
263                    let optional = self.optional;
264                    let schema = self.schema.clone();
265
266                    let fut = async move {
267                        // Check timeout
268                        graph_ctx.check_timeout().map_err(|e| {
269                            datafusion::error::DataFusionError::Execution(e.to_string())
270                        })?;
271
272                        execute_lookup(
273                            &graph_ctx,
274                            &variable,
275                            &ext_id,
276                            &properties,
277                            optional,
278                            &schema,
279                        )
280                        .await
281                    };
282
283                    self.state = ExtIdLookupState::Executing(Box::pin(fut));
284                    // Continue loop to poll the future
285                }
286                ExtIdLookupState::Executing(mut fut) => match fut.as_mut().poll(cx) {
287                    Poll::Ready(Ok(batch)) => {
288                        self.state = ExtIdLookupState::Done;
289                        self.metrics
290                            .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
291                        return Poll::Ready(batch.map(Ok));
292                    }
293                    Poll::Ready(Err(e)) => {
294                        self.state = ExtIdLookupState::Done;
295                        return Poll::Ready(Some(Err(e)));
296                    }
297                    Poll::Pending => {
298                        self.state = ExtIdLookupState::Executing(fut);
299                        return Poll::Pending;
300                    }
301                },
302                ExtIdLookupState::Done => {
303                    return Poll::Ready(None);
304                }
305            }
306        }
307    }
308}
309
310impl RecordBatchStream for ExtIdLookupStream {
311    fn schema(&self) -> SchemaRef {
312        self.schema.clone()
313    }
314}
315
316/// Execute the ext_id lookup and materialize properties.
317async fn execute_lookup(
318    graph_ctx: &GraphExecutionContext,
319    variable: &str,
320    ext_id: &str,
321    properties: &[String],
322    optional: bool,
323    schema: &SchemaRef,
324) -> DFResult<Option<RecordBatch>> {
325    let storage = graph_ctx.storage();
326    let lancedb = storage.lancedb_store();
327
328    // Look up vertex by ext_id with snapshot isolation
329    let found_vid =
330        MainVertexDataset::find_by_ext_id(lancedb, ext_id, storage.version_high_water_mark())
331            .await
332            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
333
334    let Some(vid) = found_vid else {
335        // No match found
336        if optional {
337            return Ok(Some(build_null_row(variable, properties, schema)?));
338        }
339        return Ok(Some(RecordBatch::new_empty(schema.clone())));
340    };
341
342    // Load properties
343    let property_manager = graph_ctx.property_manager();
344    let query_ctx = graph_ctx.query_context();
345
346    let props_opt = property_manager
347        .get_all_vertex_props_with_ctx(vid, Some(&query_ctx))
348        .await
349        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
350
351    let Some(props) = props_opt else {
352        // Vertex was deleted
353        if optional {
354            return Ok(Some(build_null_row(variable, properties, schema)?));
355        }
356        return Ok(Some(RecordBatch::new_empty(schema.clone())));
357    };
358
359    // Get labels for the vertex
360    let labels =
361        MainVertexDataset::find_labels_by_vid(lancedb, vid, storage.version_high_water_mark())
362            .await
363            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
364            .unwrap_or_default();
365
366    let label_name = labels
367        .first()
368        .cloned()
369        .unwrap_or_else(|| "Unknown".to_string());
370
371    // Build the result batch
372    let batch = build_result_row(
373        vid,
374        ext_id,
375        &label_name,
376        &props,
377        variable,
378        properties,
379        schema,
380    )?;
381    Ok(Some(batch))
382}
383
384/// Build a null row for optional matches with no result.
385fn build_null_row(
386    _variable: &str,
387    _properties: &[String],
388    schema: &SchemaRef,
389) -> DFResult<RecordBatch> {
390    // For optional match with no result, we return a row with nulls
391    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
392
393    // All columns are null for the optional case
394    for field in schema.fields().iter() {
395        match field.data_type() {
396            DataType::UInt64 => {
397                columns.push(Arc::new(arrow_array::UInt64Array::from(vec![
398                    None as Option<u64>,
399                ])));
400            }
401            DataType::Utf8 => {
402                let mut builder = StringBuilder::new();
403                builder.append_null();
404                columns.push(Arc::new(builder.finish()));
405            }
406            _ => {
407                let mut builder = StringBuilder::new();
408                builder.append_null();
409                columns.push(Arc::new(builder.finish()));
410            }
411        }
412    }
413
414    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
415}
416
417/// Build a result row with the found vertex data.
418fn build_result_row(
419    vid: Vid,
420    ext_id: &str,
421    label: &str,
422    props: &HashMap<String, uni_common::Value>,
423    _variable: &str,
424    properties: &[String],
425    schema: &SchemaRef,
426) -> DFResult<RecordBatch> {
427    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
428
429    // _vid column
430    columns.push(Arc::new(UInt64Array::from(vec![vid.as_u64()])));
431
432    // ext_id column
433    let mut ext_id_builder = StringBuilder::new();
434    ext_id_builder.append_value(ext_id);
435    columns.push(Arc::new(ext_id_builder.finish()));
436
437    // _label column
438    let mut label_builder = StringBuilder::new();
439    label_builder.append_value(label);
440    columns.push(Arc::new(label_builder.finish()));
441
442    // Property columns
443    for prop in properties {
444        let mut builder = StringBuilder::new();
445        if let Some(val) = props.get(prop) {
446            match val {
447                uni_common::Value::String(s) => builder.append_value(s),
448                uni_common::Value::Null => builder.append_null(),
449                other => builder.append_value(other.to_string()),
450            }
451        } else {
452            builder.append_null();
453        }
454        columns.push(Arc::new(builder.finish()));
455    }
456
457    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema lists the three identity columns first, then one column per
    /// requested property, each prefixed with the variable name.
    #[test]
    fn test_build_schema() {
        let requested = vec!["name".to_string(), "age".to_string()];
        let schema = GraphExtIdLookupExec::build_schema("n", &requested);

        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();

        // Comparing the whole list checks both the count and the order.
        assert_eq!(
            names,
            ["n._vid", "n.ext_id", "n._label", "n.name", "n.age"]
        );
    }
}