graphannis/annis/db/aql/operators/
edge_op.rs

1use crate::annis::db::aql::{model::AnnotationComponentType, operators::RangeSpec};
2use crate::annis::db::exec::CostEstimate;
3use crate::annis::errors::GraphAnnisError;
4use crate::annis::operator::{
5    BinaryOperator, BinaryOperatorBase, BinaryOperatorIndex, BinaryOperatorSpec,
6    EdgeAnnoSearchSpec, EstimationType,
7};
8use crate::errors::Result;
9use crate::graph::{GraphStatistic, GraphStorage, Match};
10use crate::{AnnotationGraph, try_as_boxed_iter};
11use graphannis_core::{
12    graph::{ANNIS_NS, DEFAULT_ANNO_KEY, NODE_TYPE_KEY},
13    types::{Component, Edge, NodeID},
14};
15use itertools::Itertools;
16use std::collections::{HashSet, VecDeque};
17use std::iter::FromIterator;
18use std::sync::Arc;
19
20#[derive(Clone, Debug)]
21struct BaseEdgeOpSpec {
22    pub components: Vec<Component<AnnotationComponentType>>,
23    pub dist: RangeSpec,
24    pub edge_anno: Option<EdgeAnnoSearchSpec>,
25    pub is_reflexive: bool,
26    pub op_str: Option<String>,
27    pub inverse_operator_needs_cost_check: bool,
28}
29
30struct BaseEdgeOp {
31    gs: Vec<Arc<dyn GraphStorage>>,
32    spec: BaseEdgeOpSpec,
33    max_nodes_estimate: usize,
34    inverse: bool,
35}
36
37impl BaseEdgeOp {
38    pub fn new(db: &AnnotationGraph, spec: BaseEdgeOpSpec) -> Result<BaseEdgeOp> {
39        let mut gs: Vec<Arc<dyn GraphStorage>> = Vec::new();
40        for c in &spec.components {
41            let gs_for_component = db.get_graphstorage(c).ok_or_else(|| {
42                GraphAnnisError::ImpossibleSearch(format!("Component {} does not exist", &c))
43            })?;
44
45            gs.push(gs_for_component);
46        }
47
48        let max_nodes_estimate = calculate_max_node_estimate(db, &spec, &gs, false)?;
49        Ok(BaseEdgeOp {
50            gs,
51            spec,
52            max_nodes_estimate,
53            inverse: false,
54        })
55    }
56}
57
58fn calculate_max_node_estimate(
59    db: &AnnotationGraph,
60    spec: &BaseEdgeOpSpec,
61    gs: &[Arc<dyn GraphStorage>],
62    inverse: bool,
63) -> Result<usize> {
64    let all_components_are_partof = spec
65        .components
66        .iter()
67        .all(|c| c.get_type() == AnnotationComponentType::PartOf);
68    let max_nodes_estimate = if all_components_are_partof && gs.len() == 1 {
69        // PartOf components have a very skewed distribution of root nodes vs.
70        // the actual possible targets, thus do not use all nodes as population
71        // but only the non-roots. We can only use this formula for the actual
72        // @* operator, but not the inverted one.
73        if !inverse && let Some(stats) = gs[0].get_statistics() {
74            stats.nodes - stats.root_nodes
75        } else {
76            // Fallback to guessing how many nodes have the node type "corpus"
77            // or "datasource" and thus could be reachable as RHS in a worst case
78            // scenario. Since a node can't be part of itself, subtract 1 for
79            // the node on the LHS.
80            db.get_node_annos()
81                .guess_max_count(
82                    Some(&NODE_TYPE_KEY.ns),
83                    &NODE_TYPE_KEY.name,
84                    "corpus",
85                    "datasource",
86                )?
87                .saturating_sub(1)
88        }
89    } else {
90        db.get_node_annos().guess_max_count(
91            Some(&NODE_TYPE_KEY.ns),
92            &NODE_TYPE_KEY.name,
93            "node",
94            "node",
95        )?
96    };
97    Ok(max_nodes_estimate)
98}
99
100impl BinaryOperatorSpec for BaseEdgeOpSpec {
101    fn necessary_components(
102        &self,
103        _db: &AnnotationGraph,
104    ) -> HashSet<Component<AnnotationComponentType>> {
105        HashSet::from_iter(self.components.clone())
106    }
107
108    fn create_operator<'a>(
109        &self,
110        db: &'a AnnotationGraph,
111        _cost_estimate: Option<(&CostEstimate, &CostEstimate)>,
112    ) -> Result<BinaryOperator<'a>> {
113        let optional_op = BaseEdgeOp::new(db, self.clone());
114        optional_op.map(|op| BinaryOperator::Index(Box::new(op)))
115    }
116
117    fn get_edge_anno_spec(&self) -> Option<EdgeAnnoSearchSpec> {
118        self.edge_anno.clone()
119    }
120
121    #[cfg(test)]
122    fn into_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
123        self
124    }
125
126    #[cfg(test)]
127    fn any_ref(&self) -> &dyn std::any::Any {
128        self
129    }
130}
131
132fn check_edge_annotation(
133    edge_anno: &Option<EdgeAnnoSearchSpec>,
134    gs: &dyn GraphStorage,
135    source: NodeID,
136    target: NodeID,
137) -> Result<bool> {
138    match edge_anno {
139        Some(EdgeAnnoSearchSpec::ExactValue { ns, name, val }) => {
140            for a in gs
141                .get_anno_storage()
142                .get_annotations_for_item(&Edge { source, target })?
143            {
144                if name != &a.key.name {
145                    continue;
146                }
147                if let Some(template_ns) = ns
148                    && template_ns != &a.key.ns
149                {
150                    continue;
151                }
152                if let Some(template_val) = val
153                    && template_val != &*a.val
154                {
155                    continue;
156                }
157                // all checks passed, this edge has the correct annotation
158                return Ok(true);
159            }
160            Ok(false)
161        }
162        Some(EdgeAnnoSearchSpec::NotExactValue { ns, name, val }) => {
163            for a in gs
164                .get_anno_storage()
165                .get_annotations_for_item(&Edge { source, target })?
166            {
167                if name != &a.key.name {
168                    continue;
169                }
170                if let Some(template_ns) = ns
171                    && template_ns != &a.key.ns
172                {
173                    continue;
174                }
175                if val.as_str() == a.val.as_str() {
176                    continue;
177                }
178
179                // all checks passed, this edge has the correct annotation
180                return Ok(true);
181            }
182            Ok(false)
183        }
184        Some(EdgeAnnoSearchSpec::RegexValue { ns, name, val }) => {
185            let full_match_pattern = graphannis_core::util::regex_full_match(val);
186            let re = regex::Regex::new(&full_match_pattern);
187            if let Ok(re) = re {
188                for a in gs
189                    .get_anno_storage()
190                    .get_annotations_for_item(&Edge { source, target })?
191                {
192                    if name != &a.key.name {
193                        continue;
194                    }
195                    if let Some(template_ns) = ns
196                        && template_ns != &a.key.ns
197                    {
198                        continue;
199                    }
200
201                    if !re.is_match(&a.val) {
202                        continue;
203                    }
204
205                    // all checks passed, this edge has the correct annotation
206                    return Ok(true);
207                }
208            }
209            Ok(false)
210        }
211        Some(EdgeAnnoSearchSpec::NotRegexValue { ns, name, val }) => {
212            let full_match_pattern = graphannis_core::util::regex_full_match(val);
213            let re = regex::Regex::new(&full_match_pattern);
214            if let Ok(re) = re {
215                for a in gs
216                    .get_anno_storage()
217                    .get_annotations_for_item(&Edge { source, target })?
218                {
219                    if name != &a.key.name {
220                        continue;
221                    }
222                    if let Some(template_ns) = ns
223                        && template_ns != &a.key.ns
224                    {
225                        continue;
226                    }
227
228                    if re.is_match(&a.val) {
229                        continue;
230                    }
231
232                    // all checks passed, this edge has the correct annotation
233                    return Ok(true);
234                }
235            }
236            Ok(false)
237        }
238        None => Ok(true),
239    }
240}
241
242impl BaseEdgeOp {}
243
244impl std::fmt::Display for BaseEdgeOp {
245    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
246        let anno_frag = if let Some(ref edge_anno) = self.spec.edge_anno {
247            format!("[{}]", edge_anno)
248        } else {
249            String::from("")
250        };
251
252        if let Some(ref op_str) = self.spec.op_str {
253            if self.inverse {
254                write!(f, "{}\u{20D6}{}{}", op_str, self.spec.dist, anno_frag)
255            } else {
256                write!(f, "{}{}{}", op_str, self.spec.dist, anno_frag)
257            }
258        } else {
259            write!(f, "?")
260        }
261    }
262}
263
264impl BinaryOperatorBase for BaseEdgeOp {
265    fn filter_match(&self, lhs: &Match, rhs: &Match) -> Result<bool> {
266        for e in &self.gs {
267            if self.inverse {
268                if e.is_connected(
269                    rhs.node,
270                    lhs.node,
271                    self.spec.dist.min_dist(),
272                    self.spec.dist.max_dist(),
273                )? && check_edge_annotation(
274                    &self.spec.edge_anno,
275                    e.as_ref(),
276                    rhs.node,
277                    lhs.node,
278                )? {
279                    return Ok(true);
280                }
281            } else if e.is_connected(
282                lhs.node,
283                rhs.node,
284                self.spec.dist.min_dist(),
285                self.spec.dist.max_dist(),
286            )? && check_edge_annotation(
287                &self.spec.edge_anno,
288                e.as_ref(),
289                lhs.node,
290                rhs.node,
291            )? {
292                return Ok(true);
293            }
294        }
295        Ok(false)
296    }
297
298    fn is_reflexive(&self) -> bool {
299        self.spec.is_reflexive
300    }
301
302    fn get_inverse_operator<'a>(
303        &self,
304        graph: &'a AnnotationGraph,
305    ) -> Result<Option<BinaryOperator<'a>>> {
306        let inverse = !self.inverse;
307
308        // Check if all graph storages have the same inverse cost. If not, we
309        // don't provide an inverse operator, because the plans would not
310        // account for the different costs
311        for g in &self.gs {
312            if self.spec.inverse_operator_needs_cost_check && !g.inverse_has_same_cost() {
313                return Ok(None);
314            }
315            if let Some(stat) = g.get_statistics() {
316                // If input and output estimations are too different, also don't provide a more costly inverse operator
317                if stat.inverse_fan_out_99_percentile > stat.fan_out_99_percentile {
318                    return Ok(None);
319                }
320            }
321        }
322        let max_nodes_estimate = calculate_max_node_estimate(graph, &self.spec, &self.gs, inverse)?;
323        let edge_op = BaseEdgeOp {
324            gs: self.gs.clone(),
325            spec: self.spec.clone(),
326            max_nodes_estimate,
327            inverse,
328        };
329        Ok(Some(BinaryOperator::Index(Box::new(edge_op))))
330    }
331
332    fn estimation_type(&self) -> Result<EstimationType> {
333        if self.gs.is_empty() {
334            // will not find anything
335            return Ok(EstimationType::Selectivity(0.0));
336        }
337
338        let mut max_nodes: f64 = self.max_nodes_estimate as f64;
339        // Avoid division by 0
340        if max_nodes == 0.0 {
341            max_nodes = 1.0;
342        }
343
344        let mut worst_sel: f64 = 0.0;
345
346        for g in &self.gs {
347            let g: &Arc<dyn GraphStorage> = g;
348
349            let mut gs_selectivity = 0.01;
350
351            if let Some(stats) = g.get_statistics() {
352                let stats: &GraphStatistic = stats;
353                if stats.cyclic {
354                    // can get all other nodes
355                    return Ok(EstimationType::Selectivity(1.0));
356                }
357                // get number of nodes reachable from min to max distance
358                let max_dist = match self.spec.dist.max_dist() {
359                    std::ops::Bound::Unbounded => usize::MAX,
360                    std::ops::Bound::Included(max_dist) => max_dist,
361                    std::ops::Bound::Excluded(max_dist) => max_dist - 1,
362                };
363                let max_path_length = std::cmp::min(max_dist, stats.max_depth) as i32;
364                let min_path_length = std::cmp::max(0, self.spec.dist.min_dist() - 1) as i32;
365
366                if stats.avg_fan_out > 1.0 {
367                    // Assume two complete k-ary trees (with the average fan-out
368                    // as k) as defined in "Thomas Cormen: Introduction to
369                    // algorithms (2009), page 1179) with the maximum and
370                    // minimum height. Calculate the number of nodes for both
371                    // complete trees and subtract them to get an estimation of
372                    // the number of nodes that fullfull the path length
373                    // criteria.
374                    let k = stats.avg_fan_out;
375
376                    let reachable_max: f64 = ((k.powi(max_path_length) - 1.0) / (k - 1.0)).ceil();
377                    let reachable_min: f64 = ((k.powi(min_path_length) - 1.0) / (k - 1.0)).ceil();
378
379                    let reachable = reachable_max - reachable_min;
380
381                    gs_selectivity = reachable / max_nodes;
382                } else {
383                    // We can't use the formula for complete k-ary trees because
384                    // we can't divide by zero and don't want negative numbers.
385                    // Use the simplified estimation with multiplication
386                    // instead.
387                    let reachable_max: f64 =
388                        (stats.avg_fan_out * f64::from(max_path_length)).ceil();
389                    let reachable_min: f64 =
390                        (stats.avg_fan_out * f64::from(min_path_length)).ceil();
391
392                    gs_selectivity = (reachable_max - reachable_min) / max_nodes;
393                }
394            }
395
396            if worst_sel < gs_selectivity {
397                worst_sel = gs_selectivity;
398            }
399        } // end for
400
401        Ok(EstimationType::Selectivity(worst_sel))
402    }
403
404    fn edge_anno_selectivity(&self) -> Result<Option<f64>> {
405        if let Some(ref edge_anno) = self.spec.edge_anno {
406            let mut worst_sel = 0.0;
407            for g in &self.gs {
408                let g: &Arc<dyn GraphStorage> = g;
409                let anno_storage = g.get_anno_storage();
410                let num_of_annos = anno_storage.number_of_annotations()?;
411                if num_of_annos == 0 {
412                    // we won't be able to find anything if there are no
413                    // annotations
414                    return Ok(Some(0.0));
415                } else {
416                    let guessed_count = match edge_anno {
417                        EdgeAnnoSearchSpec::ExactValue { val, ns, name } => {
418                            if let Some(val) = val {
419                                anno_storage.guess_max_count(
420                                    ns.as_ref().map(String::as_str),
421                                    name,
422                                    val,
423                                    val,
424                                )?
425                            } else {
426                                anno_storage.number_of_annotations_by_name(
427                                    ns.as_ref().map(String::as_str),
428                                    name,
429                                )?
430                            }
431                        }
432                        EdgeAnnoSearchSpec::NotExactValue { val, ns, name } => {
433                            let total = anno_storage.number_of_annotations_by_name(
434                                ns.as_ref().map(String::as_str),
435                                name,
436                            )?;
437                            total
438                                - anno_storage.guess_max_count(
439                                    ns.as_ref().map(String::as_str),
440                                    name,
441                                    val,
442                                    val,
443                                )?
444                        }
445                        EdgeAnnoSearchSpec::RegexValue { val, ns, name } => anno_storage
446                            .guess_max_count_regex(ns.as_ref().map(String::as_str), name, val)?,
447                        EdgeAnnoSearchSpec::NotRegexValue { val, ns, name } => {
448                            let total = anno_storage.number_of_annotations_by_name(
449                                ns.as_ref().map(String::as_str),
450                                name,
451                            )?;
452                            total
453                                - anno_storage.guess_max_count_regex(
454                                    ns.as_ref().map(String::as_str),
455                                    name,
456                                    val,
457                                )?
458                        }
459                    };
460                    let g_sel: f64 = (guessed_count as f64) / (num_of_annos as f64);
461                    if g_sel > worst_sel {
462                        worst_sel = g_sel;
463                    }
464                }
465            }
466            Ok(Some(worst_sel))
467        } else {
468            Ok(Some(1.0))
469        }
470    }
471}
472
473impl BinaryOperatorIndex for BaseEdgeOp {
474    fn retrieve_matches(&self, lhs: &Match) -> Box<dyn Iterator<Item = Result<Match>>> {
475        let lhs = lhs.clone();
476        let spec = self.spec.clone();
477
478        if self.gs.len() == 1 {
479            // directly return all matched nodes since when having only one component
480            // no duplicates are possible
481            let result: Result<VecDeque<_>> = if self.inverse {
482                self.gs[0]
483                    .find_connected_inverse(lhs.node, spec.dist.min_dist(), spec.dist.max_dist())
484                    .fuse()
485                    .map(move |candidate| {
486                        let candidate = candidate?;
487                        let has_annotation = check_edge_annotation(
488                            &self.spec.edge_anno,
489                            self.gs[0].as_ref(),
490                            candidate,
491                            lhs.node,
492                        )?;
493                        Ok((candidate, has_annotation))
494                    })
495                    .filter_map_ok(move |(n, has_annotation)| {
496                        if has_annotation {
497                            Some(Match {
498                                node: n,
499                                anno_key: DEFAULT_ANNO_KEY.clone(),
500                            })
501                        } else {
502                            None
503                        }
504                    })
505                    .collect()
506            } else {
507                self.gs[0]
508                    .find_connected(lhs.node, spec.dist.min_dist(), spec.dist.max_dist())
509                    .fuse()
510                    .map(move |candidate| {
511                        let candidate = candidate?;
512                        let has_annotation = check_edge_annotation(
513                            &self.spec.edge_anno,
514                            self.gs[0].as_ref(),
515                            lhs.node,
516                            candidate,
517                        )?;
518                        Ok((candidate, has_annotation))
519                    })
520                    .filter_map_ok(move |(n, has_annotation)| {
521                        if has_annotation {
522                            Some(Match {
523                                node: n,
524                                anno_key: DEFAULT_ANNO_KEY.clone(),
525                            })
526                        } else {
527                            None
528                        }
529                    })
530                    .collect()
531            };
532            let result = try_as_boxed_iter!(result);
533            Box::new(result.into_iter().map(Ok))
534        } else {
535            let all: Result<Vec<_>> = if self.inverse {
536                self.gs
537                    .iter()
538                    .flat_map(move |e| {
539                        let lhs = lhs.clone();
540
541                        e.as_ref()
542                            .find_connected_inverse(
543                                lhs.node,
544                                spec.dist.min_dist(),
545                                spec.dist.max_dist(),
546                            )
547                            .fuse()
548                            .map(move |candidate| {
549                                let candidate = candidate?;
550                                let has_annotation = check_edge_annotation(
551                                    &self.spec.edge_anno,
552                                    e.as_ref(),
553                                    candidate,
554                                    lhs.node,
555                                )?;
556                                Ok((candidate, has_annotation))
557                            })
558                            .filter_map_ok(move |(n, has_annotation)| {
559                                if has_annotation {
560                                    Some(Match {
561                                        node: n,
562                                        anno_key: DEFAULT_ANNO_KEY.clone(),
563                                    })
564                                } else {
565                                    None
566                                }
567                            })
568                    })
569                    .collect()
570            } else {
571                self.gs
572                    .iter()
573                    .flat_map(move |e| {
574                        let lhs = lhs.clone();
575
576                        e.as_ref()
577                            .find_connected(lhs.node, spec.dist.min_dist(), spec.dist.max_dist())
578                            .fuse()
579                            .map(move |candidate| {
580                                let candidate = candidate?;
581                                let has_annotation = check_edge_annotation(
582                                    &self.spec.edge_anno,
583                                    e.as_ref(),
584                                    lhs.node,
585                                    candidate,
586                                )?;
587                                Ok((candidate, has_annotation))
588                            })
589                            .filter_map_ok(move |(n, has_annotation)| {
590                                if has_annotation {
591                                    Some(Match {
592                                        node: n,
593                                        anno_key: DEFAULT_ANNO_KEY.clone(),
594                                    })
595                                } else {
596                                    None
597                                }
598                            })
599                    })
600                    .collect()
601            };
602            let mut all = try_as_boxed_iter!(all);
603            all.sort_unstable();
604            all.dedup();
605            Box::new(all.into_iter().map(Ok))
606        }
607    }
608
609    fn as_binary_operator(&self) -> &dyn BinaryOperatorBase {
610        self
611    }
612}
613
614#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
615pub struct DominanceSpec {
616    pub name: String,
617    pub dist: RangeSpec,
618    pub edge_anno: Option<EdgeAnnoSearchSpec>,
619}
620
621impl BinaryOperatorSpec for DominanceSpec {
622    fn necessary_components(
623        &self,
624        db: &AnnotationGraph,
625    ) -> HashSet<Component<AnnotationComponentType>> {
626        HashSet::from_iter(
627            db.get_all_components(Some(AnnotationComponentType::Dominance), Some(&self.name)),
628        )
629    }
630
631    fn create_operator<'a>(
632        &self,
633        db: &'a AnnotationGraph,
634        cost_estimate: Option<(&CostEstimate, &CostEstimate)>,
635    ) -> Result<BinaryOperator<'a>> {
636        let components =
637            db.get_all_components(Some(AnnotationComponentType::Dominance), Some(&self.name));
638        let op_str = if self.name.is_empty() {
639            String::from(">")
640        } else {
641            format!(">{} ", &self.name)
642        };
643        let base = BaseEdgeOpSpec {
644            op_str: Some(op_str),
645            components,
646            dist: self.dist.clone(),
647            edge_anno: self.edge_anno.clone(),
648            is_reflexive: true,
649            inverse_operator_needs_cost_check: true,
650        };
651        base.create_operator(db, cost_estimate)
652    }
653
654    #[cfg(test)]
655    fn into_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
656        self
657    }
658
659    #[cfg(test)]
660    fn any_ref(&self) -> &dyn std::any::Any {
661        self
662    }
663}
664
665#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
666pub struct PointingSpec {
667    pub name: String,
668    pub dist: RangeSpec,
669    pub edge_anno: Option<EdgeAnnoSearchSpec>,
670}
671
672impl BinaryOperatorSpec for PointingSpec {
673    fn necessary_components(
674        &self,
675        db: &AnnotationGraph,
676    ) -> HashSet<Component<AnnotationComponentType>> {
677        HashSet::from_iter(
678            db.get_all_components(Some(AnnotationComponentType::Pointing), Some(&self.name)),
679        )
680    }
681
682    fn create_operator<'a>(
683        &self,
684        db: &'a AnnotationGraph,
685        cost_estimate: Option<(&CostEstimate, &CostEstimate)>,
686    ) -> Result<BinaryOperator<'a>> {
687        let components =
688            db.get_all_components(Some(AnnotationComponentType::Pointing), Some(&self.name));
689        let op_str = if self.name.is_empty() {
690            String::from("->")
691        } else {
692            format!("->{} ", self.name)
693        };
694
695        let base = BaseEdgeOpSpec {
696            components,
697            dist: self.dist.clone(),
698            edge_anno: self.edge_anno.clone(),
699            is_reflexive: true,
700            op_str: Some(op_str),
701            inverse_operator_needs_cost_check: true,
702        };
703        base.create_operator(db, cost_estimate)
704    }
705
706    #[cfg(test)]
707    fn into_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
708        self
709    }
710
711    #[cfg(test)]
712    fn any_ref(&self) -> &dyn std::any::Any {
713        self
714    }
715}
716
717#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
718pub struct PartOfSubCorpusSpec {
719    pub dist: RangeSpec,
720}
721
722impl BinaryOperatorSpec for PartOfSubCorpusSpec {
723    fn necessary_components(
724        &self,
725        _db: &AnnotationGraph,
726    ) -> HashSet<Component<AnnotationComponentType>> {
727        let mut components = HashSet::default();
728        components.insert(Component::new(
729            AnnotationComponentType::PartOf,
730            ANNIS_NS.into(),
731            "".into(),
732        ));
733        components
734    }
735
736    fn create_operator<'a>(
737        &self,
738        db: &'a AnnotationGraph,
739        cost_estimate: Option<(&CostEstimate, &CostEstimate)>,
740    ) -> Result<BinaryOperator<'a>> {
741        let components = vec![Component::new(
742            AnnotationComponentType::PartOf,
743            ANNIS_NS.into(),
744            "".into(),
745        )];
746        let inverse_operator_needs_cost_check = if let Some((_, rhs)) = cost_estimate {
747            // Only ignore different cost and risk a nested loop join if the RHS
748            // has an estimated output size of 1 and thus a nested loop is not
749            // as costly.
750            rhs.output > 1
751        } else {
752            true
753        };
754        let base = BaseEdgeOpSpec {
755            op_str: Some(String::from("@")),
756            components,
757            dist: self.dist.clone(),
758            edge_anno: None,
759            is_reflexive: false,
760            inverse_operator_needs_cost_check,
761        };
762
763        base.create_operator(db, cost_estimate)
764    }
765
766    #[cfg(test)]
767    fn into_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
768        self
769    }
770
771    #[cfg(test)]
772    fn any_ref(&self) -> &dyn std::any::Any {
773        self
774    }
775}
776
777#[cfg(test)]
778mod tests;