Skip to main content

plexus_serde/plan/
serialize.rs

1use crate::convert::{to_fb_expand_dir, to_fb_sort_dir, to_fb_vector_metric};
2use crate::expr::{create_expr_vec, serialize_expr};
3use crate::plan::common::{create_coldef_vec, create_string_vec};
4use crate::plexus_generated::plexus as fb;
5use crate::{Op, Plan, SerdeError};
6use flatbuffers::FlatBufferBuilder;
7
8pub fn serialize_plan(plan: &Plan) -> Result<Vec<u8>, SerdeError> {
9    let mut fbb = FlatBufferBuilder::new();
10
11    let producer = fbb.create_string(&plan.version.producer);
12    let version = fb::Version::create(
13        &mut fbb,
14        &fb::VersionArgs {
15            major: plan.version.major,
16            minor: plan.version.minor,
17            patch: plan.version.patch,
18            producer: Some(producer),
19        },
20    );
21
22    let mut ops_offsets = Vec::with_capacity(plan.ops.len());
23    for op in &plan.ops {
24        let (node_type, node) = match op {
25            Op::ScanNodes {
26                labels,
27                schema,
28                must_labels,
29                forbidden_labels,
30                est_rows,
31                selectivity,
32                graph_ref,
33            } => {
34                let labels_off = create_string_vec(&mut fbb, labels);
35                let schema_off = create_coldef_vec(&mut fbb, schema);
36                let must_labels_off = create_string_vec(&mut fbb, must_labels);
37                let forbidden_labels_off = create_string_vec(&mut fbb, forbidden_labels);
38                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
39                let off = fb::ScanNodesOp::create(
40                    &mut fbb,
41                    &fb::ScanNodesOpArgs {
42                        labels: Some(labels_off),
43                        schema: Some(schema_off),
44                        must_labels: Some(must_labels_off),
45                        forbidden_labels: Some(forbidden_labels_off),
46                        est_rows: *est_rows,
47                        selectivity: *selectivity,
48                        graph_ref: graph_ref_off,
49                    },
50                );
51                (fb::OpNode::ScanNodesOp, Some(off.as_union_value()))
52            }
53            Op::ScanRels {
54                types,
55                schema,
56                src_labels,
57                dst_labels,
58                est_rows,
59                selectivity,
60            } => {
61                let types_off = create_string_vec(&mut fbb, types);
62                let schema_off = create_coldef_vec(&mut fbb, schema);
63                let src_labels_off = create_string_vec(&mut fbb, src_labels);
64                let dst_labels_off = create_string_vec(&mut fbb, dst_labels);
65                let off = fb::ScanRelsOp::create(
66                    &mut fbb,
67                    &fb::ScanRelsOpArgs {
68                        types: Some(types_off),
69                        schema: Some(schema_off),
70                        src_labels: Some(src_labels_off),
71                        dst_labels: Some(dst_labels_off),
72                        est_rows: *est_rows,
73                        selectivity: *selectivity,
74                    },
75                );
76                (fb::OpNode::ScanRelsOp, Some(off.as_union_value()))
77            }
78            Op::Expand {
79                input,
80                src_col,
81                types,
82                dir,
83                schema,
84                src_var,
85                rel_var,
86                dst_var,
87                legal_src_labels,
88                legal_dst_labels,
89                est_degree,
90                graph_ref,
91            } => {
92                let types_off = create_string_vec(&mut fbb, types);
93                let schema_off = create_coldef_vec(&mut fbb, schema);
94                let src_var_off = fbb.create_string(src_var);
95                let rel_var_off = fbb.create_string(rel_var);
96                let dst_var_off = fbb.create_string(dst_var);
97                let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
98                let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
99                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
100                let off = fb::ExpandOp::create(
101                    &mut fbb,
102                    &fb::ExpandOpArgs {
103                        input: *input,
104                        src_col: *src_col,
105                        types: Some(types_off),
106                        dir: to_fb_expand_dir(*dir),
107                        schema: Some(schema_off),
108                        src_var: Some(src_var_off),
109                        rel_var: Some(rel_var_off),
110                        dst_var: Some(dst_var_off),
111                        legal_src_labels: Some(legal_src_labels_off),
112                        legal_dst_labels: Some(legal_dst_labels_off),
113                        est_degree: *est_degree,
114                        graph_ref: graph_ref_off,
115                    },
116                );
117                (fb::OpNode::ExpandOp, Some(off.as_union_value()))
118            }
119            Op::OptionalExpand {
120                input,
121                src_col,
122                types,
123                dir,
124                schema,
125                src_var,
126                rel_var,
127                dst_var,
128                legal_src_labels,
129                legal_dst_labels,
130                graph_ref,
131            } => {
132                let types_off = create_string_vec(&mut fbb, types);
133                let schema_off = create_coldef_vec(&mut fbb, schema);
134                let src_var_off = fbb.create_string(src_var);
135                let rel_var_off = fbb.create_string(rel_var);
136                let dst_var_off = fbb.create_string(dst_var);
137                let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
138                let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
139                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
140                let off = fb::OptionalExpandOp::create(
141                    &mut fbb,
142                    &fb::OptionalExpandOpArgs {
143                        input: *input,
144                        src_col: *src_col,
145                        types: Some(types_off),
146                        dir: to_fb_expand_dir(*dir),
147                        schema: Some(schema_off),
148                        src_var: Some(src_var_off),
149                        rel_var: Some(rel_var_off),
150                        dst_var: Some(dst_var_off),
151                        legal_src_labels: Some(legal_src_labels_off),
152                        legal_dst_labels: Some(legal_dst_labels_off),
153                        graph_ref: graph_ref_off,
154                    },
155                );
156                (fb::OpNode::OptionalExpandOp, Some(off.as_union_value()))
157            }
158            Op::SemiExpand {
159                input,
160                src_col,
161                types,
162                dir,
163                schema,
164                legal_src_labels,
165                legal_dst_labels,
166            } => {
167                let types_off = create_string_vec(&mut fbb, types);
168                let schema_off = create_coldef_vec(&mut fbb, schema);
169                let legal_src_labels_off = create_string_vec(&mut fbb, legal_src_labels);
170                let legal_dst_labels_off = create_string_vec(&mut fbb, legal_dst_labels);
171                let off = fb::SemiExpandOp::create(
172                    &mut fbb,
173                    &fb::SemiExpandOpArgs {
174                        input: *input,
175                        src_col: *src_col,
176                        types: Some(types_off),
177                        dir: to_fb_expand_dir(*dir),
178                        schema: Some(schema_off),
179                        legal_src_labels: Some(legal_src_labels_off),
180                        legal_dst_labels: Some(legal_dst_labels_off),
181                    },
182                );
183                (fb::OpNode::SemiExpandOp, Some(off.as_union_value()))
184            }
185            Op::ExpandVarLen {
186                input,
187                src_col,
188                types,
189                dir,
190                min_hops,
191                max_hops,
192                schema,
193                src_var,
194                path_var,
195                dst_var,
196                graph_ref,
197            } => {
198                let types_off = create_string_vec(&mut fbb, types);
199                let schema_off = create_coldef_vec(&mut fbb, schema);
200                let src_var_off = fbb.create_string(src_var);
201                let path_var_off = fbb.create_string(path_var);
202                let dst_var_off = fbb.create_string(dst_var);
203                let graph_ref_off = graph_ref.as_ref().map(|x| fbb.create_string(x));
204                let off = fb::ExpandVarLenOp::create(
205                    &mut fbb,
206                    &fb::ExpandVarLenOpArgs {
207                        input: *input,
208                        src_col: *src_col,
209                        types: Some(types_off),
210                        dir: to_fb_expand_dir(*dir),
211                        min_hops: *min_hops,
212                        max_hops: *max_hops,
213                        schema: Some(schema_off),
214                        src_var: Some(src_var_off),
215                        path_var: Some(path_var_off),
216                        dst_var: Some(dst_var_off),
217                        graph_ref: graph_ref_off,
218                    },
219                );
220                (fb::OpNode::ExpandVarLenOp, Some(off.as_union_value()))
221            }
222            Op::Filter { input, predicate } => {
223                let pred_off = serialize_expr(&mut fbb, predicate);
224                let off = fb::FilterOp::create(
225                    &mut fbb,
226                    &fb::FilterOpArgs {
227                        input: *input,
228                        predicate: Some(pred_off),
229                    },
230                );
231                (fb::OpNode::FilterOp, Some(off.as_union_value()))
232            }
233            Op::BlockMarker {
234                input,
235                block_id,
236                branch_id,
237            } => {
238                let off = fb::BlockMarkerOp::create(
239                    &mut fbb,
240                    &fb::BlockMarkerOpArgs {
241                        input: *input,
242                        block_id: *block_id,
243                        branch_id: *branch_id,
244                    },
245                );
246                (fb::OpNode::BlockMarkerOp, Some(off.as_union_value()))
247            }
248            Op::Project {
249                input,
250                exprs,
251                schema,
252            } => {
253                let exprs_off = create_expr_vec(&mut fbb, exprs);
254                let schema_off = create_coldef_vec(&mut fbb, schema);
255                let off = fb::ProjectOp::create(
256                    &mut fbb,
257                    &fb::ProjectOpArgs {
258                        input: *input,
259                        exprs: Some(exprs_off),
260                        schema: Some(schema_off),
261                    },
262                );
263                (fb::OpNode::ProjectOp, Some(off.as_union_value()))
264            }
265            Op::Aggregate {
266                input,
267                keys,
268                aggs,
269                schema,
270            } => {
271                let keys_off = fbb.create_vector(keys);
272                let aggs_off = create_expr_vec(&mut fbb, aggs);
273                let schema_off = create_coldef_vec(&mut fbb, schema);
274                let off = fb::AggregateOp::create(
275                    &mut fbb,
276                    &fb::AggregateOpArgs {
277                        input: *input,
278                        keys: Some(keys_off),
279                        aggs: Some(aggs_off),
280                        schema: Some(schema_off),
281                    },
282                );
283                (fb::OpNode::AggregateOp, Some(off.as_union_value()))
284            }
285            Op::Sort { input, keys, dirs } => {
286                let dirs_fb: Vec<_> = dirs.iter().copied().map(to_fb_sort_dir).collect();
287                let dirs_off = fbb.create_vector(&dirs_fb);
288                let keys_off = fbb.create_vector(keys);
289                let off = fb::SortOp::create(
290                    &mut fbb,
291                    &fb::SortOpArgs {
292                        input: *input,
293                        keys: Some(keys_off),
294                        dirs: Some(dirs_off),
295                    },
296                );
297                (fb::OpNode::SortOp, Some(off.as_union_value()))
298            }
299            Op::Limit {
300                input,
301                count,
302                skip,
303                cursor,
304                emit_cursor,
305            } => {
306                let cursor_off = cursor
307                    .as_ref()
308                    .map(|bytes| fbb.create_vector(bytes.iter().map(|&b| b as i8).collect::<Vec<i8>>().as_slice()));
309                let off = fb::LimitOp::create(
310                    &mut fbb,
311                    &fb::LimitOpArgs {
312                        input: *input,
313                        count: *count,
314                        skip: *skip,
315                        cursor: cursor_off,
316                        emit_cursor: *emit_cursor,
317                    },
318                );
319                (fb::OpNode::LimitOp, Some(off.as_union_value()))
320            }
321            Op::Unwind {
322                input,
323                list_expr,
324                out_var,
325                schema,
326            } => {
327                let list_expr_off = serialize_expr(&mut fbb, list_expr);
328                let out_var_off = fbb.create_string(out_var);
329                let schema_off = create_coldef_vec(&mut fbb, schema);
330                let off = fb::UnwindOp::create(
331                    &mut fbb,
332                    &fb::UnwindOpArgs {
333                        input: *input,
334                        list_expr: Some(list_expr_off),
335                        out_var: Some(out_var_off),
336                        schema: Some(schema_off),
337                    },
338                );
339                (fb::OpNode::UnwindOp, Some(off.as_union_value()))
340            }
341            Op::PathConstruct {
342                input,
343                rel_cols,
344                schema,
345            } => {
346                let rel_cols_off = fbb.create_vector(rel_cols);
347                let schema_off = create_coldef_vec(&mut fbb, schema);
348                let off = fb::PathConstructOp::create(
349                    &mut fbb,
350                    &fb::PathConstructOpArgs {
351                        input: *input,
352                        rel_cols: Some(rel_cols_off),
353                        schema: Some(schema_off),
354                    },
355                );
356                (fb::OpNode::PathConstructOp, Some(off.as_union_value()))
357            }
358            Op::Union {
359                lhs,
360                rhs,
361                all,
362                schema,
363            } => {
364                let schema_off = create_coldef_vec(&mut fbb, schema);
365                let off = fb::UnionOp::create(
366                    &mut fbb,
367                    &fb::UnionOpArgs {
368                        lhs: *lhs,
369                        rhs: *rhs,
370                        all: *all,
371                        schema: Some(schema_off),
372                    },
373                );
374                (fb::OpNode::UnionOp, Some(off.as_union_value()))
375            }
376            Op::CreateNode {
377                input,
378                labels,
379                props,
380                schema,
381                out_var,
382            } => {
383                let labels_off = create_string_vec(&mut fbb, labels);
384                let props_off = serialize_expr(&mut fbb, props);
385                let schema_off = create_coldef_vec(&mut fbb, schema);
386                let out_var_off = fbb.create_string(out_var);
387                let off = fb::CreateNodeOp::create(
388                    &mut fbb,
389                    &fb::CreateNodeOpArgs {
390                        input: *input,
391                        labels: Some(labels_off),
392                        props: Some(props_off),
393                        schema: Some(schema_off),
394                        out_var: Some(out_var_off),
395                    },
396                );
397                (fb::OpNode::CreateNodeOp, Some(off.as_union_value()))
398            }
399            Op::CreateRel {
400                input,
401                src_col,
402                dst_col,
403                rel_type,
404                props,
405                schema,
406                out_var,
407            } => {
408                let rel_type_off = fbb.create_string(rel_type);
409                let props_off = serialize_expr(&mut fbb, props);
410                let schema_off = create_coldef_vec(&mut fbb, schema);
411                let out_var_off = fbb.create_string(out_var);
412                let off = fb::CreateRelOp::create(
413                    &mut fbb,
414                    &fb::CreateRelOpArgs {
415                        input: *input,
416                        src_col: *src_col,
417                        dst_col: *dst_col,
418                        rel_type: Some(rel_type_off),
419                        props: Some(props_off),
420                        schema: Some(schema_off),
421                        out_var: Some(out_var_off),
422                    },
423                );
424                (fb::OpNode::CreateRelOp, Some(off.as_union_value()))
425            }
426            Op::Merge {
427                input,
428                pattern,
429                on_create_props,
430                on_match_props,
431                schema,
432            } => {
433                let pattern_off = serialize_expr(&mut fbb, pattern);
434                let on_create_props_off = serialize_expr(&mut fbb, on_create_props);
435                let on_match_props_off = serialize_expr(&mut fbb, on_match_props);
436                let schema_off = create_coldef_vec(&mut fbb, schema);
437                let off = fb::MergeOp::create(
438                    &mut fbb,
439                    &fb::MergeOpArgs {
440                        input: *input,
441                        pattern: Some(pattern_off),
442                        on_create_props: Some(on_create_props_off),
443                        on_match_props: Some(on_match_props_off),
444                        schema: Some(schema_off),
445                    },
446                );
447                (fb::OpNode::MergeOp, Some(off.as_union_value()))
448            }
449            Op::Delete {
450                input,
451                target_col,
452                detach,
453                schema,
454            } => {
455                let schema_off = create_coldef_vec(&mut fbb, schema);
456                let off = fb::DeleteOp::create(
457                    &mut fbb,
458                    &fb::DeleteOpArgs {
459                        input: *input,
460                        target_col: *target_col,
461                        detach: *detach,
462                        schema: Some(schema_off),
463                    },
464                );
465                (fb::OpNode::DeleteOp, Some(off.as_union_value()))
466            }
467            Op::SetProperty {
468                input,
469                target_col,
470                key,
471                value_expr,
472                schema,
473            } => {
474                let key_off = fbb.create_string(key);
475                let value_expr_off = serialize_expr(&mut fbb, value_expr);
476                let schema_off = create_coldef_vec(&mut fbb, schema);
477                let off = fb::SetPropertyOp::create(
478                    &mut fbb,
479                    &fb::SetPropertyOpArgs {
480                        input: *input,
481                        target_col: *target_col,
482                        key: Some(key_off),
483                        value_expr: Some(value_expr_off),
484                        schema: Some(schema_off),
485                    },
486                );
487                (fb::OpNode::SetPropertyOp, Some(off.as_union_value()))
488            }
489            Op::RemoveProperty {
490                input,
491                target_col,
492                key,
493                schema,
494            } => {
495                let key_off = fbb.create_string(key);
496                let schema_off = create_coldef_vec(&mut fbb, schema);
497                let off = fb::RemovePropertyOp::create(
498                    &mut fbb,
499                    &fb::RemovePropertyOpArgs {
500                        input: *input,
501                        target_col: *target_col,
502                        key: Some(key_off),
503                        schema: Some(schema_off),
504                    },
505                );
506                (fb::OpNode::RemovePropertyOp, Some(off.as_union_value()))
507            }
508            Op::VectorScan {
509                input,
510                collection,
511                query_vector,
512                metric,
513                top_k,
514                approx_hint,
515                schema,
516            } => {
517                let collection_off = fbb.create_string(collection);
518                let query_vector_off = serialize_expr(&mut fbb, query_vector);
519                let schema_off = create_coldef_vec(&mut fbb, schema);
520                let off = fb::VectorScanOp::create(
521                    &mut fbb,
522                    &fb::VectorScanOpArgs {
523                        input: *input,
524                        collection: Some(collection_off),
525                        query_vector: Some(query_vector_off),
526                        metric: to_fb_vector_metric(*metric),
527                        top_k: *top_k,
528                        approx_hint: *approx_hint,
529                        schema: Some(schema_off),
530                    },
531                );
532                (fb::OpNode::VectorScanOp, Some(off.as_union_value()))
533            }
534            Op::Rerank {
535                input,
536                score_expr,
537                top_k,
538                schema,
539            } => {
540                let score_expr_off = serialize_expr(&mut fbb, score_expr);
541                let schema_off = create_coldef_vec(&mut fbb, schema);
542                let off = fb::RerankOp::create(
543                    &mut fbb,
544                    &fb::RerankOpArgs {
545                        input: *input,
546                        score_expr: Some(score_expr_off),
547                        top_k: *top_k,
548                        schema: Some(schema_off),
549                    },
550                );
551                (fb::OpNode::RerankOp, Some(off.as_union_value()))
552            }
553            Op::Return { input } => {
554                let off = fb::ReturnOp::create(&mut fbb, &fb::ReturnOpArgs { input: *input });
555                (fb::OpNode::ReturnOp, Some(off.as_union_value()))
556            }
557            Op::ConstRow => {
558                let off = fb::ConstRowOp::create(&mut fbb, &fb::ConstRowOpArgs {});
559                (fb::OpNode::ConstRowOp, Some(off.as_union_value()))
560            }
561        };
562
563        let plan_op = fb::PlanOp::create(&mut fbb, &fb::PlanOpArgs { node_type, node });
564        ops_offsets.push(plan_op);
565    }
566
567    let ops = fbb.create_vector(&ops_offsets);
568    let root = fb::PlexusPlan::create(
569        &mut fbb,
570        &fb::PlexusPlanArgs {
571            version: Some(version),
572            ops: Some(ops),
573            root_op: plan.root_op,
574        },
575    );
576    fb::finish_plexus_plan_buffer(&mut fbb, root);
577    Ok(fbb.finished_data().to_vec())
578}