Skip to main content

plexus_serde/plan/
deserialize.rs

1use crate::convert::{from_fb_expand_dir, from_fb_sort_dir, from_fb_vector_metric};
2use crate::expr::deserialize_expr;
3use crate::plan::common::{req, vec_coldefs, vec_strings};
4use crate::plexus_generated::plexus as fb;
5use crate::{Op, Plan, SerdeError, Version};
6
7pub fn deserialize_plan(bytes: &[u8]) -> Result<Plan, SerdeError> {
8    let fb_plan = fb::root_as_plexus_plan(bytes)?;
9    let fb_version = fb_plan.version();
10    let version = Version {
11        major: fb_version.major(),
12        minor: fb_version.minor(),
13        patch: fb_version.patch(),
14        producer: req(fb_version.producer(), "Version.producer")?.to_string(),
15    };
16
17    let mut ops = Vec::new();
18    if let Some(fb_ops) = fb_plan.ops() {
19        for i in 0..fb_ops.len() {
20            let plan_op = fb_ops.get(i);
21            let op = match plan_op.node_type() {
22                fb::OpNode::ScanNodesOp => {
23                    let x = req(plan_op.node_as_scan_nodes_op(), "PlanOp.ScanNodesOp")?;
24                    Op::ScanNodes {
25                        labels: vec_strings(x.labels()),
26                        schema: vec_coldefs(x.schema())?,
27                        must_labels: vec_strings(x.must_labels()),
28                        forbidden_labels: vec_strings(x.forbidden_labels()),
29                        est_rows: x.est_rows(),
30                        selectivity: x.selectivity(),
31                        graph_ref: x.graph_ref().map(str::to_string),
32                    }
33                }
34                fb::OpNode::ScanRelsOp => {
35                    let x = req(plan_op.node_as_scan_rels_op(), "PlanOp.ScanRelsOp")?;
36                    Op::ScanRels {
37                        types: vec_strings(x.types()),
38                        schema: vec_coldefs(x.schema())?,
39                        src_labels: vec_strings(x.src_labels()),
40                        dst_labels: vec_strings(x.dst_labels()),
41                        est_rows: x.est_rows(),
42                        selectivity: x.selectivity(),
43                    }
44                }
45                fb::OpNode::ExpandOp => {
46                    let x = req(plan_op.node_as_expand_op(), "PlanOp.ExpandOp")?;
47                    Op::Expand {
48                        input: x.input(),
49                        src_col: x.src_col(),
50                        types: vec_strings(x.types()),
51                        dir: from_fb_expand_dir(x.dir())?,
52                        schema: vec_coldefs(x.schema())?,
53                        src_var: x.src_var().unwrap_or("").to_string(),
54                        rel_var: x.rel_var().unwrap_or("").to_string(),
55                        dst_var: x.dst_var().unwrap_or("").to_string(),
56                        legal_src_labels: vec_strings(x.legal_src_labels()),
57                        legal_dst_labels: vec_strings(x.legal_dst_labels()),
58                        est_degree: x.est_degree(),
59                        graph_ref: x.graph_ref().map(str::to_string),
60                    }
61                }
62                fb::OpNode::OptionalExpandOp => {
63                    let x = plan_op
64                        .node_as_optional_expand_op()
65                        .ok_or(SerdeError::MissingField("PlanOp.OptionalExpandOp"))?;
66                    Op::OptionalExpand {
67                        input: x.input(),
68                        src_col: x.src_col(),
69                        types: vec_strings(x.types()),
70                        dir: from_fb_expand_dir(x.dir())?,
71                        schema: vec_coldefs(x.schema())?,
72                        src_var: x.src_var().unwrap_or("").to_string(),
73                        rel_var: x.rel_var().unwrap_or("").to_string(),
74                        dst_var: x.dst_var().unwrap_or("").to_string(),
75                        legal_src_labels: vec_strings(x.legal_src_labels()),
76                        legal_dst_labels: vec_strings(x.legal_dst_labels()),
77                        graph_ref: x.graph_ref().map(str::to_string),
78                    }
79                }
80                fb::OpNode::SemiExpandOp => {
81                    let x = plan_op
82                        .node_as_semi_expand_op()
83                        .ok_or(SerdeError::MissingField("PlanOp.SemiExpandOp"))?;
84                    Op::SemiExpand {
85                        input: x.input(),
86                        src_col: x.src_col(),
87                        types: vec_strings(x.types()),
88                        dir: from_fb_expand_dir(x.dir())?,
89                        schema: vec_coldefs(x.schema())?,
90                        legal_src_labels: vec_strings(x.legal_src_labels()),
91                        legal_dst_labels: vec_strings(x.legal_dst_labels()),
92                    }
93                }
94                fb::OpNode::ExpandVarLenOp => {
95                    let x = plan_op
96                        .node_as_expand_var_len_op()
97                        .ok_or(SerdeError::MissingField("PlanOp.ExpandVarLenOp"))?;
98                    Op::ExpandVarLen {
99                        input: x.input(),
100                        src_col: x.src_col(),
101                        types: vec_strings(x.types()),
102                        dir: from_fb_expand_dir(x.dir())?,
103                        min_hops: x.min_hops(),
104                        max_hops: x.max_hops(),
105                        schema: vec_coldefs(x.schema())?,
106                        src_var: x.src_var().unwrap_or("").to_string(),
107                        path_var: x.path_var().unwrap_or("").to_string(),
108                        dst_var: x.dst_var().unwrap_or("").to_string(),
109                        graph_ref: x.graph_ref().map(str::to_string),
110                    }
111                }
112                fb::OpNode::FilterOp => {
113                    let x = req(plan_op.node_as_filter_op(), "PlanOp.FilterOp")?;
114                    let pred = req(x.predicate(), "FilterOp.predicate")?;
115                    Op::Filter {
116                        input: x.input(),
117                        predicate: deserialize_expr(pred)?,
118                    }
119                }
120                fb::OpNode::BlockMarkerOp => {
121                    let x = plan_op
122                        .node_as_block_marker_op()
123                        .ok_or(SerdeError::MissingField("PlanOp.BlockMarkerOp"))?;
124                    Op::BlockMarker {
125                        input: x.input(),
126                        block_id: x.block_id(),
127                        branch_id: x.branch_id(),
128                    }
129                }
130                fb::OpNode::ProjectOp => {
131                    let x = req(plan_op.node_as_project_op(), "PlanOp.ProjectOp")?;
132                    let exprs = x
133                        .exprs()
134                        .map(|v| {
135                            (0..v.len())
136                                .map(|j| deserialize_expr(v.get(j)))
137                                .collect::<Result<Vec<_>, _>>()
138                        })
139                        .transpose()?
140                        .unwrap_or_default();
141                    Op::Project {
142                        input: x.input(),
143                        exprs,
144                        schema: vec_coldefs(x.schema())?,
145                    }
146                }
147                fb::OpNode::AggregateOp => {
148                    let x = plan_op
149                        .node_as_aggregate_op()
150                        .ok_or(SerdeError::MissingField("PlanOp.AggregateOp"))?;
151                    let keys = x
152                        .keys()
153                        .map(|v| (0..v.len()).map(|j| v.get(j)).collect())
154                        .unwrap_or_default();
155                    let aggs = x
156                        .aggs()
157                        .map(|v| {
158                            (0..v.len())
159                                .map(|j| deserialize_expr(v.get(j)))
160                                .collect::<Result<Vec<_>, _>>()
161                        })
162                        .transpose()?
163                        .unwrap_or_default();
164                    Op::Aggregate {
165                        input: x.input(),
166                        keys,
167                        aggs,
168                        schema: vec_coldefs(x.schema())?,
169                    }
170                }
171                fb::OpNode::SortOp => {
172                    let x = req(plan_op.node_as_sort_op(), "PlanOp.SortOp")?;
173                    let keys = x
174                        .keys()
175                        .map(|v| (0..v.len()).map(|j| v.get(j)).collect())
176                        .unwrap_or_default();
177                    let dirs = x
178                        .dirs()
179                        .map(|v| {
180                            (0..v.len())
181                                .map(|j| from_fb_sort_dir(v.get(j)))
182                                .collect::<Result<Vec<_>, _>>()
183                        })
184                        .transpose()?
185                        .unwrap_or_default();
186                    Op::Sort {
187                        input: x.input(),
188                        keys,
189                        dirs,
190                    }
191                }
192                fb::OpNode::LimitOp => {
193                    let x = req(plan_op.node_as_limit_op(), "PlanOp.LimitOp")?;
194                    Op::Limit {
195                        input: x.input(),
196                        count: x.count(),
197                        skip: x.skip(),
198                        cursor: x.cursor().map(|v| v.iter().map(|b| b as u8).collect()),
199                        emit_cursor: x.emit_cursor(),
200                    }
201                }
202                fb::OpNode::UnwindOp => {
203                    let x = plan_op
204                        .node_as_unwind_op()
205                        .ok_or(SerdeError::MissingField("PlanOp.UnwindOp"))?;
206                    let list_expr = req(x.list_expr(), "UnwindOp.list_expr")?;
207                    Op::Unwind {
208                        input: x.input(),
209                        list_expr: deserialize_expr(list_expr)?,
210                        out_var: req(x.out_var(), "UnwindOp.out_var")?.to_string(),
211                        schema: vec_coldefs(x.schema())?,
212                    }
213                }
214                fb::OpNode::PathConstructOp => {
215                    let x = plan_op
216                        .node_as_path_construct_op()
217                        .ok_or(SerdeError::MissingField("PlanOp.PathConstructOp"))?;
218                    let rel_cols = x
219                        .rel_cols()
220                        .map(|v| (0..v.len()).map(|j| v.get(j)).collect())
221                        .unwrap_or_default();
222                    Op::PathConstruct {
223                        input: x.input(),
224                        rel_cols,
225                        schema: vec_coldefs(x.schema())?,
226                    }
227                }
228                fb::OpNode::UnionOp => {
229                    let x = plan_op
230                        .node_as_union_op()
231                        .ok_or(SerdeError::MissingField("PlanOp.UnionOp"))?;
232                    Op::Union {
233                        lhs: x.lhs(),
234                        rhs: x.rhs(),
235                        all: x.all(),
236                        schema: vec_coldefs(x.schema())?,
237                    }
238                }
239                fb::OpNode::CreateNodeOp => {
240                    let x = req(plan_op.node_as_create_node_op(), "PlanOp.CreateNodeOp")?;
241                    let props = req(x.props(), "CreateNodeOp.props")?;
242                    Op::CreateNode {
243                        input: x.input(),
244                        labels: vec_strings(x.labels()),
245                        props: deserialize_expr(props)?,
246                        schema: vec_coldefs(x.schema())?,
247                        out_var: x.out_var().unwrap_or("").to_string(),
248                    }
249                }
250                fb::OpNode::CreateRelOp => {
251                    let x = req(plan_op.node_as_create_rel_op(), "PlanOp.CreateRelOp")?;
252                    let props = req(x.props(), "CreateRelOp.props")?;
253                    Op::CreateRel {
254                        input: x.input(),
255                        src_col: x.src_col(),
256                        dst_col: x.dst_col(),
257                        rel_type: req(x.rel_type(), "CreateRelOp.rel_type")?.to_string(),
258                        props: deserialize_expr(props)?,
259                        schema: vec_coldefs(x.schema())?,
260                        out_var: x.out_var().unwrap_or("").to_string(),
261                    }
262                }
263                fb::OpNode::MergeOp => {
264                    let x = req(plan_op.node_as_merge_op(), "PlanOp.MergeOp")?;
265                    let pattern = req(x.pattern(), "MergeOp.pattern")?;
266                    let on_create_props = req(x.on_create_props(), "MergeOp.on_create_props")?;
267                    let on_match_props = req(x.on_match_props(), "MergeOp.on_match_props")?;
268                    Op::Merge {
269                        input: x.input(),
270                        pattern: deserialize_expr(pattern)?,
271                        on_create_props: deserialize_expr(on_create_props)?,
272                        on_match_props: deserialize_expr(on_match_props)?,
273                        schema: vec_coldefs(x.schema())?,
274                    }
275                }
276                fb::OpNode::DeleteOp => {
277                    let x = req(plan_op.node_as_delete_op(), "PlanOp.DeleteOp")?;
278                    Op::Delete {
279                        input: x.input(),
280                        target_col: x.target_col(),
281                        detach: x.detach(),
282                        schema: vec_coldefs(x.schema())?,
283                    }
284                }
285                fb::OpNode::SetPropertyOp => {
286                    let x = req(plan_op.node_as_set_property_op(), "PlanOp.SetPropertyOp")?;
287                    let value_expr = req(x.value_expr(), "SetPropertyOp.value_expr")?;
288                    Op::SetProperty {
289                        input: x.input(),
290                        target_col: x.target_col(),
291                        key: req(x.key(), "SetPropertyOp.key")?.to_string(),
292                        value_expr: deserialize_expr(value_expr)?,
293                        schema: vec_coldefs(x.schema())?,
294                    }
295                }
296                fb::OpNode::RemovePropertyOp => {
297                    let x = req(
298                        plan_op.node_as_remove_property_op(),
299                        "PlanOp.RemovePropertyOp",
300                    )?;
301                    Op::RemoveProperty {
302                        input: x.input(),
303                        target_col: x.target_col(),
304                        key: req(x.key(), "RemovePropertyOp.key")?.to_string(),
305                        schema: vec_coldefs(x.schema())?,
306                    }
307                }
308                fb::OpNode::VectorScanOp => {
309                    let x = req(plan_op.node_as_vector_scan_op(), "PlanOp.VectorScanOp")?;
310                    let query_vector = req(x.query_vector(), "VectorScanOp.query_vector")?;
311                    Op::VectorScan {
312                        input: x.input(),
313                        collection: req(x.collection(), "VectorScanOp.collection")?.to_string(),
314                        query_vector: deserialize_expr(query_vector)?,
315                        metric: from_fb_vector_metric(x.metric())?,
316                        top_k: x.top_k(),
317                        approx_hint: x.approx_hint(),
318                        schema: vec_coldefs(x.schema())?,
319                    }
320                }
321                fb::OpNode::RerankOp => {
322                    let x = req(plan_op.node_as_rerank_op(), "PlanOp.RerankOp")?;
323                    let score_expr = req(x.score_expr(), "RerankOp.score_expr")?;
324                    Op::Rerank {
325                        input: x.input(),
326                        score_expr: deserialize_expr(score_expr)?,
327                        top_k: x.top_k(),
328                        schema: vec_coldefs(x.schema())?,
329                    }
330                }
331                fb::OpNode::ReturnOp => {
332                    let x = req(plan_op.node_as_return_op(), "PlanOp.ReturnOp")?;
333                    Op::Return { input: x.input() }
334                }
335                fb::OpNode::ConstRowOp => Op::ConstRow,
336                _ => return Err(SerdeError::Unsupported("unknown OpNode variant")),
337            };
338            ops.push(op);
339        }
340    }
341
342    Ok(Plan {
343        version,
344        ops,
345        root_op: fb_plan.root_op(),
346    })
347}