uni_query/query/df_graph/
ext_id_lookup.rs1use crate::query::df_graph::GraphExecutionContext;
18use crate::query::df_graph::common::{arrow_err, compute_plan_properties};
19use arrow_array::builder::StringBuilder;
20use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
21use arrow_schema::{DataType, Field, Schema, SchemaRef};
22use datafusion::common::Result as DFResult;
23use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
24use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
26use futures::Stream;
27use std::any::Any;
28use std::collections::HashMap;
29use std::fmt;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use uni_common::core::id::Vid;
34
/// Physical plan node that resolves a single vertex from its external ID.
///
/// The node is a leaf (it reports no children). Its output columns are
/// `{variable}._vid`, `{variable}.ext_id` and `{variable}._label`, followed by
/// one `{variable}.{property}` column per projected property.
pub struct GraphExtIdLookupExec {
    /// Shared execution context; the lookup reads storage, the property
    /// manager and the query context from it.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Variable name used as the prefix of every output column.
    variable: String,

    /// External ID to search for.
    ext_id: String,

    /// Property names emitted as additional output columns, in order.
    projected_properties: Vec<String>,

    /// When `true`, a miss yields one all-NULL row instead of an empty batch.
    optional: bool,

    /// Output schema of the node.
    schema: SchemaRef,

    /// Plan properties computed from `schema` at construction time.
    properties: Arc<PlanProperties>,

    /// Metrics registry from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
64
65impl fmt::Debug for GraphExtIdLookupExec {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_struct("GraphExtIdLookupExec")
68 .field("variable", &self.variable)
69 .field("ext_id", &self.ext_id)
70 .field("projected_properties", &self.projected_properties)
71 .field("optional", &self.optional)
72 .finish()
73 }
74}
75
76impl GraphExtIdLookupExec {
77 pub fn new(
79 graph_ctx: Arc<GraphExecutionContext>,
80 variable: impl Into<String>,
81 ext_id: impl Into<String>,
82 projected_properties: Vec<String>,
83 optional: bool,
84 ) -> Self {
85 let variable = variable.into();
86 let ext_id = ext_id.into();
87
88 let schema = Self::build_schema(&variable, &projected_properties);
90 let properties = compute_plan_properties(schema.clone());
91
92 Self {
93 graph_ctx,
94 variable,
95 ext_id,
96 projected_properties,
97 optional,
98 schema,
99 properties,
100 metrics: ExecutionPlanMetricsSet::new(),
101 }
102 }
103
104 fn build_schema(variable: &str, properties: &[String]) -> SchemaRef {
106 let mut fields = vec![
107 Field::new(format!("{}._vid", variable), DataType::UInt64, false),
108 Field::new(format!("{}.ext_id", variable), DataType::Utf8, false),
109 Field::new(format!("{}._label", variable), DataType::Utf8, false),
110 ];
111
112 for prop in properties {
114 let col_name = format!("{}.{}", variable, prop);
115 fields.push(Field::new(&col_name, DataType::Utf8, true));
116 }
117
118 Arc::new(Schema::new(fields))
119 }
120}
121
122impl DisplayAs for GraphExtIdLookupExec {
123 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 write!(
125 f,
126 "GraphExtIdLookupExec: ext_id={}, variable={}, optional={}",
127 self.ext_id, self.variable, self.optional
128 )
129 }
130}
131
132impl ExecutionPlan for GraphExtIdLookupExec {
133 fn name(&self) -> &str {
134 "GraphExtIdLookupExec"
135 }
136
137 fn as_any(&self) -> &dyn Any {
138 self
139 }
140
141 fn schema(&self) -> SchemaRef {
142 self.schema.clone()
143 }
144
145 fn properties(&self) -> &Arc<PlanProperties> {
146 &self.properties
147 }
148
149 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
150 vec![]
151 }
152
153 fn with_new_children(
154 self: Arc<Self>,
155 children: Vec<Arc<dyn ExecutionPlan>>,
156 ) -> DFResult<Arc<dyn ExecutionPlan>> {
157 if !children.is_empty() {
158 return Err(datafusion::error::DataFusionError::Internal(
159 "GraphExtIdLookupExec has no children".to_string(),
160 ));
161 }
162 Ok(self)
163 }
164
165 fn execute(
166 &self,
167 partition: usize,
168 _context: Arc<TaskContext>,
169 ) -> DFResult<SendableRecordBatchStream> {
170 let metrics = BaselineMetrics::new(&self.metrics, partition);
171
172 Ok(Box::pin(ExtIdLookupStream::new(
173 self.graph_ctx.clone(),
174 self.variable.clone(),
175 self.ext_id.clone(),
176 self.projected_properties.clone(),
177 self.optional,
178 self.schema.clone(),
179 metrics,
180 )))
181 }
182
183 fn metrics(&self) -> Option<MetricsSet> {
184 Some(self.metrics.clone_inner())
185 }
186}
187
/// Progress of an [`ExtIdLookupStream`].
enum ExtIdLookupState {
    /// Nothing has run yet; the next poll creates the lookup future.
    Init,
    /// The lookup future is in flight and is polled until it resolves.
    Executing(Pin<Box<dyn std::future::Future<Output = DFResult<Option<RecordBatch>>> + Send>>),
    /// The result (or error) has been emitted; the stream yields `None`.
    Done,
}
197
/// Record batch stream that runs one external-ID lookup and emits its result.
struct ExtIdLookupStream {
    /// Shared execution context handed to the lookup future.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Variable name forwarded to the lookup.
    variable: String,

