uni_query/query/df_graph/
ext_id_lookup.rs1use crate::query::df_graph::GraphExecutionContext;
18use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
19use arrow_array::builder::StringBuilder;
20use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
21use arrow_schema::{DataType, Field, Schema, SchemaRef};
22use datafusion::common::Result as DFResult;
23use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
24use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
26use futures::Stream;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use uni_common::core::id::Vid;
34use uni_store::storage::main_vertex::MainVertexDataset;
35
36pub struct GraphExtIdLookupExec {
41 graph_ctx: Arc<GraphExecutionContext>,
43
44 variable: String,
46
47 ext_id: String,
49
50 projected_properties: Vec<String>,
52
53 optional: bool,
55
56 schema: SchemaRef,
58
59 properties: PlanProperties,
61
62 metrics: ExecutionPlanMetricsSet,
64}
65
66impl fmt::Debug for GraphExtIdLookupExec {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct("GraphExtIdLookupExec")
69 .field("variable", &self.variable)
70 .field("ext_id", &self.ext_id)
71 .field("projected_properties", &self.projected_properties)
72 .field("optional", &self.optional)
73 .finish()
74 }
75}
76
77impl GraphExtIdLookupExec {
78 pub fn new(
80 graph_ctx: Arc<GraphExecutionContext>,
81 variable: impl Into<String>,
82 ext_id: impl Into<String>,
83 projected_properties: Vec<String>,
84 optional: bool,
85 ) -> Self {
86 let variable = variable.into();
87 let ext_id = ext_id.into();
88
89 let schema = Self::build_schema(&variable, &projected_properties);
91 let properties = compute_plan_properties(schema.clone());
92
93 Self {
94 graph_ctx,
95 variable,
96 ext_id,
97 projected_properties,
98 optional,
99 schema,
100 properties,
101 metrics: ExecutionPlanMetricsSet::new(),
102 }
103 }
104
105 fn build_schema(variable: &str, properties: &[String]) -> SchemaRef {
107 let mut fields = vec![
108 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
109 Field::new(format!("{}.ext_id", variable), DataType::Utf8, false),
110 Field::new(format!("{}._label", variable), DataType::Utf8, false),
111 ];
112
113 for prop in properties {
115 let col_name = format!("{}.{}", variable, prop);
116 fields.push(Field::new(&col_name, DataType::Utf8, true));
117 }
118
119 Arc::new(Schema::new(fields))
120 }
121}
122
123impl DisplayAs for GraphExtIdLookupExec {
124 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 write!(
126 f,
127 "GraphExtIdLookupExec: ext_id={}, variable={}, optional={}",
128 self.ext_id, self.variable, self.optional
129 )
130 }
131}
132
133impl ExecutionPlan for GraphExtIdLookupExec {
134 fn name(&self) -> &str {
135 "GraphExtIdLookupExec"
136 }
137
138 fn as_any(&self) -> &dyn Any {
139 self
140 }
141
142 fn schema(&self) -> SchemaRef {
143 self.schema.clone()
144 }
145
146 fn properties(&self) -> &PlanProperties {
147 &self.properties
148 }
149
150 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
151 vec![]
152 }
153
154 fn with_new_children(
155 self: Arc<Self>,
156 children: Vec<Arc<dyn ExecutionPlan>>,
157 ) -> DFResult<Arc<dyn ExecutionPlan>> {
158 if !children.is_empty() {
159 return Err(datafusion::error::DataFusionError::Internal(
160 "GraphExtIdLookupExec has no children".to_string(),
161 ));
162 }
163 Ok(self)
164 }
165
166 fn execute(
167 &self,
168 partition: usize,
169 _context: Arc<TaskContext>,
170 ) -> DFResult<SendableRecordBatchStream> {
171 let metrics = BaselineMetrics::new(&self.metrics, partition);
172
173 Ok(Box::pin(ExtIdLookupStream::new(
174 self.graph_ctx.clone(),
175 self.variable.clone(),
176 self.ext_id.clone(),
177 self.projected_properties.clone(),
178 self.optional,
179 self.schema.clone(),
180 metrics,
181 )))
182 }
183
184 fn metrics(&self) -> Option<MetricsSet> {
185 Some(self.metrics.clone_inner())
186 }
187}
188
189enum ExtIdLookupState {
191 Init,
193 Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
195 Done,
197}
198
199struct ExtIdLookupStream {
201 graph_ctx: Arc<GraphExecutionContext>,
203
204 variable: String,
206
207 ext_id: String,
209
210 properties: Vec<String>,
212
213 optional: bool,
215
216 schema: SchemaRef,
218
219 state: ExtIdLookupState,
221
222 metrics: BaselineMetrics,
224}
225
226impl ExtIdLookupStream {
227 fn new(
228 graph_ctx: Arc<GraphExecutionContext>,
229 variable: String,
230 ext_id: String,
231 properties: Vec<String>,
232 optional: bool,
233 schema: SchemaRef,
234 metrics: BaselineMetrics,
235 ) -> Self {
236 Self {
237 graph_ctx,
238 variable,
239 ext_id,
240 properties,
241 optional,
242 schema,
243 state: ExtIdLookupState::Init,
244 metrics,
245 }
246 }
247}
248
249impl Stream for ExtIdLookupStream {
250 type Item = DFResult<RecordBatch>;
251
252 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
253 loop {
254 let state = std::mem::replace(&mut self.state, ExtIdLookupState::Done);
255
256 match state {
257 ExtIdLookupState::Init => {
258 let graph_ctx = self.graph_ctx.clone();
260 let variable = self.variable.clone();
261 let ext_id = self.ext_id.clone();
262 let properties = self.properties.clone();
263 let optional = self.optional;
264 let schema = self.schema.clone();
265
266 let fut = async move {
267 graph_ctx.check_timeout().map_err(|e| {
269 datafusion::error::DataFusionError::Execution(e.to_string())
270 })?;
271
272 execute_lookup(
273 &graph_ctx,
274 &variable,
275 &ext_id,
276 &properties,
277 optional,
278 &schema,
279 )
280 .await
281 };
282
283 self.state = ExtIdLookupState::Executing(Box::pin(fut));
284 }
286 ExtIdLookupState::Executing(mut fut) => match fut.as_mut().poll(cx) {
287 Poll::Ready(Ok(batch)) => {
288 self.state = ExtIdLookupState::Done;
289 self.metrics
290 .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
291 return Poll::Ready(batch.map(Ok));
292 }
293 Poll::Ready(Err(e)) => {
294 self.state = ExtIdLookupState::Done;
295 return Poll::Ready(Some(Err(e)));
296 }
297 Poll::Pending => {
298 self.state = ExtIdLookupState::Executing(fut);
299 return Poll::Pending;
300 }
301 },
302 ExtIdLookupState::Done => {
303 return Poll::Ready(None);
304 }
305 }
306 }
307 }
308}
309
310impl RecordBatchStream for ExtIdLookupStream {
311 fn schema(&self) -> SchemaRef {
312 self.schema.clone()
313 }
314}
315
316async fn execute_lookup(
318 graph_ctx: &GraphExecutionContext,
319 variable: &str,
320 ext_id: &str,
321 properties: &[String],
322 optional: bool,
323 schema: &SchemaRef,
324) -> DFResult<Option<RecordBatch>> {
325 let storage = graph_ctx.storage();
326 let lancedb = storage.lancedb_store();
327
328 let found_vid =
330 MainVertexDataset::find_by_ext_id(lancedb, ext_id, storage.version_high_water_mark())
331 .await
332 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
333
334 let Some(vid) = found_vid else {
335 if optional {
337 return Ok(Some(build_null_row(variable, properties, schema)?));
338 }
339 return Ok(Some(RecordBatch::new_empty(schema.clone())));
340 };
341
342 let property_manager = graph_ctx.property_manager();
344 let query_ctx = graph_ctx.query_context();
345
346 let props_opt = property_manager
347 .get_all_vertex_props_with_ctx(vid, Some(&query_ctx))
348 .await
349 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
350
351 let Some(props) = props_opt else {
352 if optional {
354 return Ok(Some(build_null_row(variable, properties, schema)?));
355 }
356 return Ok(Some(RecordBatch::new_empty(schema.clone())));
357 };
358
359 let labels =
361 MainVertexDataset::find_labels_by_vid(lancedb, vid, storage.version_high_water_mark())
362 .await
363 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
364 .unwrap_or_default();
365
366 let label_name = labels
367 .first()
368 .cloned()
369 .unwrap_or_else(|| "Unknown".to_string());
370
371 let batch = build_result_row(
373 vid,
374 ext_id,
375 &label_name,
376 &props,
377 variable,
378 properties,
379 schema,
380 )?;
381 Ok(Some(batch))
382}
383
384fn build_null_row(
386 _variable: &str,
387 _properties: &[String],
388 schema: &SchemaRef,
389) -> DFResult<RecordBatch> {
390 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
392
393 for field in schema.fields().iter() {
395 match field.data_type() {
396 DataType::UInt64 => {
397 columns.push(Arc::new(arrow_array::UInt64Array::from(vec![
398 None as Option<u64>,
399 ])));
400 }
401 DataType::Utf8 => {
402 let mut builder = StringBuilder::new();
403 builder.append_null();
404 columns.push(Arc::new(builder.finish()));
405 }
406 _ => {
407 let mut builder = StringBuilder::new();
408 builder.append_null();
409 columns.push(Arc::new(builder.finish()));
410 }
411 }
412 }
413
414 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
415}
416
417fn build_result_row(
419 vid: Vid,
420 ext_id: &str,
421 label: &str,
422 props: &HashMap<String, uni_common::Value>,
423 _variable: &str,
424 properties: &[String],
425 schema: &SchemaRef,
426) -> DFResult<RecordBatch> {
427 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
428
429 columns.push(Arc::new(UInt64Array::from(vec![vid.as_u64()])));
431
432 let mut ext_id_builder = StringBuilder::new();
434 ext_id_builder.append_value(ext_id);
435 columns.push(Arc::new(ext_id_builder.finish()));
436
437 let mut label_builder = StringBuilder::new();
439 label_builder.append_value(label);
440 columns.push(Arc::new(label_builder.finish()));
441
442 for prop in properties {
444 let mut builder = StringBuilder::new();
445 if let Some(val) = props.get(prop) {
446 match val {
447 uni_common::Value::String(s) => builder.append_value(s),
448 uni_common::Value::Null => builder.append_null(),
449 other => builder.append_value(other.to_string()),
450 }
451 } else {
452 builder.append_null();
453 }
454 columns.push(Arc::new(builder.finish()));
455 }
456
457 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema lists the three fixed columns, then the projected
    /// properties in order, all prefixed with the variable name.
    #[test]
    fn test_build_schema() {
        let projected = ["name".to_string(), "age".to_string()];
        let schema = GraphExtIdLookupExec::build_schema("n", &projected);

        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(names, ["n._vid", "n.ext_id", "n._label", "n.name", "n.age"]);
    }
}