1use crate::convert::{from_fb_expand_dir, from_fb_sort_dir, from_fb_vector_metric};
2use crate::expr::deserialize_expr;
3use crate::plan::common::{req, vec_coldefs, vec_strings};
4use crate::plexus_generated::plexus as fb;
5use crate::{Op, Plan, SerdeError, Version};
6
7pub fn deserialize_plan(bytes: &[u8]) -> Result<Plan, SerdeError> {
8 let fb_plan = fb::root_as_plexus_plan(bytes)?;
9 let fb_version = fb_plan.version();
10 let version = Version {
11 major: fb_version.major(),
12 minor: fb_version.minor(),
13 patch: fb_version.patch(),
14 producer: req(fb_version.producer(), "Version.producer")?.to_string(),
15 };
16
17 let mut ops = Vec::new();
18 if let Some(fb_ops) = fb_plan.ops() {
19 for i in 0..fb_ops.len() {
20 let plan_op = fb_ops.get(i);
21 let op = match plan_op.node_type() {
22 fb::OpNode::ScanNodesOp => {
23 let x = req(plan_op.node_as_scan_nodes_op(), "PlanOp.ScanNodesOp")?;
24 Op::ScanNodes {
25 labels: vec_strings(x.labels()),
26 schema: vec_coldefs(x.schema())?,
27 must_labels: vec_strings(x.must_labels()),
28 forbidden_labels: vec_strings(x.forbidden_labels()),
29 est_rows: x.est_rows(),
30 selectivity: x.selectivity(),
31 graph_ref: x.graph_ref().map(str::to_string),
32 }
33 }
34 fb::OpNode::ScanRelsOp => {
35 let x = req(plan_op.node_as_scan_rels_op(), "PlanOp.ScanRelsOp")?;
36 Op::ScanRels {
37 types: vec_strings(x.types()),
38 schema: vec_coldefs(x.schema())?,
39 src_labels: vec_strings(x.src_labels()),
40 dst_labels: vec_strings(x.dst_labels()),
41 est_rows: x.est_rows(),
42 selectivity: x.selectivity(),
43 }
44 }
45 fb::OpNode::ExpandOp => {
46 let x = req(plan_op.node_as_expand_op(), "PlanOp.ExpandOp")?;
47 Op::Expand {
48 input: x.input(),
49 src_col: x.src_col(),
50 types: vec_strings(x.types()),
51 dir: from_fb_expand_dir(x.dir())?,
52 schema: vec_coldefs(x.schema())?,
53 src_var: x.src_var().unwrap_or("").to_string(),
54 rel_var: x.rel_var().unwrap_or("").to_string(),
55 dst_var: x.dst_var().unwrap_or("").to_string(),
56 legal_src_labels: vec_strings(x.legal_src_labels()),
57 legal_dst_labels: vec_strings(x.legal_dst_labels()),
58 est_degree: x.est_degree(),
59 graph_ref: x.graph_ref().map(str::to_string),
60 }
61 }
62 fb::OpNode::OptionalExpandOp => {
63 let x = plan_op
64 .node_as_optional_expand_op()
65 .ok_or(SerdeError::MissingField("PlanOp.OptionalExpandOp"))?;
66 Op::OptionalExpand {
67 input: x.input(),
68 src_col: x.src_col(),
69 types: vec_strings(x.types()),
70 dir: from_fb_expand_dir(x.dir())?,
71 schema: vec_coldefs(x.schema())?,
72 src_var: x.src_var().unwrap_or("").to_string(),
73 rel_var: x.rel_var().unwrap_or("").to_string(),
74 dst_var: x.dst_var().unwrap_or("").to_string(),
75 legal_src_labels: vec_strings(x.legal_src_labels()),
76 legal_dst_labels: vec_strings(x.legal_dst_labels()),
77 graph_ref: x.graph_ref().map(str::to_string),
78 }
79 }
80 fb::OpNode::SemiExpandOp => {
81 let x = plan_op
82 .node_as_semi_expand_op()
83 .ok_or(SerdeError::MissingField("PlanOp.SemiExpandOp"))?;
84 Op::SemiExpand {
85 input: x.input(),
86 src_col: x.src_col(),
87 types: vec_strings(x.types()),
88 dir: from_fb_expand_dir(x.dir())?,
89 schema: vec_coldefs(x.schema())?,
90 legal_src_labels: vec_strings(x.legal_src_labels()),
91 legal_dst_labels: vec_strings(x.legal_dst_labels()),
92 }
93 }
94 fb::OpNode::ExpandVarLenOp => {
95 let x = plan_op
96 .node_as_expand_var_len_op()
97 .ok_or(SerdeError::MissingField("PlanOp.ExpandVarLenOp"))?;
98 Op::ExpandVarLen {
99 input: x.input(),
100 src_col: x.src_col(),
101 types: vec_strings(x.types()),
102 dir: from_fb_expand_dir(x.dir())?,
103 min_hops: x.min_hops(),
104 max_hops: x.max_hops(),
105 schema: vec_coldefs(x.schema())?,
106 src_var: x.src_var().unwrap_or("").to_string(),
107 path_var: x.path_var().unwrap_or("").to_string(),
108 dst_var: x.dst_var().unwrap_or("").to_string(),
109 graph_ref: x.graph_ref().map(str::to_string),
110 }
111 }
112 fb::OpNode::FilterOp => {
113 let x = req(plan_op.node_as_filter_op(), "PlanOp.FilterOp")?;
114 let pred = req(x.predicate(), "FilterOp.predicate")?;
115 Op::Filter {
116 input: x.input(),
117 predicate: deserialize_expr(pred)?,
118 }
119 }
120 fb::OpNode::BlockMarkerOp => {
121 let x = plan_op
122 .node_as_block_marker_op()
123 .ok_or(SerdeError::MissingField("PlanOp.BlockMarkerOp"))?;
124 Op::BlockMarker {
125 input: x.input(),
126 block_id: x.block_id(),
127 branch_id: x.branch_id(),
128 }
129 }
130 fb::OpNode::ProjectOp => {
131 let x = req(plan_op.node_as_project_op(), "PlanOp.ProjectOp")?;
132 let exprs = x
133 .exprs()
134 .map(|v| {
135 (0..v.len())
136 .map(|j| deserialize_expr(v.get(j)))
137 .collect::<Result<Vec<_>, _>>()
138 })
139 .transpose()?
140 .unwrap_or_default();
141 Op::Project {
142 input: x.input(),
143 exprs,
144 schema: vec_coldefs(x.schema())?,
145 }
146 }
147 fb::OpNode::AggregateOp => {
148 let x = plan_op
149 .node_as_aggregate_op()
150 .ok_or(SerdeError::MissingField("PlanOp.AggregateOp"))?;
151 let keys = x
152 .keys()
153 .map(|v| (0..v.len()).map(|j| v.get(j)).collect())
154 .unwrap_or_default();
155 let aggs = x
156 .aggs()
157 .map(|v| {
158 (0..v.len())
159 .map(|j| deserialize_expr(v.get(j)))
160 .collect::<Result<Vec<_>, _>>()
161 })
162 .transpose()?
163 .unwrap_or_default();
164 Op::Aggregate {
165 input: x.input(),
166 keys,
167 aggs,
168 schema: vec_coldefs(x.schema())?,
169 }
170 }
171 fb::OpNode::SortOp => {
172 let x = req(plan_op.node_as_sort_op(), "PlanOp.SortOp")?;
173 let keys = x
174 .keys()
175 .map(|v| (0..v.len()).map(|j| v.get(j)).collect())
176 .unwrap_or_default();
177 let dirs = x
178 .dirs()
179 .map(|v| {
180 (0..v.len())
181 .map(|j| from_fb_sort_dir(v.get(j)))
182 .collect::<Result<Vec<_>, _>>()
183 })
184 .transpose()?
185 .unwrap_or_default();
186 Op::Sort {
187 input: x.input(),
188 keys,
189 dirs,
190 }
191 }
192 fb::OpNode::LimitOp => {
193 let x = req(plan_op.node_as_limit_op(), "PlanOp.LimitOp")?;
194 Op::Limit {
195 input: x.input(),
196 count: x.count(),
197 skip: x.skip(),
198 cursor: x.cursor().map(|v| v.iter().map(|b| b as u8).collect()),
199 emit_cursor: x.emit_cursor(),
200 }
201 }
202 fb::OpNode::UnwindOp => {
203 let x = plan_op
204 .node_as_unwind_op()
205 .ok_or(SerdeError::MissingField("PlanOp.UnwindOp"))?;
206 let list_expr = req(x.list_expr(), "UnwindOp.list_expr")?;
207 Op::Unwind {
208 input: x.input(),
209 list_expr: deserialize_expr(list_expr)?,
210 out_var: req(x.out_var(), "UnwindOp.out_var")?.to_string(),
211 schema: vec_coldefs(x.schema())?,
212 }
213 }
214 fb::OpNode::PathConstructOp => {
215 let x = plan_op
216 .node_as_path_construct_op()
217 .ok_or(SerdeError::MissingField("PlanOp.PathConstructOp"))?;
218 let rel_cols = x
219 .rel_cols()
220 .map(|v| (0..v.len()).map(|j| v.get(j)).collect())
221 .unwrap_or_default();
222 Op::PathConstruct {
223 input: x.input(),
224 rel_cols,
225 schema: vec_coldefs(x.schema())?,
226 }
227 }
228 fb::OpNode::UnionOp => {
229 let x = plan_op
230 .node_as_union_op()
231 .ok_or(SerdeError::MissingField("PlanOp.UnionOp"))?;
232 Op::Union {
233 lhs: x.lhs(),
234 rhs: x.rhs(),
235 all: x.all(),
236 schema: vec_coldefs(x.schema())?,
237 }
238 }
239 fb::OpNode::CreateNodeOp => {
240 let x = req(plan_op.node_as_create_node_op(), "PlanOp.CreateNodeOp")?;
241 let props = req(x.props(), "CreateNodeOp.props")?;
242 Op::CreateNode {
243 input: x.input(),
244 labels: vec_strings(x.labels()),
245 props: deserialize_expr(props)?,
246 schema: vec_coldefs(x.schema())?,
247 out_var: x.out_var().unwrap_or("").to_string(),
248 }
249 }
250 fb::OpNode::CreateRelOp => {
251 let x = req(plan_op.node_as_create_rel_op(), "PlanOp.CreateRelOp")?;
252 let props = req(x.props(), "CreateRelOp.props")?;
253 Op::CreateRel {
254 input: x.input(),
255 src_col: x.src_col(),
256 dst_col: x.dst_col(),
257 rel_type: req(x.rel_type(), "CreateRelOp.rel_type")?.to_string(),
258 props: deserialize_expr(props)?,
259 schema: vec_coldefs(x.schema())?,
260 out_var: x.out_var().unwrap_or("").to_string(),
261 }
262 }
263 fb::OpNode::MergeOp => {
264 let x = req(plan_op.node_as_merge_op(), "PlanOp.MergeOp")?;
265 let pattern = req(x.pattern(), "MergeOp.pattern")?;
266 let on_create_props = req(x.on_create_props(), "MergeOp.on_create_props")?;
267 let on_match_props = req(x.on_match_props(), "MergeOp.on_match_props")?;
268 Op::Merge {
269 input: x.input(),
270 pattern: deserialize_expr(pattern)?,
271 on_create_props: deserialize_expr(on_create_props)?,
272 on_match_props: deserialize_expr(on_match_props)?,
273 schema: vec_coldefs(x.schema())?,
274 }
275 }
276 fb::OpNode::DeleteOp => {
277 let x = req(plan_op.node_as_delete_op(), "PlanOp.DeleteOp")?;
278 Op::Delete {
279 input: x.input(),
280 target_col: x.target_col(),
281 detach: x.detach(),
282 schema: vec_coldefs(x.schema())?,
283 }
284 }
285 fb::OpNode::SetPropertyOp => {
286 let x = req(plan_op.node_as_set_property_op(), "PlanOp.SetPropertyOp")?;
287 let value_expr = req(x.value_expr(), "SetPropertyOp.value_expr")?;
288 Op::SetProperty {
289 input: x.input(),
290 target_col: x.target_col(),
291 key: req(x.key(), "SetPropertyOp.key")?.to_string(),
292 value_expr: deserialize_expr(value_expr)?,
293 schema: vec_coldefs(x.schema())?,
294 }
295 }
296 fb::OpNode::RemovePropertyOp => {
297 let x = req(
298 plan_op.node_as_remove_property_op(),
299 "PlanOp.RemovePropertyOp",
300 )?;
301 Op::RemoveProperty {
302 input: x.input(),
303 target_col: x.target_col(),
304 key: req(x.key(), "RemovePropertyOp.key")?.to_string(),
305 schema: vec_coldefs(x.schema())?,
306 }
307 }
308 fb::OpNode::VectorScanOp => {
309 let x = req(plan_op.node_as_vector_scan_op(), "PlanOp.VectorScanOp")?;
310 let query_vector = req(x.query_vector(), "VectorScanOp.query_vector")?;
311 Op::VectorScan {
312 input: x.input(),
313 collection: req(x.collection(), "VectorScanOp.collection")?.to_string(),
314 query_vector: deserialize_expr(query_vector)?,
315 metric: from_fb_vector_metric(x.metric())?,
316 top_k: x.top_k(),
317 approx_hint: x.approx_hint(),
318 schema: vec_coldefs(x.schema())?,
319 }
320 }
321 fb::OpNode::RerankOp => {
322 let x = req(plan_op.node_as_rerank_op(), "PlanOp.RerankOp")?;
323 let score_expr = req(x.score_expr(), "RerankOp.score_expr")?;
324 Op::Rerank {
325 input: x.input(),
326 score_expr: deserialize_expr(score_expr)?,
327 top_k: x.top_k(),
328 schema: vec_coldefs(x.schema())?,
329 }
330 }
331 fb::OpNode::ReturnOp => {
332 let x = req(plan_op.node_as_return_op(), "PlanOp.ReturnOp")?;
333 Op::Return { input: x.input() }
334 }
335 fb::OpNode::ConstRowOp => Op::ConstRow,
336 _ => return Err(SerdeError::Unsupported("unknown OpNode variant")),
337 };
338 ops.push(op);
339 }
340 }
341
342 Ok(Plan {
343 version,
344 ops,
345 root_op: fb_plan.root_op(),
346 })
347}