    /// External ID to search for.
    ext_id: String,

    /// Property names to project into the result row.
    properties: Vec<String>,

    /// When `true`, a miss yields one all-NULL row instead of an empty batch.
    optional: bool,

    /// Schema of the emitted batch.
    schema: SchemaRef,

    /// Current position in the Init -> Executing -> Done state machine.
    state: ExtIdLookupState,

    /// Baseline metrics: compute time is timed per poll and output rows are
    /// recorded when the batch is produced.
    metrics: BaselineMetrics,
}
224
225impl ExtIdLookupStream {
226 fn new(
227 graph_ctx: Arc<GraphExecutionContext>,
228 variable: String,
229 ext_id: String,
230 properties: Vec<String>,
231 optional: bool,
232 schema: SchemaRef,
233 metrics: BaselineMetrics,
234 ) -> Self {
235 Self {
236 graph_ctx,
237 variable,
238 ext_id,
239 properties,
240 optional,
241 schema,
242 state: ExtIdLookupState::Init,
243 metrics,
244 }
245 }
246}
247
248impl Stream for ExtIdLookupStream {
249 type Item = DFResult<RecordBatch>;
250
251 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
252 let metrics = self.metrics.clone();
253 let _timer = metrics.elapsed_compute().timer();
254 loop {
255 let state = std::mem::replace(&mut self.state, ExtIdLookupState::Done);
256
257 match state {
258 ExtIdLookupState::Init => {
259 let graph_ctx = self.graph_ctx.clone();
261 let variable = self.variable.clone();
262 let ext_id = self.ext_id.clone();
263 let properties = self.properties.clone();
264 let optional = self.optional;
265 let schema = self.schema.clone();
266
267 let fut = async move {
268 graph_ctx.check_timeout().map_err(|e| {
270 datafusion::error::DataFusionError::Execution(e.to_string())
271 })?;
272
273 execute_lookup(
274 &graph_ctx,
275 &variable,
276 &ext_id,
277 &properties,
278 optional,
279 &schema,
280 )
281 .await
282 };
283
284 self.state = ExtIdLookupState::Executing(Box::pin(fut));
285 }
287 ExtIdLookupState::Executing(mut fut) => match fut.as_mut().poll(cx) {
288 Poll::Ready(Ok(batch)) => {
289 self.state = ExtIdLookupState::Done;
290 self.metrics
291 .record_output(batch.as_ref().map(|b| b.num_rows()).unwrap_or(0));
292 return Poll::Ready(batch.map(Ok));
293 }
294 Poll::Ready(Err(e)) => {
295 self.state = ExtIdLookupState::Done;
296 return Poll::Ready(Some(Err(e)));
297 }
298 Poll::Pending => {
299 self.state = ExtIdLookupState::Executing(fut);
300 return Poll::Pending;
301 }
302 },
303 ExtIdLookupState::Done => {
304 return Poll::Ready(None);
305 }
306 }
307 }
308 }
309}
310
311impl RecordBatchStream for ExtIdLookupStream {
312 fn schema(&self) -> SchemaRef {
313 self.schema.clone()
314 }
315}
316
317async fn execute_lookup(
319 graph_ctx: &GraphExecutionContext,
320 variable: &str,
321 ext_id: &str,
322 properties: &[String],
323 optional: bool,
324 schema: &SchemaRef,
325) -> DFResult<Option<RecordBatch>> {
326 let storage = graph_ctx.storage();
327
328 let found_vid = storage
330 .find_vertex_by_ext_id(ext_id)
331 .await
332 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
333
334 let Some(vid) = found_vid else {
335 if optional {
337 return Ok(Some(build_null_row(variable, properties, schema)?));
338 }
339 return Ok(Some(RecordBatch::new_empty(schema.clone())));
340 };
341
342 let property_manager = graph_ctx.property_manager();
344 let query_ctx = graph_ctx.query_context();
345
346 let props_opt = property_manager
347 .get_all_vertex_props_with_ctx(vid, Some(&query_ctx))
348 .await
349 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
350
351 let Some(props) = props_opt else {
352 if optional {
354 return Ok(Some(build_null_row(variable, properties, schema)?));
355 }
356 return Ok(Some(RecordBatch::new_empty(schema.clone())));
357 };
358
359 let labels = storage
361 .find_vertex_labels_by_vid(vid)
362 .await
363 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
364 .unwrap_or_default();
365
366 let label_name = labels
367 .first()
368 .cloned()
369 .unwrap_or_else(|| "Unknown".to_string());
370
371 let batch = build_result_row(
373 vid,
374 ext_id,
375 &label_name,
376 &props,
377 variable,
378 properties,
379 schema,
380 )?;
381 Ok(Some(batch))
382}
383
384fn build_null_row(
386 _variable: &str,
387 _properties: &[String],
388 schema: &SchemaRef,
389) -> DFResult<RecordBatch> {
390 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
392
393 for field in schema.fields().iter() {
395 match field.data_type() {
396 DataType::UInt64 => {
397 columns.push(Arc::new(arrow_array::UInt64Array::from(vec![
398 None as Option<u64>,
399 ])));
400 }
401 DataType::Utf8 => {
402 let mut builder = StringBuilder::new();
403 builder.append_null();
404 columns.push(Arc::new(builder.finish()));
405 }
406 _ => {
407 let mut builder = StringBuilder::new();
408 builder.append_null();
409 columns.push(Arc::new(builder.finish()));
410 }
411 }
412 }
413
414 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
415}
416
417fn build_result_row(
419 vid: Vid,
420 ext_id: &str,
421 label: &str,
422 props: &HashMap<String, uni_common::Value>,
423 _variable: &str,
424 properties: &[String],
425 schema: &SchemaRef,
426) -> DFResult<RecordBatch> {
427 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
428
429 columns.push(Arc::new(UInt64Array::from(vec![vid.as_u64()])));
431
432 let mut ext_id_builder = StringBuilder::new();
434 ext_id_builder.append_value(ext_id);
435 columns.push(Arc::new(ext_id_builder.finish()));
436
437 let mut label_builder = StringBuilder::new();
439 label_builder.append_value(label);
440 columns.push(Arc::new(label_builder.finish()));
441
442 for prop in properties {
444 let mut builder = StringBuilder::new();
445 if let Some(val) = props.get(prop) {
446 match val {
447 uni_common::Value::String(s) => builder.append_value(s),
448 uni_common::Value::Null => builder.append_null(),
449 other => builder.append_value(other.to_string()),
450 }
451 } else {
452 builder.append_null();
453 }
454 columns.push(Arc::new(builder.finish()));
455 }
456
457 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema lists the three identity columns first, then one column per
    /// projected property, all prefixed with the variable name.
    #[test]
    fn test_build_schema() {
        let projected = ["name".to_string(), "age".to_string()];
        let schema = GraphExtIdLookupExec::build_schema("n", &projected);

        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();

        assert_eq!(names, ["n._vid", "n.ext_id", "n._label", "n.name", "n.age"]);
    }
}