1use crate::convert::{to_fb_expand_dir, to_fb_sort_dir, to_fb_vector_metric};
2use crate::expr::{create_expr_vec, serialize_expr};
3use crate::plan::common::{create_coldef_vec, create_string_vec};
4use crate::plexus_generated::plexus as fb;
5use crate::{Op, Plan, SerdeError};
6use flatbuffers::FlatBufferBuilder;
7
8pub fn serialize_plan(plan: &Plan) -> Result<Vec<u8>, SerdeError> {
9 let mut fbb = FlatBufferBuilder::new();
10
11 let producer = fbb.create_string(&plan.version.producer);
12 let version = fb::Version::create(
13 &mut fbb,
14 &fb::VersionArgs {
15 major: plan.version.major,
16 minor: plan.version.minor,
17 patch: plan.version.patch,
18 producer: Some(producer),
19 },
20 );
21
22 let mut ops_offsets = Vec::with_capacity(plan.ops.len());
23 for op in &plan.ops {
24 let (node_type, node) = match op {
25 Op::ScanNodes {
26 labels,
27 schema,
28 must_labels,
29 forbidden_labels,
30 est_rows,
31 selectivity,
32 graph_ref,
33 } => {
34 let labels_off = create_string_vec(&mut fbb, labels);
35 let schema_off = create_coldef_vec(&mut fbb, schema);
36 let must_labels_off = create_string_vec(&mut fbb, must_labels);
37 let forbidden_labels_off = create_string_vec(&mut fbb, forbidden_labels);
38 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
39 let off = fb::ScanNodesOp::create(
40 &mut fbb,
41 &fb::ScanNodesOpArgs {
42 labels: Some(labels_off),
43 schema: Some(schema_off),
44 must_labels: Some(must_labels_off),
45 forbidden_labels: Some(forbidden_labels_off),
46 est_rows: *est_rows,
47 selectivity: *selectivity,
48 graph_ref: graph_ref_off,
49 },
50 );
51 (fb::OpNode::ScanNodesOp, Some(off.as_union_value()))
52 }
53 Op::ScanRels {
54 types,
55 schema,
56 src_labels,
57 dst_labels,
58 est_rows,
59 selectivity,
60 } => {
61 let types_off = create_string_vec(&mut fbb, types);
62 let schema_off = create_coldef_vec(&mut fbb, schema);
63 let src_labels_off = create_string_vec(&mut fbb, src_labels);
64 let dst_labels_off = create_string_vec(&mut fbb, dst_labels);
65 let off = fb::ScanRelsOp::create(
66 &mut fbb,
67 &fb::ScanRelsOpArgs {
68 types: Some(types_off),
69 schema: Some(schema_off),
70 src_labels: Some(src_labels_off),
71 dst_labels: Some(dst_labels_off),
72 est_rows: *est_rows,
73 selectivity: *selectivity,
74 },
75 );
76 (fb::OpNode::ScanRelsOp, Some(off.as_union_value()))
77 }
78 Op::Expand {
79 input,
80 src_col,
81 types,
82 dir,
83 schema,
84 src_var,
85 rel_var,
86 dst_var,
87 legal_src_labels,
88 legal_dst_labels,
89 est_degree,
90 graph_ref,
91 } => {
92 let types_off = create_string_vec(&mut fbb, types);
93 let schema_off = create_coldef_vec(&mut fbb, schema);
94 let src_var_off = fbb.create_string(src_var);
95 let rel_var_off = fbb.create_string(rel_var);
96 let dst_var_off = fbb.create_string(dst_var);
97 let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
98 let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
99 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
100 let off = fb::ExpandOp::create(
101 &mut fbb,
102 &fb::ExpandOpArgs {
103 input: *input,
104 src_col: *src_col,
105 types: Some(types_off),
106 dir: to_fb_expand_dir(*dir),
107 schema: Some(schema_off),
108 src_var: Some(src_var_off),
109 rel_var: Some(rel_var_off),
110 dst_var: Some(dst_var_off),
111 legal_src_labels: Some(legal_src_labels_off),
112 legal_dst_labels: Some(legal_dst_labels_off),
113 est_degree: *est_degree,
114 graph_ref: graph_ref_off,
115 },
116 );
117 (fb::OpNode::ExpandOp, Some(off.as_union_value()))
118 }
119 Op::OptionalExpand {
120 input,
121 src_col,
122 types,
123 dir,
124 schema,
125 src_var,
126 rel_var,
127 dst_var,
128 legal_src_labels,
129 legal_dst_labels,
130 graph_ref,
131 } => {
132 let types_off = create_string_vec(&mut fbb, types);
133 let schema_off = create_coldef_vec(&mut fbb, schema);
134 let src_var_off = fbb.create_string(src_var);
135 let rel_var_off = fbb.create_string(rel_var);
136 let dst_var_off = fbb.create_string(dst_var);
137 let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
138 let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
139 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
140 let off = fb::OptionalExpandOp::create(
141 &mut fbb,
142 &fb::OptionalExpandOpArgs {
143 input: *input,
144 src_col: *src_col,
145 types: Some(types_off),
146 dir: to_fb_expand_dir(*dir),
147 schema: Some(schema_off),
148 src_var: Some(src_var_off),
149 rel_var: Some(rel_var_off),
150 dst_var: Some(dst_var_off),
151 legal_src_labels: Some(legal_src_labels_off),
152 legal_dst_labels: Some(legal_dst_labels_off),
153 graph_ref: graph_ref_off,
154 },
155 );
156 (fb::OpNode::OptionalExpandOp, Some(off.as_union_value()))
157 }
158 Op::SemiExpand {
159 input,
160 src_col,
161 types,
162 dir,
163 schema,
164 legal_src_labels,
165 legal_dst_labels,
166 } => {
167 let types_off = create_string_vec(&mut fbb, types);
168 let schema_off = create_coldef_vec(&mut fbb, schema);
169 let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
170 let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
171 let off = fb::SemiExpandOp::create(
172 &mut fbb,
173 &fb::SemiExpandOpArgs {
174 input: *input,
175 src_col: *src_col,
176 types: Some(types_off),
177 dir: to_fb_expand_dir(*dir),
178 schema: Some(schema_off),
179 legal_src_labels: Some(legal_src_labels_off),
180 legal_dst_labels: Some(legal_dst_labels_off),
181 },
182 );
183 (fb::OpNode::SemiExpandOp, Some(off.as_union_value()))
184 }
185 Op::ExpandVarLen {
186 input,
187 src_col,
188 types,
189 dir,
190 min_hops,
191 max_hops,
192 schema,
193 src_var,
194 path_var,
195 dst_var,
196 graph_ref,
197 } => {
198 let types_off = create_string_vec(&mut fbb, types);
199 let schema_off = create_coldef_vec(&mut fbb, schema);
200 let src_var_off = fbb.create_string(src_var);
201 let path_var_off = fbb.create_string(path_var);
202 let dst_var_off = fbb.create_string(dst_var);
203 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
204 let off = fb::ExpandVarLenOp::create(
205 &mut fbb,
206 &fb::ExpandVarLenOpArgs {
207 input: *input,
208 src_col: *src_col,
209 types: Some(types_off),
210 dir: to_fb_expand_dir(*dir),
211 min_hops: *min_hops,
212 max_hops: *max_hops,
213 schema: Some(schema_off),
214 src_var: Some(src_var_off),
215 path_var: Some(path_var_off),
216 dst_var: Some(dst_var_off),
217 graph_ref: graph_ref_off,
218 },
219 );
220 (fb::OpNode::ExpandVarLenOp, Some(off.as_union_value()))
221 }
222 Op::Filter { input, predicate } => {
223 let pred_off = serialize_expr(&mut fbb, predicate);
224 let off = fb::FilterOp::create(
225 &mut fbb,
226 &fb::FilterOpArgs {
227 input: *input,
228 predicate: Some(pred_off),
229 },
230 );
231 (fb::OpNode::FilterOp, Some(off.as_union_value()))
232 }
233 Op::BlockMarker {
234 input,
235 block_id,
236 branch_id,
237 } => {
238 let off = fb::BlockMarkerOp::create(
239 &mut fbb,
240 &fb::BlockMarkerOpArgs {
241 input: *input,
242 block_id: *block_id,
243 branch_id: *branch_id,
244 },
245 );
246 (fb::OpNode::BlockMarkerOp, Some(off.as_union_value()))
247 }
248 Op::Project {
249 input,
250 exprs,
251 schema,
252 } => {
253 let exprs_off = create_expr_vec(&mut fbb, exprs);
254 let schema_off = create_coldef_vec(&mut fbb, schema);
255 let off = fb::ProjectOp::create(
256 &mut fbb,
257 &fb::ProjectOpArgs {
258 input: *input,
259 exprs: Some(exprs_off),
260 schema: Some(schema_off),
261 },
262 );
263 (fb::OpNode::ProjectOp, Some(off.as_union_value()))
264 }
265 Op::Aggregate {
266 input,
267 keys,
268 aggs,
269 schema,
270 } => {
271 let keys_off = fbb.create_vector(keys);
272 let aggs_off = create_expr_vec(&mut fbb, aggs);
273 let schema_off = create_coldef_vec(&mut fbb, schema);
274 let off = fb::AggregateOp::create(
275 &mut fbb,
276 &fb::AggregateOpArgs {
277 input: *input,
278 keys: Some(keys_off),
279 aggs: Some(aggs_off),
280 schema: Some(schema_off),
281 },
282 );
283 (fb::OpNode::AggregateOp, Some(off.as_union_value()))
284 }
285 Op::Sort { input, keys, dirs } => {
286 let dirs_fb: Vec<_> = dirs.iter().copied().map(to_fb_sort_dir).collect();
287 let dirs_off = fbb.create_vector(&dirs_fb);
288 let keys_off = fbb.create_vector(keys);
289 let off = fb::SortOp::create(
290 &mut fbb,
291 &fb::SortOpArgs {
292 input: *input,
293 keys: Some(keys_off),
294 dirs: Some(dirs_off),
295 },
296 );
297 (fb::OpNode::SortOp, Some(off.as_union_value()))
298 }
299 Op::Limit { input, count, skip } => {
300 let off = fb::LimitOp::create(
301 &mut fbb,
302 &fb::LimitOpArgs {
303 input: *input,
304 count: *count,
305 skip: *skip,
306 },
307 );
308 (fb::OpNode::LimitOp, Some(off.as_union_value()))
309 }
310 Op::Unwind {
311 input,
312 list_expr,
313 out_var,
314 schema,
315 } => {
316 let list_expr_off = serialize_expr(&mut fbb, list_expr);
317 let out_var_off = fbb.create_string(out_var);
318 let schema_off = create_coldef_vec(&mut fbb, schema);
319 let off = fb::UnwindOp::create(
320 &mut fbb,
321 &fb::UnwindOpArgs {
322 input: *input,
323 list_expr: Some(list_expr_off),
324 out_var: Some(out_var_off),
325 schema: Some(schema_off),
326 },
327 );
328 (fb::OpNode::UnwindOp, Some(off.as_union_value()))
329 }
330 Op::PathConstruct {
331 input,
332 rel_cols,
333 schema,
334 } => {
335 let rel_cols_off = fbb.create_vector(rel_cols);
336 let schema_off = create_coldef_vec(&mut fbb, schema);
337 let off = fb::PathConstructOp::create(
338 &mut fbb,
339 &fb::PathConstructOpArgs {
340 input: *input,
341 rel_cols: Some(rel_cols_off),
342 schema: Some(schema_off),
343 },
344 );
345 (fb::OpNode::PathConstructOp, Some(off.as_union_value()))
346 }
347 Op::Union {
348 lhs,
349 rhs,
350 all,
351 schema,
352 } => {
353 let schema_off = create_coldef_vec(&mut fbb, schema);
354 let off = fb::UnionOp::create(
355 &mut fbb,
356 &fb::UnionOpArgs {
357 lhs: *lhs,
358 rhs: *rhs,
359 all: *all,
360 schema: Some(schema_off),
361 },
362 );
363 (fb::OpNode::UnionOp, Some(off.as_union_value()))
364 }
365 Op::CreateNode {
366 input,
367 labels,
368 props,
369 schema,
370 out_var,
371 } => {
372 let labels_off = create_string_vec(&mut fbb, labels);
373 let props_off = serialize_expr(&mut fbb, props);
374 let schema_off = create_coldef_vec(&mut fbb, schema);
375 let out_var_off = fbb.create_string(out_var);
376 let off = fb::CreateNodeOp::create(
377 &mut fbb,
378 &fb::CreateNodeOpArgs {
379 input: *input,
380 labels: Some(labels_off),
381 props: Some(props_off),
382 schema: Some(schema_off),
383 out_var: Some(out_var_off),
384 },
385 );
386 (fb::OpNode::CreateNodeOp, Some(off.as_union_value()))
387 }
388 Op::CreateRel {
389 input,
390 src_col,
391 dst_col,
392 rel_type,
393 props,
394 schema,
395 out_var,
396 } => {
397 let rel_type_off = fbb.create_string(rel_type);
398 let props_off = serialize_expr(&mut fbb, props);
399 let schema_off = create_coldef_vec(&mut fbb, schema);
400 let out_var_off = fbb.create_string(out_var);
401 let off = fb::CreateRelOp::create(
402 &mut fbb,
403 &fb::CreateRelOpArgs {
404 input: *input,
405 src_col: *src_col,
406 dst_col: *dst_col,
407 rel_type: Some(rel_type_off),
408 props: Some(props_off),
409 schema: Some(schema_off),
410 out_var: Some(out_var_off),
411 },
412 );
413 (fb::OpNode::CreateRelOp, Some(off.as_union_value()))
414 }
415 Op::Merge {
416 input,
417 pattern,
418 on_create_props,
419 on_match_props,
420 schema,
421 } => {
422 let pattern_off = serialize_expr(&mut fbb, pattern);
423 let on_create_props_off = serialize_expr(&mut fbb, on_create_props);
424 let on_match_props_off = serialize_expr(&mut fbb, on_match_props);
425 let schema_off = create_coldef_vec(&mut fbb, schema);
426 let off = fb::MergeOp::create(
427 &mut fbb,
428 &fb::MergeOpArgs {
429 input: *input,
430 pattern: Some(pattern_off),
431 on_create_props: Some(on_create_props_off),
432 on_match_props: Some(on_match_props_off),
433 schema: Some(schema_off),
434 },
435 );
436 (fb::OpNode::MergeOp, Some(off.as_union_value()))
437 }
438 Op::Delete {
439 input,
440 target_col,
441 detach,
442 schema,
443 } => {
444 let schema_off = create_coldef_vec(&mut fbb, schema);
445 let off = fb::DeleteOp::create(
446 &mut fbb,
447 &fb::DeleteOpArgs {
448 input: *input,
449 target_col: *target_col,
450 detach: *detach,
451 schema: Some(schema_off),
452 },
453 );
454 (fb::OpNode::DeleteOp, Some(off.as_union_value()))
455 }
456 Op::SetProperty {
457 input,
458 target_col,
459 key,
460 value_expr,
461 schema,
462 } => {
463 let key_off = fbb.create_string(key);
464 let value_expr_off = serialize_expr(&mut fbb, value_expr);
465 let schema_off = create_coldef_vec(&mut fbb, schema);
466 let off = fb::SetPropertyOp::create(
467 &mut fbb,
468 &fb::SetPropertyOpArgs {
469 input: *input,
470 target_col: *target_col,
471 key: Some(key_off),
472 value_expr: Some(value_expr_off),
473 schema: Some(schema_off),
474 },
475 );
476 (fb::OpNode::SetPropertyOp, Some(off.as_union_value()))
477 }
478 Op::RemoveProperty {
479 input,
480 target_col,
481 key,
482 schema,
483 } => {
484 let key_off = fbb.create_string(key);
485 let schema_off = create_coldef_vec(&mut fbb, schema);
486 let off = fb::RemovePropertyOp::create(
487 &mut fbb,
488 &fb::RemovePropertyOpArgs {
489 input: *input,
490 target_col: *target_col,
491 key: Some(key_off),
492 schema: Some(schema_off),
493 },
494 );
495 (fb::OpNode::RemovePropertyOp, Some(off.as_union_value()))
496 }
497 Op::VectorScan {
498 input,
499 collection,
500 query_vector,
501 metric,
502 top_k,
503 approx_hint,
504 schema,
505 } => {
506 let collection_off = fbb.create_string(collection);
507 let query_vector_off = serialize_expr(&mut fbb, query_vector);
508 let schema_off = create_coldef_vec(&mut fbb, schema);
509 let off = fb::VectorScanOp::create(
510 &mut fbb,
511 &fb::VectorScanOpArgs {
512 input: *input,
513 collection: Some(collection_off),
514 query_vector: Some(query_vector_off),
515 metric: to_fb_vector_metric(*metric),
516 top_k: *top_k,
517 approx_hint: *approx_hint,
518 schema: Some(schema_off),
519 },
520 );
521 (fb::OpNode::VectorScanOp, Some(off.as_union_value()))
522 }
523 Op::Rerank {
524 input,
525 score_expr,
526 top_k,
527 schema,
528 } => {
529 let score_expr_off = serialize_expr(&mut fbb, score_expr);
530 let schema_off = create_coldef_vec(&mut fbb, schema);
531 let off = fb::RerankOp::create(
532 &mut fbb,
533 &fb::RerankOpArgs {
534 input: *input,
535 score_expr: Some(score_expr_off),
536 top_k: *top_k,
537 schema: Some(schema_off),
538 },
539 );
540 (fb::OpNode::RerankOp, Some(off.as_union_value()))
541 }
542 Op::Return { input } => {
543 let off = fb::ReturnOp::create(&mut fbb, &fb::ReturnOpArgs { input: *input });
544 (fb::OpNode::ReturnOp, Some(off.as_union_value()))
545 }
546 Op::ConstRow => {
547 let off = fb::ConstRowOp::create(&mut fbb, &fb::ConstRowOpArgs {});
548 (fb::OpNode::ConstRowOp, Some(off.as_union_value()))
549 }
550 };
551
552 let plan_op = fb::PlanOp::create(&mut fbb, &fb::PlanOpArgs { node_type, node });
553 ops_offsets.push(plan_op);
554 }
555
556 let ops = fbb.create_vector(&ops_offsets);
557 let root = fb::PlexusPlan::create(
558 &mut fbb,
559 &fb::PlexusPlanArgs {
560 version: Some(version),
561 ops: Some(ops),
562 root_op: plan.root_op,
563 },
564 );
565 fb::finish_plexus_plan_buffer(&mut fbb, root);
566 Ok(fbb.finished_data().to_vec())
567}