1use crate::convert::{to_fb_expand_dir, to_fb_sort_dir, to_fb_vector_metric};
2use crate::expr::{create_expr_vec, serialize_expr};
3use crate::plan::common::{create_coldef_vec, create_string_vec};
4use crate::plexus_generated::plexus as fb;
5use crate::{Op, Plan, SerdeError};
6use flatbuffers::FlatBufferBuilder;
7
8pub fn serialize_plan(plan: &Plan) -> Result<Vec<u8>, SerdeError> {
9 let mut fbb = FlatBufferBuilder::new();
10
11 let producer = fbb.create_string(&plan.version.producer);
12 let version = fb::Version::create(
13 &mut fbb,
14 &fb::VersionArgs {
15 major: plan.version.major,
16 minor: plan.version.minor,
17 patch: plan.version.patch,
18 producer: Some(producer),
19 },
20 );
21
22 let mut ops_offsets = Vec::with_capacity(plan.ops.len());
23 for op in &plan.ops {
24 let (node_type, node) = match op {
25 Op::ScanNodes {
26 labels,
27 schema,
28 must_labels,
29 forbidden_labels,
30 est_rows,
31 selectivity,
32 graph_ref,
33 } => {
34 let labels_off = create_string_vec(&mut fbb, labels);
35 let schema_off = create_coldef_vec(&mut fbb, schema);
36 let must_labels_off = create_string_vec(&mut fbb, must_labels);
37 let forbidden_labels_off = create_string_vec(&mut fbb, forbidden_labels);
38 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
39 let off = fb::ScanNodesOp::create(
40 &mut fbb,
41 &fb::ScanNodesOpArgs {
42 labels: Some(labels_off),
43 schema: Some(schema_off),
44 must_labels: Some(must_labels_off),
45 forbidden_labels: Some(forbidden_labels_off),
46 est_rows: *est_rows,
47 selectivity: *selectivity,
48 graph_ref: graph_ref_off,
49 },
50 );
51 (fb::OpNode::ScanNodesOp, Some(off.as_union_value()))
52 }
53 Op::ScanRels {
54 types,
55 schema,
56 src_labels,
57 dst_labels,
58 est_rows,
59 selectivity,
60 } => {
61 let types_off = create_string_vec(&mut fbb, types);
62 let schema_off = create_coldef_vec(&mut fbb, schema);
63 let src_labels_off = create_string_vec(&mut fbb, src_labels);
64 let dst_labels_off = create_string_vec(&mut fbb, dst_labels);
65 let off = fb::ScanRelsOp::create(
66 &mut fbb,
67 &fb::ScanRelsOpArgs {
68 types: Some(types_off),
69 schema: Some(schema_off),
70 src_labels: Some(src_labels_off),
71 dst_labels: Some(dst_labels_off),
72 est_rows: *est_rows,
73 selectivity: *selectivity,
74 },
75 );
76 (fb::OpNode::ScanRelsOp, Some(off.as_union_value()))
77 }
78 Op::Expand {
79 input,
80 src_col,
81 types,
82 dir,
83 schema,
84 src_var,
85 rel_var,
86 dst_var,
87 legal_src_labels,
88 legal_dst_labels,
89 est_degree,
90 graph_ref,
91 } => {
92 let types_off = create_string_vec(&mut fbb, types);
93 let schema_off = create_coldef_vec(&mut fbb, schema);
94 let src_var_off = fbb.create_string(src_var);
95 let rel_var_off = fbb.create_string(rel_var);
96 let dst_var_off = fbb.create_string(dst_var);
97 let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
98 let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
99 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
100 let off = fb::ExpandOp::create(
101 &mut fbb,
102 &fb::ExpandOpArgs {
103 input: *input,
104 src_col: *src_col,
105 types: Some(types_off),
106 dir: to_fb_expand_dir(*dir),
107 schema: Some(schema_off),
108 src_var: Some(src_var_off),
109 rel_var: Some(rel_var_off),
110 dst_var: Some(dst_var_off),
111 legal_src_labels: Some(legal_src_labels_off),
112 legal_dst_labels: Some(legal_dst_labels_off),
113 est_degree: *est_degree,
114 graph_ref: graph_ref_off,
115 },
116 );
117 (fb::OpNode::ExpandOp, Some(off.as_union_value()))
118 }
119 Op::OptionalExpand {
120 input,
121 src_col,
122 types,
123 dir,
124 schema,
125 src_var,
126 rel_var,
127 dst_var,
128 legal_src_labels,
129 legal_dst_labels,
130 graph_ref,
131 } => {
132 let types_off = create_string_vec(&mut fbb, types);
133 let schema_off = create_coldef_vec(&mut fbb, schema);
134 let src_var_off = fbb.create_string(src_var);
135 let rel_var_off = fbb.create_string(rel_var);
136 let dst_var_off = fbb.create_string(dst_var);
137 let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
138 let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
139 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
140 let off = fb::OptionalExpandOp::create(
141 &mut fbb,
142 &fb::OptionalExpandOpArgs {
143 input: *input,
144 src_col: *src_col,
145 types: Some(types_off),
146 dir: to_fb_expand_dir(*dir),
147 schema: Some(schema_off),
148 src_var: Some(src_var_off),
149 rel_var: Some(rel_var_off),
150 dst_var: Some(dst_var_off),
151 legal_src_labels: Some(legal_src_labels_off),
152 legal_dst_labels: Some(legal_dst_labels_off),
153 graph_ref: graph_ref_off,
154 },
155 );
156 (fb::OpNode::OptionalExpandOp, Some(off.as_union_value()))
157 }
158 Op::SemiExpand {
159 input,
160 src_col,
161 types,
162 dir,
163 schema,
164 legal_src_labels,
165 legal_dst_labels,
166 } => {
167 let types_off = create_string_vec(&mut fbb, types);
168 let schema_off = create_coldef_vec(&mut fbb, schema);
169 let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
170 let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
171 let off = fb::SemiExpandOp::create(
172 &mut fbb,
173 &fb::SemiExpandOpArgs {
174 input: *input,
175 src_col: *src_col,
176 types: Some(types_off),
177 dir: to_fb_expand_dir(*dir),
178 schema: Some(schema_off),
179 legal_src_labels: Some(legal_src_labels_off),
180 legal_dst_labels: Some(legal_dst_labels_off),
181 },
182 );
183 (fb::OpNode::SemiExpandOp, Some(off.as_union_value()))
184 }
185 Op::ExpandVarLen {
186 input,
187 src_col,
188 types,
189 dir,
190 min_hops,
191 max_hops,
192 schema,
193 src_var,
194 path_var,
195 dst_var,
196 graph_ref,
197 } => {
198 let types_off = create_string_vec(&mut fbb, types);
199 let schema_off = create_coldef_vec(&mut fbb, schema);
200 let src_var_off = fbb.create_string(src_var);
201 let path_var_off = fbb.create_string(path_var);
202 let dst_var_off = fbb.create_string(dst_var);
203 let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
204 let off = fb::ExpandVarLenOp::create(
205 &mut fbb,
206 &fb::ExpandVarLenOpArgs {
207 input: *input,
208 src_col: *src_col,
209 types: Some(types_off),
210 dir: to_fb_expand_dir(*dir),
211 min_hops: *min_hops,
212 max_hops: *max_hops,
213 schema: Some(schema_off),
214 src_var: Some(src_var_off),
215 path_var: Some(path_var_off),
216 dst_var: Some(dst_var_off),
217 graph_ref: graph_ref_off,
218 },
219 );
220 (fb::OpNode::ExpandVarLenOp, Some(off.as_union_value()))
221 }
222 Op::Filter { input, predicate } => {
223 let pred_off = serialize_expr(&mut fbb, predicate);
224 let off = fb::FilterOp::create(
225 &mut fbb,
226 &fb::FilterOpArgs {
227 input: *input,
228 predicate: Some(pred_off),
229 },
230 );
231 (fb::OpNode::FilterOp, Some(off.as_union_value()))
232 }
233 Op::BlockMarker {
234 input,
235 block_id,
236 branch_id,
237 } => {
238 let off = fb::BlockMarkerOp::create(
239 &mut fbb,
240 &fb::BlockMarkerOpArgs {
241 input: *input,
242 block_id: *block_id,
243 branch_id: *branch_id,
244 },
245 );
246 (fb::OpNode::BlockMarkerOp, Some(off.as_union_value()))
247 }
248 Op::Project {
249 input,
250 exprs,
251 schema,
252 } => {
253 let exprs_off = create_expr_vec(&mut fbb, exprs);
254 let schema_off = create_coldef_vec(&mut fbb, schema);
255 let off = fb::ProjectOp::create(
256 &mut fbb,
257 &fb::ProjectOpArgs {
258 input: *input,
259 exprs: Some(exprs_off),
260 schema: Some(schema_off),
261 },
262 );
263 (fb::OpNode::ProjectOp, Some(off.as_union_value()))
264 }
265 Op::Aggregate {
266 input,
267 keys,
268 aggs,
269 schema,
270 } => {
271 let keys_off = fbb.create_vector(keys);
272 let aggs_off = create_expr_vec(&mut fbb, aggs);
273 let schema_off = create_coldef_vec(&mut fbb, schema);
274 let off = fb::AggregateOp::create(
275 &mut fbb,
276 &fb::AggregateOpArgs {
277 input: *input,
278 keys: Some(keys_off),
279 aggs: Some(aggs_off),
280 schema: Some(schema_off),
281 },
282 );
283 (fb::OpNode::AggregateOp, Some(off.as_union_value()))
284 }
285 Op::Sort { input, keys, dirs } => {
286 let dirs_fb: Vec<_> = dirs.iter().copied().map(to_fb_sort_dir).collect();
287 let dirs_off = fbb.create_vector(&dirs_fb);
288 let keys_off = fbb.create_vector(keys);
289 let off = fb::SortOp::create(
290 &mut fbb,
291 &fb::SortOpArgs {
292 input: *input,
293 keys: Some(keys_off),
294 dirs: Some(dirs_off),
295 },
296 );
297 (fb::OpNode::SortOp, Some(off.as_union_value()))
298 }
299 Op::Limit {
300 input,
301 count,
302 skip,
303 cursor,
304 emit_cursor,
305 } => {
306 let cursor_off = cursor
307 .as_ref()
308 .map(|bytes| fbb.create_vector(bytes.iter().map(|&b| b as i8).collect::<Vec<i8>>().as_slice()));
309 let off = fb::LimitOp::create(
310 &mut fbb,
311 &fb::LimitOpArgs {
312 input: *input,
313 count: *count,
314 skip: *skip,
315 cursor: cursor_off,
316 emit_cursor: *emit_cursor,
317 },
318 );
319 (fb::OpNode::LimitOp, Some(off.as_union_value()))
320 }
321 Op::Unwind {
322 input,
323 list_expr,
324 out_var,
325 schema,
326 } => {
327 let list_expr_off = serialize_expr(&mut fbb, list_expr);
328 let out_var_off = fbb.create_string(out_var);
329 let schema_off = create_coldef_vec(&mut fbb, schema);
330 let off = fb::UnwindOp::create(
331 &mut fbb,
332 &fb::UnwindOpArgs {
333 input: *input,
334 list_expr: Some(list_expr_off),
335 out_var: Some(out_var_off),
336 schema: Some(schema_off),
337 },
338 );
339 (fb::OpNode::UnwindOp, Some(off.as_union_value()))
340 }
341 Op::PathConstruct {
342 input,
343 rel_cols,
344 schema,
345 } => {
346 let rel_cols_off = fbb.create_vector(rel_cols);
347 let schema_off = create_coldef_vec(&mut fbb, schema);
348 let off = fb::PathConstructOp::create(
349 &mut fbb,
350 &fb::PathConstructOpArgs {
351 input: *input,
352 rel_cols: Some(rel_cols_off),
353 schema: Some(schema_off),
354 },
355 );
356 (fb::OpNode::PathConstructOp, Some(off.as_union_value()))
357 }
358 Op::Union {
359 lhs,
360 rhs,
361 all,
362 schema,
363 } => {
364 let schema_off = create_coldef_vec(&mut fbb, schema);
365 let off = fb::UnionOp::create(
366 &mut fbb,
367 &fb::UnionOpArgs {
368 lhs: *lhs,
369 rhs: *rhs,
370 all: *all,
371 schema: Some(schema_off),
372 },
373 );
374 (fb::OpNode::UnionOp, Some(off.as_union_value()))
375 }
376 Op::CreateNode {
377 input,
378 labels,
379 props,
380 schema,
381 out_var,
382 } => {
383 let labels_off = create_string_vec(&mut fbb, labels);
384 let props_off = serialize_expr(&mut fbb, props);
385 let schema_off = create_coldef_vec(&mut fbb, schema);
386 let out_var_off = fbb.create_string(out_var);
387 let off = fb::CreateNodeOp::create(
388 &mut fbb,
389 &fb::CreateNodeOpArgs {
390 input: *input,
391 labels: Some(labels_off),
392 props: Some(props_off),
393 schema: Some(schema_off),
394 out_var: Some(out_var_off),
395 },
396 );
397 (fb::OpNode::CreateNodeOp, Some(off.as_union_value()))
398 }
399 Op::CreateRel {
400 input,
401 src_col,
402 dst_col,
403 rel_type,
404 props,
405 schema,
406 out_var,
407 } => {
408 let rel_type_off = fbb.create_string(rel_type);
409 let props_off = serialize_expr(&mut fbb, props);
410 let schema_off = create_coldef_vec(&mut fbb, schema);
411 let out_var_off = fbb.create_string(out_var);
412 let off = fb::CreateRelOp::create(
413 &mut fbb,
414 &fb::CreateRelOpArgs {
415 input: *input,
416 src_col: *src_col,
417 dst_col: *dst_col,
418 rel_type: Some(rel_type_off),
419 props: Some(props_off),
420 schema: Some(schema_off),
421 out_var: Some(out_var_off),
422 },
423 );
424 (fb::OpNode::CreateRelOp, Some(off.as_union_value()))
425 }
426 Op::Merge {
427 input,
428 pattern,
429 on_create_props,
430 on_match_props,
431 schema,
432 } => {
433 let pattern_off = serialize_expr(&mut fbb, pattern);
434 let on_create_props_off = serialize_expr(&mut fbb, on_create_props);
435 let on_match_props_off = serialize_expr(&mut fbb, on_match_props);
436 let schema_off = create_coldef_vec(&mut fbb, schema);
437 let off = fb::MergeOp::create(
438 &mut fbb,
439 &fb::MergeOpArgs {
440 input: *input,
441 pattern: Some(pattern_off),
442 on_create_props: Some(on_create_props_off),
443 on_match_props: Some(on_match_props_off),
444 schema: Some(schema_off),
445 },
446 );
447 (fb::OpNode::MergeOp, Some(off.as_union_value()))
448 }
449 Op::Delete {
450 input,
451 target_col,
452 detach,
453 schema,
454 } => {
455 let schema_off = create_coldef_vec(&mut fbb, schema);
456 let off = fb::DeleteOp::create(
457 &mut fbb,
458 &fb::DeleteOpArgs {
459 input: *input,
460 target_col: *target_col,
461 detach: *detach,
462 schema: Some(schema_off),
463 },
464 );
465 (fb::OpNode::DeleteOp, Some(off.as_union_value()))
466 }
467 Op::SetProperty {
468 input,
469 target_col,
470 key,
471 value_expr,
472 schema,
473 } => {
474 let key_off = fbb.create_string(key);
475 let value_expr_off = serialize_expr(&mut fbb, value_expr);
476 let schema_off = create_coldef_vec(&mut fbb, schema);
477 let off = fb::SetPropertyOp::create(
478 &mut fbb,
479 &fb::SetPropertyOpArgs {
480 input: *input,
481 target_col: *target_col,
482 key: Some(key_off),
483 value_expr: Some(value_expr_off),
484 schema: Some(schema_off),
485 },
486 );
487 (fb::OpNode::SetPropertyOp, Some(off.as_union_value()))
488 }
489 Op::RemoveProperty {
490 input,
491 target_col,
492 key,
493 schema,
494 } => {
495 let key_off = fbb.create_string(key);
496 let schema_off = create_coldef_vec(&mut fbb, schema);
497 let off = fb::RemovePropertyOp::create(
498 &mut fbb,
499 &fb::RemovePropertyOpArgs {
500 input: *input,
501 target_col: *target_col,
502 key: Some(key_off),
503 schema: Some(schema_off),
504 },
505 );
506 (fb::OpNode::RemovePropertyOp, Some(off.as_union_value()))
507 }
508 Op::VectorScan {
509 input,
510 collection,
511 query_vector,
512 metric,
513 top_k,
514 approx_hint,
515 schema,
516 } => {
517 let collection_off = fbb.create_string(collection);
518 let query_vector_off = serialize_expr(&mut fbb, query_vector);
519 let schema_off = create_coldef_vec(&mut fbb, schema);
520 let off = fb::VectorScanOp::create(
521 &mut fbb,
522 &fb::VectorScanOpArgs {
523 input: *input,
524 collection: Some(collection_off),
525 query_vector: Some(query_vector_off),
526 metric: to_fb_vector_metric(*metric),
527 top_k: *top_k,
528 approx_hint: *approx_hint,
529 schema: Some(schema_off),
530 },
531 );
532 (fb::OpNode::VectorScanOp, Some(off.as_union_value()))
533 }
534 Op::Rerank {
535 input,
536 score_expr,
537 top_k,
538 schema,
539 } => {
540 let score_expr_off = serialize_expr(&mut fbb, score_expr);
541 let schema_off = create_coldef_vec(&mut fbb, schema);
542 let off = fb::RerankOp::create(
543 &mut fbb,
544 &fb::RerankOpArgs {
545 input: *input,
546 score_expr: Some(score_expr_off),
547 top_k: *top_k,
548 schema: Some(schema_off),
549 },
550 );
551 (fb::OpNode::RerankOp, Some(off.as_union_value()))
552 }
553 Op::Return { input } => {
554 let off = fb::ReturnOp::create(&mut fbb, &fb::ReturnOpArgs { input: *input });
555 (fb::OpNode::ReturnOp, Some(off.as_union_value()))
556 }
557 Op::ConstRow => {
558 let off = fb::ConstRowOp::create(&mut fbb, &fb::ConstRowOpArgs {});
559 (fb::OpNode::ConstRowOp, Some(off.as_union_value()))
560 }
561 };
562
563 let plan_op = fb::PlanOp::create(&mut fbb, &fb::PlanOpArgs { node_type, node });
564 ops_offsets.push(plan_op);
565 }
566
567 let ops = fbb.create_vector(&ops_offsets);
568 let root = fb::PlexusPlan::create(
569 &mut fbb,
570 &fb::PlexusPlanArgs {
571 version: Some(version),
572 ops: Some(ops),
573 root_op: plan.root_op,
574 },
575 );
576 fb::finish_plexus_plan_buffer(&mut fbb, root);
577 Ok(fbb.finished_data().to_vec())
578}