Skip to main content

plexus_serde/plan/
serialize.rs

1use crate::convert::{to_fb_expand_dir, to_fb_sort_dir, to_fb_vector_metric};
2use crate::expr::{create_expr_vec, serialize_expr};
3use crate::plan::common::{create_coldef_vec, create_string_vec};
4use crate::plexus_generated::plexus as fb;
5use crate::{Op, Plan, SerdeError};
6use flatbuffers::FlatBufferBuilder;
7
8pub fn serialize_plan(plan: &Plan) -> Result<Vec<u8>, SerdeError> {
9    let mut fbb = FlatBufferBuilder::new();
10
11    let producer = fbb.create_string(&plan.version.producer);
12    let version = fb::Version::create(
13        &mut fbb,
14        &fb::VersionArgs {
15            major: plan.version.major,
16            minor: plan.version.minor,
17            patch: plan.version.patch,
18            producer: Some(producer),
19        },
20    );
21
22    let mut ops_offsets = Vec::with_capacity(plan.ops.len());
23    for op in &plan.ops {
24        let (node_type, node) = match op {
25            Op::ScanNodes {
26                labels,
27                schema,
28                must_labels,
29                forbidden_labels,
30                est_rows,
31                selectivity,
32                graph_ref,
33            } => {
34                let labels_off = create_string_vec(&mut fbb, labels);
35                let schema_off = create_coldef_vec(&mut fbb, schema);
36                let must_labels_off = create_string_vec(&mut fbb, must_labels);
37                let forbidden_labels_off = create_string_vec(&mut fbb, forbidden_labels);
38                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
39                let off = fb::ScanNodesOp::create(
40                    &mut fbb,
41                    &fb::ScanNodesOpArgs {
42                        labels: Some(labels_off),
43                        schema: Some(schema_off),
44                        must_labels: Some(must_labels_off),
45                        forbidden_labels: Some(forbidden_labels_off),
46                        est_rows: *est_rows,
47                        selectivity: *selectivity,
48                        graph_ref: graph_ref_off,
49                    },
50                );
51                (fb::OpNode::ScanNodesOp, Some(off.as_union_value()))
52            }
53            Op::ScanRels {
54                types,
55                schema,
56                src_labels,
57                dst_labels,
58                est_rows,
59                selectivity,
60            } => {
61                let types_off = create_string_vec(&mut fbb, types);
62                let schema_off = create_coldef_vec(&mut fbb, schema);
63                let src_labels_off = create_string_vec(&mut fbb, src_labels);
64                let dst_labels_off = create_string_vec(&mut fbb, dst_labels);
65                let off = fb::ScanRelsOp::create(
66                    &mut fbb,
67                    &fb::ScanRelsOpArgs {
68                        types: Some(types_off),
69                        schema: Some(schema_off),
70                        src_labels: Some(src_labels_off),
71                        dst_labels: Some(dst_labels_off),
72                        est_rows: *est_rows,
73                        selectivity: *selectivity,
74                    },
75                );
76                (fb::OpNode::ScanRelsOp, Some(off.as_union_value()))
77            }
78            Op::Expand {
79                input,
80                src_col,
81                types,
82                dir,
83                schema,
84                src_var,
85                rel_var,
86                dst_var,
87                legal_src_labels,
88                legal_dst_labels,
89                est_degree,
90                graph_ref,
91            } => {
92                let types_off = create_string_vec(&mut fbb, types);
93                let schema_off = create_coldef_vec(&mut fbb, schema);
94                let src_var_off = fbb.create_string(src_var);
95                let rel_var_off = fbb.create_string(rel_var);
96                let dst_var_off = fbb.create_string(dst_var);
97                let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
98                let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
99                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
100                let off = fb::ExpandOp::create(
101                    &mut fbb,
102                    &fb::ExpandOpArgs {
103                        input: *input,
104                        src_col: *src_col,
105                        types: Some(types_off),
106                        dir: to_fb_expand_dir(*dir),
107                        schema: Some(schema_off),
108                        src_var: Some(src_var_off),
109                        rel_var: Some(rel_var_off),
110                        dst_var: Some(dst_var_off),
111                        legal_src_labels: Some(legal_src_labels_off),
112                        legal_dst_labels: Some(legal_dst_labels_off),
113                        est_degree: *est_degree,
114                        graph_ref: graph_ref_off,
115                    },
116                );
117                (fb::OpNode::ExpandOp, Some(off.as_union_value()))
118            }
119            Op::OptionalExpand {
120                input,
121                src_col,
122                types,
123                dir,
124                schema,
125                src_var,
126                rel_var,
127                dst_var,
128                legal_src_labels,
129                legal_dst_labels,
130                graph_ref,
131            } => {
132                let types_off = create_string_vec(&mut fbb, types);
133                let schema_off = create_coldef_vec(&mut fbb, schema);
134                let src_var_off = fbb.create_string(src_var);
135                let rel_var_off = fbb.create_string(rel_var);
136                let dst_var_off = fbb.create_string(dst_var);
137                let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
138                let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
139                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
140                let off = fb::OptionalExpandOp::create(
141                    &mut fbb,
142                    &fb::OptionalExpandOpArgs {
143                        input: *input,
144                        src_col: *src_col,
145                        types: Some(types_off),
146                        dir: to_fb_expand_dir(*dir),
147                        schema: Some(schema_off),
148                        src_var: Some(src_var_off),
149                        rel_var: Some(rel_var_off),
150                        dst_var: Some(dst_var_off),
151                        legal_src_labels: Some(legal_src_labels_off),
152                        legal_dst_labels: Some(legal_dst_labels_off),
153                        graph_ref: graph_ref_off,
154                    },
155                );
156                (fb::OpNode::OptionalExpandOp, Some(off.as_union_value()))
157            }
158            Op::SemiExpand {
159                input,
160                src_col,
161                types,
162                dir,
163                schema,
164                legal_src_labels,
165                legal_dst_labels,
166            } => {
167                let types_off = create_string_vec(&mut fbb, types);
168                let schema_off = create_coldef_vec(&mut fbb, schema);
169                let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
170                let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
171                let off = fb::SemiExpandOp::create(
172                    &mut fbb,
173                    &fb::SemiExpandOpArgs {
174                        input: *input,
175                        src_col: *src_col,
176                        types: Some(types_off),
177                        dir: to_fb_expand_dir(*dir),
178                        schema: Some(schema_off),
179                        legal_src_labels: Some(legal_src_labels_off),
180                        legal_dst_labels: Some(legal_dst_labels_off),
181                    },
182                );
183                (fb::OpNode::SemiExpandOp, Some(off.as_union_value()))
184            }
185            Op::ExpandVarLen {
186                input,
187                src_col,
188                types,
189                dir,
190                min_hops,
191                max_hops,
192                schema,
193                src_var,
194                path_var,
195                dst_var,
196                graph_ref,
197            } => {
198                let types_off = create_string_vec(&mut fbb, types);
199                let schema_off = create_coldef_vec(&mut fbb, schema);
200                let src_var_off = fbb.create_string(src_var);
201                let path_var_off = fbb.create_string(path_var);
202                let dst_var_off = fbb.create_string(dst_var);
203                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
204                let off = fb::ExpandVarLenOp::create(
205                    &mut fbb,
206                    &fb::ExpandVarLenOpArgs {
207                        input: *input,
208                        src_col: *src_col,
209                        types: Some(types_off),
210                        dir: to_fb_expand_dir(*dir),
211                        min_hops: *min_hops,
212                        max_hops: *max_hops,
213                        schema: Some(schema_off),
214                        src_var: Some(src_var_off),
215                        path_var: Some(path_var_off),
216                        dst_var: Some(dst_var_off),
217                        graph_ref: graph_ref_off,
218                    },
219                );
220                (fb::OpNode::ExpandVarLenOp, Some(off.as_union_value()))
221            }
222            Op::Filter { input, predicate } => {
223                let pred_off = serialize_expr(&mut fbb, predicate);
224                let off = fb::FilterOp::create(
225                    &mut fbb,
226                    &fb::FilterOpArgs {
227                        input: *input,
228                        predicate: Some(pred_off),
229                    },
230                );
231                (fb::OpNode::FilterOp, Some(off.as_union_value()))
232            }
233            Op::BlockMarker {
234                input,
235                block_id,
236                branch_id,
237            } => {
238                let off = fb::BlockMarkerOp::create(
239                    &mut fbb,
240                    &fb::BlockMarkerOpArgs {
241                        input: *input,
242                        block_id: *block_id,
243                        branch_id: *branch_id,
244                    },
245                );
246                (fb::OpNode::BlockMarkerOp, Some(off.as_union_value()))
247            }
248            Op::Project {
249                input,
250                exprs,
251                schema,
252            } => {
253                let exprs_off = create_expr_vec(&mut fbb, exprs);
254                let schema_off = create_coldef_vec(&mut fbb, schema);
255                let off = fb::ProjectOp::create(
256                    &mut fbb,
257                    &fb::ProjectOpArgs {
258                        input: *input,
259                        exprs: Some(exprs_off),
260                        schema: Some(schema_off),
261                    },
262                );
263                (fb::OpNode::ProjectOp, Some(off.as_union_value()))
264            }
265            Op::Aggregate {
266                input,
267                keys,
268                aggs,
269                schema,
270            } => {
271                let keys_off = fbb.create_vector(keys);
272                let aggs_off = create_expr_vec(&mut fbb, aggs);
273                let schema_off = create_coldef_vec(&mut fbb, schema);
274                let off = fb::AggregateOp::create(
275                    &mut fbb,
276                    &fb::AggregateOpArgs {
277                        input: *input,
278                        keys: Some(keys_off),
279                        aggs: Some(aggs_off),
280                        schema: Some(schema_off),
281                    },
282                );
283                (fb::OpNode::AggregateOp, Some(off.as_union_value()))
284            }
285            Op::Sort { input, keys, dirs } => {
286                let dirs_fb: Vec<_> = dirs.iter().copied().map(to_fb_sort_dir).collect();
287                let dirs_off = fbb.create_vector(&dirs_fb);
288                let keys_off = fbb.create_vector(keys);
289                let off = fb::SortOp::create(
290                    &mut fbb,
291                    &fb::SortOpArgs {
292                        input: *input,
293                        keys: Some(keys_off),
294                        dirs: Some(dirs_off),
295                    },
296                );
297                (fb::OpNode::SortOp, Some(off.as_union_value()))
298            }
299            Op::Limit { input, count, skip } => {
300                let off = fb::LimitOp::create(
301                    &mut fbb,
302                    &fb::LimitOpArgs {
303                        input: *input,
304                        count: *count,
305                        skip: *skip,
306                    },
307                );
308                (fb::OpNode::LimitOp, Some(off.as_union_value()))
309            }
310            Op::Unwind {
311                input,
312                list_expr,
313                out_var,
314                schema,
315            } => {
316                let list_expr_off = serialize_expr(&mut fbb, list_expr);
317                let out_var_off = fbb.create_string(out_var);
318                let schema_off = create_coldef_vec(&mut fbb, schema);
319                let off = fb::UnwindOp::create(
320                    &mut fbb,
321                    &fb::UnwindOpArgs {
322                        input: *input,
323                        list_expr: Some(list_expr_off),
324                        out_var: Some(out_var_off),
325                        schema: Some(schema_off),
326                    },
327                );
328                (fb::OpNode::UnwindOp, Some(off.as_union_value()))
329            }
330            Op::PathConstruct {
331                input,
332                rel_cols,
333                schema,
334            } => {
335                let rel_cols_off = fbb.create_vector(rel_cols);
336                let schema_off = create_coldef_vec(&mut fbb, schema);
337                let off = fb::PathConstructOp::create(
338                    &mut fbb,
339                    &fb::PathConstructOpArgs {
340                        input: *input,
341                        rel_cols: Some(rel_cols_off),
342                        schema: Some(schema_off),
343                    },
344                );
345                (fb::OpNode::PathConstructOp, Some(off.as_union_value()))
346            }
347            Op::Union {
348                lhs,
349                rhs,
350                all,
351                schema,
352            } => {
353                let schema_off = create_coldef_vec(&mut fbb, schema);
354                let off = fb::UnionOp::create(
355                    &mut fbb,
356                    &fb::UnionOpArgs {
357                        lhs: *lhs,
358                        rhs: *rhs,
359                        all: *all,
360                        schema: Some(schema_off),
361                    },
362                );
363                (fb::OpNode::UnionOp, Some(off.as_union_value()))
364            }
365            Op::CreateNode {
366                input,
367                labels,
368                props,
369                schema,
370                out_var,
371            } => {
372                let labels_off = create_string_vec(&mut fbb, labels);
373                let props_off = serialize_expr(&mut fbb, props);
374                let schema_off = create_coldef_vec(&mut fbb, schema);
375                let out_var_off = fbb.create_string(out_var);
376                let off = fb::CreateNodeOp::create(
377                    &mut fbb,
378                    &fb::CreateNodeOpArgs {
379                        input: *input,
380                        labels: Some(labels_off),
381                        props: Some(props_off),
382                        schema: Some(schema_off),
383                        out_var: Some(out_var_off),
384                    },
385                );
386                (fb::OpNode::CreateNodeOp, Some(off.as_union_value()))
387            }
388            Op::CreateRel {
389                input,
390                src_col,
391                dst_col,
392                rel_type,
393                props,
394                schema,
395                out_var,
396            } => {
397                let rel_type_off = fbb.create_string(rel_type);
398                let props_off = serialize_expr(&mut fbb, props);
399                let schema_off = create_coldef_vec(&mut fbb, schema);
400                let out_var_off = fbb.create_string(out_var);
401                let off = fb::CreateRelOp::create(
402                    &mut fbb,
403                    &fb::CreateRelOpArgs {
404                        input: *input,
405                        src_col: *src_col,
406                        dst_col: *dst_col,
407                        rel_type: Some(rel_type_off),
408                        props: Some(props_off),
409                        schema: Some(schema_off),
410                        out_var: Some(out_var_off),
411                    },
412                );
413                (fb::OpNode::CreateRelOp, Some(off.as_union_value()))
414            }
415            Op::Merge {
416                input,
417                pattern,
418                on_create_props,
419                on_match_props,
420                schema,
421            } => {
422                let pattern_off = serialize_expr(&mut fbb, pattern);
423                let on_create_props_off = serialize_expr(&mut fbb, on_create_props);
424                let on_match_props_off = serialize_expr(&mut fbb, on_match_props);
425                let schema_off = create_coldef_vec(&mut fbb, schema);
426                let off = fb::MergeOp::create(
427                    &mut fbb,
428                    &fb::MergeOpArgs {
429                        input: *input,
430                        pattern: Some(pattern_off),
431                        on_create_props: Some(on_create_props_off),
432                        on_match_props: Some(on_match_props_off),
433                        schema: Some(schema_off),
434                    },
435                );
436                (fb::OpNode::MergeOp, Some(off.as_union_value()))
437            }
438            Op::Delete {
439                input,
440                target_col,
441                detach,
442                schema,
443            } => {
444                let schema_off = create_coldef_vec(&mut fbb, schema);
445                let off = fb::DeleteOp::create(
446                    &mut fbb,
447                    &fb::DeleteOpArgs {
448                        input: *input,
449                        target_col: *target_col,
450                        detach: *detach,
451                        schema: Some(schema_off),
452                    },
453                );
454                (fb::OpNode::DeleteOp, Some(off.as_union_value()))
455            }
456            Op::SetProperty {
457                input,
458                target_col,
459                key,
460                value_expr,
461                schema,
462            } => {
463                let key_off = fbb.create_string(key);
464                let value_expr_off = serialize_expr(&mut fbb, value_expr);
465                let schema_off = create_coldef_vec(&mut fbb, schema);
466                let off = fb::SetPropertyOp::create(
467                    &mut fbb,
468                    &fb::SetPropertyOpArgs {
469                        input: *input,
470                        target_col: *target_col,
471                        key: Some(key_off),
472                        value_expr: Some(value_expr_off),
473                        schema: Some(schema_off),
474                    },
475                );
476                (fb::OpNode::SetPropertyOp, Some(off.as_union_value()))
477            }
478            Op::RemoveProperty {
479                input,
480                target_col,
481                key,
482                schema,
483            } => {
484                let key_off = fbb.create_string(key);
485                let schema_off = create_coldef_vec(&mut fbb, schema);
486                let off = fb::RemovePropertyOp::create(
487                    &mut fbb,
488                    &fb::RemovePropertyOpArgs {
489                        input: *input,
490                        target_col: *target_col,
491                        key: Some(key_off),
492                        schema: Some(schema_off),
493                    },
494                );
495                (fb::OpNode::RemovePropertyOp, Some(off.as_union_value()))
496            }
497            Op::VectorScan {
498                input,
499                collection,
500                query_vector,
501                metric,
502                top_k,
503                approx_hint,
504                schema,
505            } => {
506                let collection_off = fbb.create_string(collection);
507                let query_vector_off = serialize_expr(&mut fbb, query_vector);
508                let schema_off = create_coldef_vec(&mut fbb, schema);
509                let off = fb::VectorScanOp::create(
510                    &mut fbb,
511                    &fb::VectorScanOpArgs {
512                        input: *input,
513                        collection: Some(collection_off),
514                        query_vector: Some(query_vector_off),
515                        metric: to_fb_vector_metric(*metric),
516                        top_k: *top_k,
517                        approx_hint: *approx_hint,
518                        schema: Some(schema_off),
519                    },
520                );
521                (fb::OpNode::VectorScanOp, Some(off.as_union_value()))
522            }
523            Op::Rerank {
524                input,
525                score_expr,
526                top_k,
527                schema,
528            } => {
529                let score_expr_off = serialize_expr(&mut fbb, score_expr);
530                let schema_off = create_coldef_vec(&mut fbb, schema);
531                let off = fb::RerankOp::create(
532                    &mut fbb,
533                    &fb::RerankOpArgs {
534                        input: *input,
535                        score_expr: Some(score_expr_off),
536                        top_k: *top_k,
537                        schema: Some(schema_off),
538                    },
539                );
540                (fb::OpNode::RerankOp, Some(off.as_union_value()))
541            }
542            Op::Return { input } => {
543                let off = fb::ReturnOp::create(&mut fbb, &fb::ReturnOpArgs { input: *input });
544                (fb::OpNode::ReturnOp, Some(off.as_union_value()))
545            }
546            Op::ConstRow => {
547                let off = fb::ConstRowOp::create(&mut fbb, &fb::ConstRowOpArgs {});
548                (fb::OpNode::ConstRowOp, Some(off.as_union_value()))
549            }
550        };
551
552        let plan_op = fb::PlanOp::create(&mut fbb, &fb::PlanOpArgs { node_type, node });
553        ops_offsets.push(plan_op);
554    }
555
556    let ops = fbb.create_vector(&ops_offsets);
557    let root = fb::PlexusPlan::create(
558        &mut fbb,
559        &fb::PlexusPlanArgs {
560            version: Some(version),
561            ops: Some(ops),
562            root_op: plan.root_op,
563        },
564    );
565    fb::finish_plexus_plan_buffer(&mut fbb, root);
566    Ok(fbb.finished_data().to_vec())
567}