1use anyhow::{Result, anyhow};
16use fxhash::FxHashSet;
17use mangle_ir::physical::{self, Aggregate, CmpOp, Condition, DataSource, Expr, Op, Operand};
18use mangle_ir::{Inst, InstId, Ir, NameId};
19
20pub struct Planner<'a> {
21 ir: &'a mut Ir,
22 delta_pred: Option<NameId>,
23 fresh_counter: usize,
24 hash_join: bool,
28}
29
/// Reads the default for hash-join planning from the `MANGLE_HASHJOIN`
/// environment variable.
///
/// Returns `true` only when the variable is exactly `1` or equals `true`
/// ignoring ASCII case. An unset, non-Unicode, or any other value yields `false`.
fn hash_join_env_default() -> bool {
    match std::env::var("MANGLE_HASHJOIN") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        // Unset or not valid Unicode: hash joins stay off.
        Err(_) => false,
    }
}
38
39fn splice_hash_join_body(op: Op, body: Op) -> Op {
42 match op {
43 Op::HashJoin {
44 build_source,
45 probe_source,
46 join_keys,
47 ..
48 } => Op::HashJoin {
49 build_source,
50 probe_source,
51 join_keys,
52 body: Box::new(body),
53 },
54 other => other,
55 }
56}
57
58impl<'a> Planner<'a> {
59 pub fn new(ir: &'a mut Ir) -> Self {
60 Self {
61 ir,
62 delta_pred: None,
63 fresh_counter: 0,
64 hash_join: hash_join_env_default(),
65 }
66 }
67
68 pub fn with_delta(mut self, delta_pred: NameId) -> Self {
69 self.delta_pred = Some(delta_pred);
70 self
71 }
72
73 pub fn with_hash_join(mut self, enabled: bool) -> Self {
77 self.hash_join = enabled;
78 self
79 }
80
81 pub fn plan_rule(mut self, rule_id: InstId) -> Result<Op> {
82 let (head, premises, transform) = match self.ir.get(rule_id) {
83 Inst::Rule {
84 head,
85 premises,
86 transform,
87 } => (*head, premises.clone(), transform.clone()),
88 _ => return Err(anyhow!("Not a rule")),
89 };
90
91 let blocks = self.split_transforms(transform);
93 let num_blocks = blocks.len();
94
95 let mut ops = Vec::new();
96 let mut current_source: Option<(NameId, Vec<NameId>)> = None;
97 let mut bound_vars = FxHashSet::default();
98
99 for (i, block) in blocks.into_iter().enumerate() {
100 let is_last = i == num_blocks - 1;
101
102 if i == 0 {
103 if is_last {
105 let op = self.plan_join_sequence(
107 premises.clone(),
108 &mut bound_vars,
109 |planner, vars| {
110 planner.plan_transforms_sequence(&block, vars, |p, v| {
111 p.plan_head_insert(head, v)
112 })
113 },
114 )?;
115 ops.push(op);
116 } else {
117 let temp_rel = self.fresh_var("temp_grp");
119 let mut capture_vars: Vec<NameId> = Vec::new(); let op = self.plan_join_sequence(
122 premises.clone(),
123 &mut bound_vars,
124 |planner, vars| {
125 planner.plan_transforms_sequence(&block, vars, |_, v| {
126 let mut sorted_vars: Vec<NameId> = v.iter().cloned().collect();
127 sorted_vars.sort();
128 capture_vars = sorted_vars.clone();
129 let args =
130 sorted_vars.iter().map(|&var| Operand::Var(var)).collect();
131 Ok(Op::Insert {
132 relation: temp_rel,
133 args,
134 })
135 })
136 },
137 )?;
138 ops.push(op);
139 current_source = Some((temp_rel, capture_vars));
140 }
141 } else {
142 let (src_rel, src_vars) = current_source.take().expect("No source for aggregation");
144
145 if is_last {
146 let op = self.plan_block_k(src_rel, src_vars, &block, |p, v| {
147 p.plan_head_insert(head, v)
148 })?;
149 ops.push(op);
150 } else {
151 let next_temp = self.fresh_var("temp_grp");
152 let mut next_vars: Vec<NameId> = Vec::new();
153
154 let op = self.plan_block_k(src_rel, src_vars, &block, |_, v| {
155 let mut sorted_vars: Vec<NameId> = v.iter().cloned().collect();
156 sorted_vars.sort();
157 next_vars = sorted_vars.clone();
158 let args = sorted_vars.iter().map(|&var| Operand::Var(var)).collect();
159 Ok(Op::Insert {
160 relation: next_temp,
161 args,
162 })
163 })?;
164 ops.push(op);
165 current_source = Some((next_temp, next_vars));
166 }
167 }
168 }
169
170 if ops.len() == 1 {
171 Ok(ops.remove(0))
172 } else {
173 Ok(Op::Seq(ops))
174 }
175 }
176
177 fn split_transforms(&self, transforms: Vec<InstId>) -> Vec<Vec<InstId>> {
178 let mut blocks = Vec::new();
179 let mut current = Vec::new();
180 for t in transforms {
181 let inst = self.ir.get(t);
182 if let Inst::Transform { var: None, .. } = inst {
183 blocks.push(current);
184 current = Vec::new();
185 }
186 current.push(t);
187 }
188 blocks.push(current);
189 blocks
190 }
191
192 fn plan_block_k<F>(
193 &mut self,
194 source_rel: NameId,
195 source_vars: Vec<NameId>,
196 block: &[InstId],
197 continuation: F,
198 ) -> Result<Op>
199 where
200 F: FnOnce(&mut Self, &mut FxHashSet<NameId>) -> Result<Op>,
201 {
202 let do_stmt = block[0];
203 let rest = &block[1..];
204
205 let keys_insts = self.get_transform_app_args(do_stmt)?;
206 let mut keys = Vec::new();
207 for k in keys_insts {
208 if let Inst::Var(v) = self.ir.get(k) {
209 keys.push(*v);
210 } else {
211 return Err(anyhow!("GroupBy keys must be variables"));
212 }
213 }
214
215 let mut aggregates = Vec::new();
216 let mut lets = Vec::new();
217 for &t in rest {
218 if let Some(agg) = self.try_parse_aggregate(t)? {
219 aggregates.push(agg);
220 } else {
221 lets.push(t);
222 }
223 }
224
225 let mut inner_vars = FxHashSet::default();
226 for &k in &keys {
227 inner_vars.insert(k);
228 }
229 for agg in &aggregates {
230 inner_vars.insert(agg.var);
231 }
232
233 let body = self.plan_transforms_sequence(&lets, &mut inner_vars, continuation)?;
234
235 Ok(Op::GroupBy {
236 source: source_rel,
237 vars: source_vars,
238 keys,
239 aggregates,
240 body: Box::new(body),
241 })
242 }
243
244 fn plan_transforms_sequence<F>(
245 &mut self,
246 transforms: &[InstId],
247 bound_vars: &mut FxHashSet<NameId>,
248 continuation: F,
249 ) -> Result<Op>
250 where
251 F: FnOnce(&mut Self, &mut FxHashSet<NameId>) -> Result<Op>,
252 {
253 if transforms.is_empty() {
254 return continuation(self, bound_vars);
255 }
256
257 let t_id = transforms[0];
258 let rest = &transforms[1..];
259
260 let inst = self.ir.get(t_id).clone();
261 if let Inst::Transform {
262 var: Some(var),
263 app,
264 } = inst
265 {
266 self.inst_to_expr(app, |planner, expr| {
267 bound_vars.insert(var);
268 let body = planner.plan_transforms_sequence(rest, bound_vars, continuation)?;
269 Ok(Op::Let {
270 var,
271 expr,
272 body: Box::new(body),
273 })
274 })
275 } else {
276 Err(anyhow!("Unexpected transform in sequence"))
278 }
279 }
280
281 fn fresh_var(&mut self, prefix: &str) -> NameId {
282 let id = self.fresh_counter;
283 self.fresh_counter += 1;
284 let name = format!("${}_{}", prefix, id);
285 self.ir.intern_name(name)
286 }
287
288 fn try_plan_hash_join(
294 &self,
295 predicate: NameId,
296 args: &[InstId],
297 premises: &mut Vec<InstId>,
298 bound_vars: &mut FxHashSet<NameId>,
299 ) -> Result<Option<Op>> {
300 let Some(build_vars) = self.all_fresh_distinct_vars(args, bound_vars) else {
301 return Ok(None);
302 };
303 let next_id = *premises.first().unwrap();
304 let next_inst = self.ir.get(next_id).clone();
305 let Inst::Atom {
306 predicate: next_pred,
307 args: next_args,
308 } = next_inst
309 else {
310 return Ok(None);
311 };
312 if Some(next_pred) == self.delta_pred {
313 return Ok(None);
314 }
315 let Some(probe_vars) = self.all_fresh_distinct_vars(&next_args, bound_vars) else {
316 return Ok(None);
317 };
318 let join_keys: Vec<NameId> = build_vars
319 .iter()
320 .filter(|v| probe_vars.contains(v))
321 .copied()
322 .collect();
323 if join_keys.is_empty() {
324 return Ok(None);
325 }
326
327 premises.remove(0);
329 for v in build_vars.iter().chain(probe_vars.iter()) {
330 bound_vars.insert(*v);
331 }
332
333 Ok(Some(Op::HashJoin {
334 build_source: DataSource::Scan {
335 relation: predicate,
336 vars: build_vars,
337 },
338 probe_source: DataSource::Scan {
339 relation: next_pred,
340 vars: probe_vars,
341 },
342 join_keys,
343 body: Box::new(Op::Nop),
344 }))
345 }
346
347 fn all_fresh_distinct_vars(
352 &self,
353 args: &[InstId],
354 bound_vars: &FxHashSet<NameId>,
355 ) -> Option<Vec<NameId>> {
356 let mut out: Vec<NameId> = Vec::with_capacity(args.len());
357 for arg in args {
358 let Inst::Var(v) = self.ir.get(*arg) else {
359 return None;
360 };
361 if bound_vars.contains(v) || out.contains(v) {
362 return None;
363 }
364 out.push(*v);
365 }
366 Some(out)
367 }
368
369 fn plan_join_sequence<F>(
370 &mut self,
371 mut premises: Vec<InstId>,
372 bound_vars: &mut FxHashSet<NameId>,
373 continuation: F,
374 ) -> Result<Op>
375 where
376 F: FnOnce(&mut Self, &mut FxHashSet<NameId>) -> Result<Op>,
377 {
378 if premises.is_empty() {
379 return continuation(self, bound_vars);
380 }
381
382 let current_premise = premises.remove(0);
383 let inst = self.ir.get(current_premise).clone();
384
385 match inst {
386 Inst::Atom { predicate, args }
387 if matches!(
388 self.ir.resolve_name(predicate),
389 ":lt" | ":le" | ":gt" | ":ge"
390 | ":time:lt" | ":time:le" | ":time:gt" | ":time:ge"
391 | ":duration:lt" | ":duration:le" | ":duration:gt" | ":duration:ge"
392 ) =>
393 {
394 let cmp_op = match self.ir.resolve_name(predicate) {
395 ":lt" | ":time:lt" | ":duration:lt" => CmpOp::Lt,
396 ":le" | ":time:le" | ":duration:le" => CmpOp::Le,
397 ":gt" | ":time:gt" | ":duration:gt" => CmpOp::Gt,
398 ":ge" | ":time:ge" | ":duration:ge" => CmpOp::Ge,
399 _ => unreachable!(),
400 };
401 if args.len() != 2 {
402 return Err(anyhow!("Comparison predicate requires exactly 2 arguments"));
403 }
404 let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
405 self.with_eval(args[0], |this, left_op| {
406 this.with_eval(args[1], |_this, right_op| {
407 Ok(Op::Filter {
408 cond: Condition::Cmp {
409 op: cmp_op,
410 left: left_op.clone(),
411 right: right_op,
412 },
413 body: Box::new(body),
414 })
415 })
416 })
417 }
418 Inst::Atom { predicate, args }
419 if matches!(
420 self.ir.resolve_name(predicate),
421 ":string:starts_with"
422 | ":string:ends_with"
423 | ":string:contains"
424 | ":match_prefix"
425 ) =>
426 {
427 if args.len() != 2 {
428 return Err(anyhow!(
429 "Built-in predicate requires exactly 2 arguments"
430 ));
431 }
432 let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
433 self.with_eval(args[0], |this, left_op| {
434 this.with_eval(args[1], |_this, right_op| {
435 Ok(Op::Filter {
436 cond: Condition::Call {
437 function: predicate,
438 args: vec![left_op.clone(), right_op],
439 },
440 body: Box::new(body),
441 })
442 })
443 })
444 }
445 Inst::Atom { predicate, args } => {
446 if self.hash_join
457 && self.delta_pred.is_none()
458 && !premises.is_empty()
459 {
460 if let Some(op) = self.try_plan_hash_join(
461 predicate,
462 &args,
463 &mut premises,
464 bound_vars,
465 )? {
466 let body_op = self.plan_join_sequence(premises, bound_vars, continuation)?;
467 return Ok(splice_hash_join_body(op, body_op));
468 }
469 }
470
471 let mut scan_vars = Vec::new();
472 let mut new_bindings = Vec::new();
473
474 let mut index_lookup: Option<(usize, Operand)> = None;
476
477 for (i, arg) in args.iter().enumerate() {
478 let arg_inst = self.ir.get(*arg).clone();
479 match arg_inst {
480 Inst::Var(v) if bound_vars.contains(&v) => {
481 if index_lookup.is_none() {
482 index_lookup = Some((i, Operand::Var(v)));
483 }
484 }
485 Inst::Number(n) => {
486 if index_lookup.is_none() {
487 index_lookup =
488 Some((i, Operand::Const(physical::Constant::Number(n))));
489 }
490 }
491 Inst::String(s) => {
492 if index_lookup.is_none() {
493 index_lookup =
494 Some((i, Operand::Const(physical::Constant::String(s))));
495 }
496 }
497 Inst::Name(n) => {
498 if index_lookup.is_none() {
499 index_lookup =
500 Some((i, Operand::Const(physical::Constant::Name(n))));
501 }
502 }
503 Inst::Float(fl) => {
504 if index_lookup.is_none() {
505 index_lookup =
506 Some((i, Operand::Const(physical::Constant::Float(fl))));
507 }
508 }
509 Inst::Time(t) => {
510 if index_lookup.is_none() {
511 index_lookup =
512 Some((i, Operand::Const(physical::Constant::Time(t))));
513 }
514 }
515 Inst::Duration(d) => {
516 if index_lookup.is_none() {
517 index_lookup =
518 Some((i, Operand::Const(physical::Constant::Duration(d))));
519 }
520 }
521 _ => {}
522 }
523 }
524
525 for arg in &args {
526 if let Inst::Var(v) = self.ir.get(*arg)
527 && !bound_vars.contains(v)
528 {
529 scan_vars.push(*v);
530 new_bindings.push(*v);
531 continue;
532 }
533 let tmp = self.fresh_var("scan");
534 scan_vars.push(tmp);
535 new_bindings.push(tmp);
536 }
537
538 for v in &new_bindings {
539 bound_vars.insert(*v);
540 }
541
542 let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
543 let wrapped_body = self.apply_constraints(&args, &scan_vars, body)?;
544
545 let source = if let Some((col_idx, key)) = index_lookup {
546 DataSource::IndexLookup {
547 relation: predicate,
548 col_idx,
549 key,
550 vars: scan_vars,
551 }
552 } else if Some(predicate) == self.delta_pred {
553 DataSource::ScanDelta {
554 relation: predicate,
555 vars: scan_vars,
556 }
557 } else {
558 DataSource::Scan {
559 relation: predicate,
560 vars: scan_vars,
561 }
562 };
563 Ok(Op::Iterate {
564 source,
565 body: Box::new(wrapped_body),
566 })
567 }
568 Inst::Eq(l, r) => {
569 let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
570 self.wrap_eq_check(l, r, body)
571 }
572 Inst::Ineq(l, r) => {
573 let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
574 self.with_eval(l, |this, left_op| {
575 this.with_eval(r, |_this, right_op| {
576 Ok(Op::Filter {
577 cond: Condition::Cmp {
578 op: CmpOp::Neq,
579 left: left_op.clone(),
580 right: right_op,
581 },
582 body: Box::new(body),
583 })
584 })
585 })
586 }
587 Inst::NegAtom(inner) => {
588 let inner_inst = self.ir.get(inner).clone();
589 if let Inst::Atom { predicate, args } = inner_inst {
590 let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
591 let mut neg_args = Vec::new();
592 for arg in &args {
593 let arg_inst = self.ir.get(*arg).clone();
594 match arg_inst {
595 Inst::Var(v) => neg_args.push(Operand::Var(v)),
596 Inst::Number(n) => {
597 neg_args.push(Operand::Const(physical::Constant::Number(n)))
598 }
599 Inst::String(s) => {
600 neg_args.push(Operand::Const(physical::Constant::String(s)))
601 }
602 Inst::Name(n) => {
603 neg_args.push(Operand::Const(physical::Constant::Name(n)))
604 }
605 Inst::Float(fl) => {
606 neg_args.push(Operand::Const(physical::Constant::Float(fl)))
607 }
608 Inst::Time(t) => {
609 neg_args.push(Operand::Const(physical::Constant::Time(t)))
610 }
611 Inst::Duration(d) => {
612 neg_args.push(Operand::Const(physical::Constant::Duration(d)))
613 }
614 _ => return Err(anyhow!("Complex expression in negated atom")),
615 }
616 }
617 Ok(Op::Filter {
618 cond: Condition::Negation {
619 relation: predicate,
620 args: neg_args,
621 },
622 body: Box::new(body),
623 })
624 } else {
625 Err(anyhow!("NegAtom wraps non-atom"))
626 }
627 }
628 _ => Err(anyhow!("Unsupported premise type: {:?}", inst)),
629 }
630 }
631
632 fn apply_constraints(
633 &mut self,
634 args: &[InstId],
635 scan_vars: &[NameId],
636 mut body: Op,
637 ) -> Result<Op> {
638 for (i, arg) in args.iter().enumerate().rev() {
639 let scan_var = scan_vars[i];
640 let arg_inst = self.ir.get(*arg).clone();
641 match arg_inst {
642 Inst::Var(v) => {
643 if v == scan_var {
644 continue;
645 }
646 body = Op::Filter {
647 cond: Condition::Cmp {
648 op: CmpOp::Eq,
649 left: Operand::Var(scan_var),
650 right: Operand::Var(v),
651 },
652 body: Box::new(body),
653 };
654 }
655 _ => {
656 body = self.wrap_eval_check(*arg, Operand::Var(scan_var), body)?;
657 }
658 }
659 }
660 Ok(body)
661 }
662
663 fn wrap_eq_check(&mut self, l: InstId, r: InstId, body: Op) -> Result<Op> {
664 self.with_eval(l, |this, op_l| {
665 this.with_eval(r, |_this, op_r| {
666 Ok(Op::Filter {
667 cond: Condition::Cmp {
668 op: CmpOp::Eq,
669 left: op_l,
670 right: op_r,
671 },
672 body: Box::new(body),
673 })
674 })
675 })
676 }
677
678 fn wrap_eval_check(&mut self, inst: InstId, target: Operand, body: Op) -> Result<Op> {
679 self.with_eval(inst, |_this, op| {
680 Ok(Op::Filter {
681 cond: Condition::Cmp {
682 op: CmpOp::Eq,
683 left: target,
684 right: op,
685 },
686 body: Box::new(body),
687 })
688 })
689 }
690
691 fn with_eval<F>(&mut self, inst: InstId, f: F) -> Result<Op>
692 where
693 F: FnOnce(&mut Self, Operand) -> Result<Op>,
694 {
695 let i = self.ir.get(inst).clone();
696 match i {
697 Inst::Var(v) => f(self, Operand::Var(v)),
698 Inst::String(s) => f(self, Operand::Const(physical::Constant::String(s))),
699 Inst::Number(n) => f(self, Operand::Const(physical::Constant::Number(n))),
700 Inst::Name(n) => f(self, Operand::Const(physical::Constant::Name(n))),
701 Inst::Float(fl) => f(self, Operand::Const(physical::Constant::Float(fl))),
702 Inst::Time(t) => f(self, Operand::Const(physical::Constant::Time(t))),
703 Inst::Duration(d) => f(self, Operand::Const(physical::Constant::Duration(d))),
704 Inst::ApplyFn { function, args } => self.with_eval_args(
705 &args,
706 0,
707 Vec::new(),
708 Box::new(|this, ops| {
709 let tmp = this.fresh_var("call");
710 let inner = f(this, Operand::Var(tmp))?;
711 Ok(Op::Let {
712 var: tmp,
713 expr: Expr::Call {
714 function,
715 args: ops,
716 },
717 body: Box::new(inner),
718 })
719 }),
720 ),
721 Inst::List(args) => {
723 let fn_name = self.ir.intern_name("fn:list".to_string());
724 self.with_eval_args(
725 &args,
726 0,
727 Vec::new(),
728 Box::new(|this, ops| {
729 let tmp = this.fresh_var("list");
730 let inner = f(this, Operand::Var(tmp))?;
731 Ok(Op::Let {
732 var: tmp,
733 expr: Expr::Call {
734 function: fn_name,
735 args: ops,
736 },
737 body: Box::new(inner),
738 })
739 }),
740 )
741 }
742 Inst::Map { keys, values } => {
743 let mut interleaved = Vec::with_capacity(keys.len() + values.len());
745 for (k, v) in keys.iter().zip(values.iter()) {
746 interleaved.push(*k);
747 interleaved.push(*v);
748 }
749 let fn_name = self.ir.intern_name("fn:map".to_string());
750 self.with_eval_args(
751 &interleaved,
752 0,
753 Vec::new(),
754 Box::new(|this, ops| {
755 let tmp = this.fresh_var("map");
756 let inner = f(this, Operand::Var(tmp))?;
757 Ok(Op::Let {
758 var: tmp,
759 expr: Expr::Call {
760 function: fn_name,
761 args: ops,
762 },
763 body: Box::new(inner),
764 })
765 }),
766 )
767 }
768 Inst::Struct { fields, values } => {
769 let mut interleaved = Vec::with_capacity(fields.len() + values.len());
771 for (field, val) in fields.iter().zip(values.iter()) {
772 let name_inst = self.ir.add_inst(Inst::Name(*field));
774 interleaved.push(name_inst);
775 interleaved.push(*val);
776 }
777 let fn_name = self.ir.intern_name("fn:struct".to_string());
778 self.with_eval_args(
779 &interleaved,
780 0,
781 Vec::new(),
782 Box::new(|this, ops| {
783 let tmp = this.fresh_var("struct");
784 let inner = f(this, Operand::Var(tmp))?;
785 Ok(Op::Let {
786 var: tmp,
787 expr: Expr::Call {
788 function: fn_name,
789 args: ops,
790 },
791 body: Box::new(inner),
792 })
793 }),
794 )
795 }
796 _ => Err(anyhow!("Unsupported expression in evaluation")),
797 }
798 }
799
800 fn inst_to_expr<F>(&mut self, inst: InstId, f: F) -> Result<Op>
801 where
802 F: FnOnce(&mut Self, Expr) -> Result<Op>,
803 {
804 let i = self.ir.get(inst).clone();
805 match i {
806 Inst::ApplyFn { function, args } => self.with_eval_args(
807 &args,
808 0,
809 Vec::new(),
810 Box::new(|this, ops| {
811 f(
812 this,
813 Expr::Call {
814 function,
815 args: ops,
816 },
817 )
818 }),
819 ),
820 _ => self.with_eval(inst, |this, op| f(this, Expr::Value(op))),
821 }
822 }
823
824 fn with_eval_args(
825 &mut self,
826 args: &[InstId],
827 index: usize,
828 mut acc: Vec<Operand>,
829 f: Box<dyn FnOnce(&mut Self, Vec<Operand>) -> Result<Op> + '_>,
830 ) -> Result<Op> {
831 if index >= args.len() {
832 return f(self, acc);
833 }
834 self.with_eval(args[index], |this, op| {
835 acc.push(op);
836 this.with_eval_args(args, index + 1, acc, f)
837 })
838 }
839
840 fn plan_head_insert(
841 &mut self,
842 head: InstId,
843 _bound_vars: &mut FxHashSet<NameId>,
844 ) -> Result<Op> {
845 let inst = self.ir.get(head).clone();
846 if let Inst::Atom { predicate, args } = inst {
847 self.with_eval_args(
848 &args,
849 0,
850 Vec::new(),
851 Box::new(|_this, ops| {
852 Ok(Op::Insert {
853 relation: predicate,
854 args: ops,
855 })
856 }),
857 )
858 } else {
859 Err(anyhow!("Head must be an atom"))
860 }
861 }
862
863 fn get_transform_app_args(&self, t_id: InstId) -> Result<Vec<InstId>> {
864 if let Inst::Transform { app, .. } = self.ir.get(t_id)
865 && let Inst::ApplyFn { args, .. } = self.ir.get(*app)
866 {
867 return Ok(args.clone());
868 }
869 Err(anyhow!("Invalid transform structure"))
870 }
871
872 fn try_parse_aggregate(&mut self, t_id: InstId) -> Result<Option<Aggregate>> {
873 let inst = self.ir.get(t_id).clone();
874 if let Inst::Transform {
875 var: Some(var),
876 app,
877 } = inst
878 && let Inst::ApplyFn { function, args } = self.ir.get(app).clone()
879 {
880 let func_name = self.ir.resolve_name(function);
881 if matches!(
882 func_name,
883 "fn:sum"
884 | "fn:count"
885 | "fn:max"
886 | "fn:min"
887 | "fn:collect"
888 | "fn:float:sum"
889 | "fn:float:max"
890 | "fn:float:min"
891 ) {
892 let mut op_args = Vec::new();
893 for arg in args {
894 let arg_inst = self.ir.get(arg).clone();
895 match arg_inst {
896 Inst::Var(v) => op_args.push(Operand::Var(v)),
897 Inst::Number(n) => {
898 op_args.push(Operand::Const(physical::Constant::Number(n)))
899 }
900 Inst::Float(fl) => {
901 op_args.push(Operand::Const(physical::Constant::Float(fl)))
902 }
903 _ => {
904 return Err(anyhow!(
905 "Complex expressions in aggregates not supported yet"
906 ));
907 }
908 }
909 }
910 return Ok(Some(Aggregate {
911 var,
912 func: function,
913 args: op_args,
914 }));
915 }
916 }
917 Ok(None)
918 }
919}