llkv_plan/
plan_graph.rs

1//! Planner graph intermediate representation (IR).
2//!
3//! This module defines a directed acyclic graph (DAG) description that the
4//! query planner can emit after it has applied logical and physical
5//! optimizations. The IR is designed to serve multiple audiences:
6//!
7//! * **Executors** can materialize runtime operators from the in-memory
8//!   [`PlanGraph`] structure.
9//! * **Tests and tooling** can capture deterministic [`serde_json`] or DOT
10//!   artifacts produced via [`PlanGraph::to_json`] and [`PlanGraph::to_dot`].
11//! * **Developers** gain visibility into planner decisions (chosen indices,
12//!   predicates, costing) that can be rendered with Graphviz or other
13//!   visualizers.
14//!
15//! The representation concentrates on a few key goals:
16//!
17//! * Deterministic ordering for reproducible snapshot tests.
18//! * Explicit metadata for schema, expressions, and physical properties.
19//! * Validation helpers that guarantee the structure is a proper DAG with
20//!   internally consistent edges.
21//!
22//! The `PlanGraph` intentionally avoids referencing heavy Arrow or execution
23//! types directly; instead, it records human-readable summaries (for example
24//! `DataType` names) so that serialization stays lightweight.
25
26use std::collections::{BTreeMap, BTreeSet, VecDeque};
27use std::fmt;
28
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32/// Semantic version identifier for the plan graph payload.
33#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
34pub struct PlanGraphVersion {
35    pub major: u16,
36    pub minor: u16,
37}
38
39impl PlanGraphVersion {
40    pub const fn new(major: u16, minor: u16) -> Self {
41        Self { major, minor }
42    }
43}
44
45impl fmt::Display for PlanGraphVersion {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        write!(f, "{}.{:02}", self.major, self.minor)
48    }
49}
50
51/// Current version of the planner IR.
52pub const PLAN_GRAPH_VERSION: PlanGraphVersion = PlanGraphVersion::new(0, 1);
53
54/// Unique identifier for a planner node.
55#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56#[serde(transparent)]
57pub struct PlanNodeId(pub u32);
58
59impl PlanNodeId {
60    pub const fn new(value: u32) -> Self {
61        Self(value)
62    }
63
64    #[inline]
65    pub const fn as_u32(self) -> u32 {
66        self.0
67    }
68}
69
70impl fmt::Display for PlanNodeId {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        write!(f, "n{}", self.0)
73    }
74}
75
76/// Planner node operator kind.
77#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
78#[serde(rename_all = "snake_case")]
79pub enum PlanOperator {
80    TableScan,
81    IndexScan,
82    Filter,
83    Project,
84    Aggregate,
85    Sort,
86    Limit,
87    TopK,
88    HashJoin,
89    NestedLoopJoin,
90    MergeJoin,
91    Window,
92    Union,
93    Intersect,
94    Difference,
95    Values,
96    Materialize,
97    Output,
98    Explain,
99}
100
101impl fmt::Display for PlanOperator {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        write!(f, "{:?}", self)
104    }
105}
106
107/// Column description carried with each node's output schema.
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109pub struct PlanField {
110    pub name: String,
111    pub data_type: String,
112    #[serde(default)]
113    pub nullable: bool,
114}
115
116impl PlanField {
117    pub fn new<N: Into<String>, T: Into<String>>(name: N, data_type: T) -> Self {
118        Self {
119            name: name.into(),
120            data_type: data_type.into(),
121            nullable: true,
122        }
123    }
124
125    pub fn with_nullability(mut self, nullable: bool) -> Self {
126        self.nullable = nullable;
127        self
128    }
129}
130
131/// Expression or projection annotations associated with a node.
132#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
133pub struct PlanExpression {
134    pub display: String,
135    #[serde(default, skip_serializing_if = "Option::is_none")]
136    pub fingerprint: Option<String>,
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    pub slot: Option<String>,
139}
140
141impl PlanExpression {
142    pub fn new<S: Into<String>>(display: S) -> Self {
143        Self {
144            display: display.into(),
145            fingerprint: None,
146            slot: None,
147        }
148    }
149
150    pub fn with_fingerprint<S: Into<String>>(mut self, fingerprint: S) -> Self {
151        self.fingerprint = Some(fingerprint.into());
152        self
153    }
154
155    pub fn with_slot<S: Into<String>>(mut self, slot: S) -> Self {
156        self.slot = Some(slot.into());
157        self
158    }
159}
160
161/// Free-form metadata map for nodes.
162#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
163pub struct PlanNodeMetadata {
164    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
165    pub properties: BTreeMap<String, String>,
166}
167
168impl PlanNodeMetadata {
169    #[inline]
170    pub fn is_empty(&self) -> bool {
171        self.properties.is_empty()
172    }
173
174    pub fn insert<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
175        self.properties.insert(key.into(), value.into());
176    }
177}
178
179/// Metadata associated with plan edges or inputs.
180#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
181pub struct PlanEdgeMetadata {
182    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
183    pub properties: BTreeMap<String, String>,
184}
185
186impl PlanEdgeMetadata {
187    #[inline]
188    pub fn is_empty(&self) -> bool {
189        self.properties.is_empty()
190    }
191
192    pub fn insert<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
193        self.properties.insert(key.into(), value.into());
194    }
195}
196
197/// Incoming connection for a node.
198#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
199pub struct PlanInput {
200    pub source: PlanNodeId,
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub label: Option<String>,
203    #[serde(default, skip_serializing_if = "PlanEdgeMetadata::is_empty")]
204    pub metadata: PlanEdgeMetadata,
205}
206
207impl PlanInput {
208    pub fn new(source: PlanNodeId) -> Self {
209        Self {
210            source,
211            label: None,
212            metadata: PlanEdgeMetadata::default(),
213        }
214    }
215
216    pub fn with_label<S: Into<String>>(mut self, label: S) -> Self {
217        self.label = Some(label.into());
218        self
219    }
220
221    pub fn with_metadata(mut self, metadata: PlanEdgeMetadata) -> Self {
222        self.metadata = metadata;
223        self
224    }
225}
226
227/// Edge between two planner nodes.
228#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
229pub struct PlanEdge {
230    pub from: PlanNodeId,
231    pub to: PlanNodeId,
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub label: Option<String>,
234    #[serde(default, skip_serializing_if = "PlanEdgeMetadata::is_empty")]
235    pub metadata: PlanEdgeMetadata,
236}
237
238impl PlanEdge {
239    pub fn new(from: PlanNodeId, to: PlanNodeId) -> Self {
240        Self {
241            from,
242            to,
243            label: None,
244            metadata: PlanEdgeMetadata::default(),
245        }
246    }
247
248    pub fn with_label<S: Into<String>>(mut self, label: S) -> Self {
249        self.label = Some(label.into());
250        self
251    }
252
253    pub fn with_metadata(mut self, metadata: PlanEdgeMetadata) -> Self {
254        self.metadata = metadata;
255        self
256    }
257}
258
259/// Aggregate annotations that apply to the whole plan.
260#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
261pub struct PlanAnnotations {
262    #[serde(default, skip_serializing_if = "Option::is_none")]
263    pub logical_plan_fingerprint: Option<String>,
264    #[serde(default, skip_serializing_if = "Option::is_none")]
265    pub description: Option<String>,
266    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
267    pub properties: BTreeMap<String, String>,
268}
269
270impl PlanAnnotations {
271    #[inline]
272    pub fn is_empty(&self) -> bool {
273        self.logical_plan_fingerprint.is_none()
274            && self.description.is_none()
275            && self.properties.is_empty()
276    }
277}
278
279/// Planner node with annotations used to describe how operators compose.
280#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
281pub struct PlanNode {
282    pub id: PlanNodeId,
283    pub kind: PlanOperator,
284    #[serde(default, skip_serializing_if = "Vec::is_empty")]
285    pub inputs: Vec<PlanInput>,
286    #[serde(default, skip_serializing_if = "Vec::is_empty")]
287    pub outputs: Vec<PlanNodeId>,
288    #[serde(default, skip_serializing_if = "Vec::is_empty")]
289    pub schema: Vec<PlanField>,
290    #[serde(default, skip_serializing_if = "Vec::is_empty")]
291    pub predicates: Vec<PlanExpression>,
292    #[serde(default, skip_serializing_if = "Vec::is_empty")]
293    pub projections: Vec<PlanExpression>,
294    #[serde(default, skip_serializing_if = "Option::is_none")]
295    pub cost: Option<f64>,
296    #[serde(default, skip_serializing_if = "Option::is_none")]
297    pub cardinality: Option<u64>,
298    #[serde(default, skip_serializing_if = "Option::is_none")]
299    pub chosen_index: Option<String>,
300    #[serde(default, skip_serializing_if = "PlanNodeMetadata::is_empty")]
301    pub metadata: PlanNodeMetadata,
302}
303
304impl PlanNode {
305    pub fn new(id: PlanNodeId, kind: PlanOperator) -> Self {
306        Self {
307            id,
308            kind,
309            inputs: Vec::new(),
310            outputs: Vec::new(),
311            schema: Vec::new(),
312            predicates: Vec::new(),
313            projections: Vec::new(),
314            cost: None,
315            cardinality: None,
316            chosen_index: None,
317            metadata: PlanNodeMetadata::default(),
318        }
319    }
320
321    pub fn add_field(&mut self, field: PlanField) {
322        self.schema.push(field);
323    }
324
325    pub fn add_predicate(&mut self, expr: PlanExpression) {
326        self.predicates.push(expr);
327    }
328
329    pub fn add_projection(&mut self, expr: PlanExpression) {
330        self.projections.push(expr);
331    }
332}
333
334/// Errors raised while constructing or validating a plan graph.
335#[derive(Error, Debug)]
336pub enum PlanGraphError {
337    #[error("duplicate node id {0}")]
338    DuplicateNode(PlanNodeId),
339    #[error("edge references missing node {0}")]
340    MissingNode(PlanNodeId),
341    #[error("edge creates self-loop on node {0}")]
342    SelfLoop(PlanNodeId),
343    #[error("duplicate edge {from:?} -> {to:?}")]
344    DuplicateEdge { from: PlanNodeId, to: PlanNodeId },
345    #[error("node {0} already contains wired edges; use the builder to manage connections")]
346    NodeAlreadyConnected(PlanNodeId),
347    #[error("root node {0} not present in graph")]
348    UnknownRoot(PlanNodeId),
349    #[error("root node {0} receives inputs")]
350    RootHasInputs(PlanNodeId),
351    #[error("node {node} inputs are inconsistent with edge set")]
352    InputsDoNotMatch { node: PlanNodeId },
353    #[error("node {node} outputs are inconsistent with edge set")]
354    OutputsDoNotMatch { node: PlanNodeId },
355    #[error("cycle detected in plan graph")]
356    CycleDetected,
357    #[error("serialization error: {0}")]
358    Serde(#[from] serde_json::Error),
359}
360
361pub type PlanGraphResult<T> = Result<T, PlanGraphError>;
362
363/// Immutable DAG describing the planner output.
364#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
365pub struct PlanGraph {
366    pub version: PlanGraphVersion,
367    pub nodes: Vec<PlanNode>,
368    #[serde(default, skip_serializing_if = "Vec::is_empty")]
369    pub edges: Vec<PlanEdge>,
370    #[serde(default, skip_serializing_if = "Vec::is_empty")]
371    pub root_nodes: Vec<PlanNodeId>,
372    #[serde(default, skip_serializing_if = "PlanAnnotations::is_empty")]
373    pub annotations: PlanAnnotations,
374}
375
376impl PlanGraph {
377    /// Construct a new [`PlanGraphBuilder`].
378    pub fn builder() -> PlanGraphBuilder {
379        PlanGraphBuilder::default()
380    }
381
382    pub fn is_empty(&self) -> bool {
383        self.nodes.is_empty()
384    }
385
386    pub fn version(&self) -> PlanGraphVersion {
387        self.version
388    }
389
390    pub fn root_nodes(&self) -> &[PlanNodeId] {
391        &self.root_nodes
392    }
393
394    pub fn validate(&self) -> PlanGraphResult<()> {
395        let mut index_by_id: BTreeMap<PlanNodeId, usize> = BTreeMap::new();
396        for (idx, node) in self.nodes.iter().enumerate() {
397            if index_by_id.insert(node.id, idx).is_some() {
398                return Err(PlanGraphError::DuplicateNode(node.id));
399            }
400        }
401
402        let mut indegree = vec![0usize; self.nodes.len()];
403        let mut outgoing: Vec<Vec<usize>> = vec![Vec::new(); self.nodes.len()];
404        let mut seen_edges: BTreeSet<(PlanNodeId, PlanNodeId, Option<String>)> = BTreeSet::new();
405
406        for edge in &self.edges {
407            if edge.from == edge.to {
408                return Err(PlanGraphError::SelfLoop(edge.from));
409            }
410            let from_idx = index_by_id
411                .get(&edge.from)
412                .copied()
413                .ok_or(PlanGraphError::MissingNode(edge.from))?;
414            let to_idx = index_by_id
415                .get(&edge.to)
416                .copied()
417                .ok_or(PlanGraphError::MissingNode(edge.to))?;
418
419            let key = (edge.from, edge.to, edge.label.clone());
420            if !seen_edges.insert(key) {
421                return Err(PlanGraphError::DuplicateEdge {
422                    from: edge.from,
423                    to: edge.to,
424                });
425            }
426
427            indegree[to_idx] += 1;
428            outgoing[from_idx].push(to_idx);
429        }
430
431        // Validate inputs and outputs line up with edges.
432        #[allow(clippy::type_complexity)]
433        // NOTE: Map values capture edge metadata tuples; restructure when plan graph API stabilizes.
434        let mut expected_inputs: BTreeMap<
435            PlanNodeId,
436            Vec<(PlanNodeId, Option<String>, Vec<(String, String)>)>,
437        > = BTreeMap::new();
438        let mut expected_outputs: BTreeMap<PlanNodeId, BTreeSet<PlanNodeId>> = BTreeMap::new();
439
440        for edge in &self.edges {
441            let metadata_vec = metadata_as_vec(&edge.metadata);
442            expected_inputs.entry(edge.to).or_default().push((
443                edge.from,
444                edge.label.clone(),
445                metadata_vec.clone(),
446            ));
447            expected_outputs
448                .entry(edge.from)
449                .or_default()
450                .insert(edge.to);
451        }
452
453        for node in &self.nodes {
454            #[allow(clippy::type_complexity)]
455            // NOTE: Preserves rich edge metadata while validating topologies.
456            let mut actual_inputs: Vec<(
457                PlanNodeId,
458                Option<String>,
459                Vec<(String, String)>,
460            )> = node
461                .inputs
462                .iter()
463                .map(|input| {
464                    (
465                        input.source,
466                        input.label.clone(),
467                        metadata_as_vec(&input.metadata),
468                    )
469                })
470                .collect();
471            let mut expected = expected_inputs.remove(&node.id).unwrap_or_default();
472            actual_inputs.sort();
473            expected.sort();
474            if actual_inputs != expected {
475                return Err(PlanGraphError::InputsDoNotMatch { node: node.id });
476            }
477
478            let mut actual_outputs: BTreeSet<PlanNodeId> = node.outputs.iter().copied().collect();
479            let expected_outputs = expected_outputs.remove(&node.id).unwrap_or_default();
480            if actual_outputs != expected_outputs {
481                return Err(PlanGraphError::OutputsDoNotMatch { node: node.id });
482            }
483
484            actual_outputs.clear();
485        }
486
487        // Remaining expected inputs implies an edge pointed to a missing node entry.
488        if let Some((node, _)) = expected_inputs.iter().next() {
489            return Err(PlanGraphError::MissingNode(*node));
490        }
491
492        // Validate roots.
493        let mut root_set: BTreeSet<PlanNodeId> = BTreeSet::new();
494        for root in &self.root_nodes {
495            if !root_set.insert(*root) {
496                return Err(PlanGraphError::DuplicateNode(*root));
497            }
498            if !index_by_id.contains_key(root) {
499                return Err(PlanGraphError::UnknownRoot(*root));
500            }
501        }
502        for root in &self.root_nodes {
503            let idx = index_by_id[root];
504            if indegree[idx] > 0 {
505                return Err(PlanGraphError::RootHasInputs(*root));
506            }
507        }
508
509        // Cycle detection via Kahn's algorithm.
510        let mut queue: VecDeque<usize> = indegree
511            .iter()
512            .enumerate()
513            .filter_map(|(idx, &deg)| if deg == 0 { Some(idx) } else { None })
514            .collect();
515        let mut visited = 0usize;
516        let mut indegree_mut = indegree.clone();
517        while let Some(node_idx) = queue.pop_front() {
518            visited += 1;
519            for &child in &outgoing[node_idx] {
520                indegree_mut[child] -= 1;
521                if indegree_mut[child] == 0 {
522                    queue.push_back(child);
523                }
524            }
525        }
526
527        if visited != self.nodes.len() {
528            return Err(PlanGraphError::CycleDetected);
529        }
530
531        Ok(())
532    }
533
534    pub fn to_dot(&self) -> PlanGraphResult<String> {
535        self.validate()?;
536
537        let mut dot = String::new();
538        dot.push_str("digraph PlanGraph {\n");
539        dot.push_str("  graph [rankdir=LR];\n");
540        dot.push_str("  node [shape=box, fontname=\"Helvetica\"];\n");
541
542        let root_set: BTreeSet<PlanNodeId> = self.root_nodes.iter().copied().collect();
543        for node in &self.nodes {
544            let shape_attr = if root_set.contains(&node.id) {
545                "shape=doublecircle, style=bold"
546            } else {
547                "shape=box"
548            };
549            let label = escape_dot_label(&build_node_label(node));
550            dot.push_str(&format!(
551                "  \"{id}\" [{shape} label=\"{label}\"];\n",
552                id = node.id,
553                shape = shape_attr,
554                label = label
555            ));
556        }
557
558        for edge in &self.edges {
559            let label = edge
560                .label
561                .as_ref()
562                .map(|label| escape_dot_label(label))
563                .or_else(|| {
564                    if edge.metadata.is_empty() {
565                        None
566                    } else {
567                        Some(escape_dot_label(&format_edge_metadata(&edge.metadata)))
568                    }
569                });
570            match label {
571                Some(text) => {
572                    dot.push_str(&format!(
573                        "  \"{from}\" -> \"{to}\" [label=\"{label}\"];\n",
574                        from = edge.from,
575                        to = edge.to,
576                        label = text
577                    ));
578                }
579                None => {
580                    dot.push_str(&format!(
581                        "  \"{from}\" -> \"{to}\";\n",
582                        from = edge.from,
583                        to = edge.to
584                    ));
585                }
586            }
587        }
588
589        dot.push('}');
590        Ok(dot)
591    }
592
593    pub fn to_json(&self) -> PlanGraphResult<String> {
594        self.validate()?;
595        let json = serde_json::to_string_pretty(self)?;
596        Ok(json)
597    }
598
599    pub fn from_json(json: &str) -> PlanGraphResult<Self> {
600        let mut graph: PlanGraph = serde_json::from_str(json)?;
601        graph.normalize();
602        graph.validate()?;
603        Ok(graph)
604    }
605
606    pub fn topological_order(&self) -> PlanGraphResult<Vec<PlanNodeId>> {
607        self.validate()?;
608
609        let mut index_by_id: BTreeMap<PlanNodeId, usize> = BTreeMap::new();
610        for (idx, node) in self.nodes.iter().enumerate() {
611            index_by_id.insert(node.id, idx);
612        }
613
614        let mut indegree = vec![0usize; self.nodes.len()];
615        let mut outgoing: Vec<Vec<usize>> = vec![Vec::new(); self.nodes.len()];
616        for edge in &self.edges {
617            let from_idx = index_by_id[&edge.from];
618            let to_idx = index_by_id[&edge.to];
619            indegree[to_idx] += 1;
620            outgoing[from_idx].push(to_idx);
621        }
622
623        let mut queue: VecDeque<usize> = indegree
624            .iter()
625            .enumerate()
626            .filter_map(|(idx, &deg)| if deg == 0 { Some(idx) } else { None })
627            .collect();
628        let mut order = Vec::with_capacity(self.nodes.len());
629        let mut indegree_mut = indegree;
630        while let Some(idx) = queue.pop_front() {
631            order.push(self.nodes[idx].id);
632            for &child in &outgoing[idx] {
633                indegree_mut[child] -= 1;
634                if indegree_mut[child] == 0 {
635                    queue.push_back(child);
636                }
637            }
638        }
639
640        if order.len() != self.nodes.len() {
641            return Err(PlanGraphError::CycleDetected);
642        }
643
644        Ok(order)
645    }
646
647    fn normalize(&mut self) {
648        self.nodes.sort_by(|a, b| a.id.cmp(&b.id));
649        for node in &mut self.nodes {
650            node.inputs.sort_by(|left, right| {
651                left.source
652                    .cmp(&right.source)
653                    .then(left.label.cmp(&right.label))
654                    .then_with(|| {
655                        metadata_as_vec(&left.metadata).cmp(&metadata_as_vec(&right.metadata))
656                    })
657            });
658            node.outputs.sort();
659        }
660        self.edges.sort_by(|a, b| {
661            a.from
662                .cmp(&b.from)
663                .then(a.to.cmp(&b.to))
664                .then(a.label.cmp(&b.label))
665        });
666        self.root_nodes.sort();
667    }
668}
669
670/// Builder for `PlanGraph` that enforces DAG invariants while allowing
671/// incremental construction.
672pub struct PlanGraphBuilder {
673    version: PlanGraphVersion,
674    nodes: BTreeMap<PlanNodeId, PlanNode>,
675    edges: BTreeMap<(PlanNodeId, PlanNodeId, Option<String>), PlanEdge>,
676    roots: BTreeSet<PlanNodeId>,
677    annotations: PlanAnnotations,
678}
679
680impl PlanGraphBuilder {
681    pub fn new() -> Self {
682        Self::default()
683    }
684
685    pub fn with_version(version: PlanGraphVersion) -> Self {
686        Self {
687            version,
688            ..Self::default()
689        }
690    }
691
692    pub fn add_node(&mut self, node: PlanNode) -> PlanGraphResult<()> {
693        let node_id = node.id;
694        if !node.inputs.is_empty() || !node.outputs.is_empty() {
695            return Err(PlanGraphError::NodeAlreadyConnected(node_id));
696        }
697        if self.nodes.insert(node_id, node).is_some() {
698            return Err(PlanGraphError::DuplicateNode(node_id));
699        }
700        Ok(())
701    }
702
703    pub fn add_edge(&mut self, edge: PlanEdge) -> PlanGraphResult<()> {
704        if edge.from == edge.to {
705            return Err(PlanGraphError::SelfLoop(edge.from));
706        }
707        let key = (edge.from, edge.to, edge.label.clone());
708        if self.edges.contains_key(&key) {
709            return Err(PlanGraphError::DuplicateEdge {
710                from: edge.from,
711                to: edge.to,
712            });
713        }
714
715        let target = self
716            .nodes
717            .get_mut(&edge.to)
718            .ok_or(PlanGraphError::MissingNode(edge.to))?;
719        let mut input = PlanInput::new(edge.from);
720        if let Some(label) = &edge.label {
721            input.label = Some(label.clone());
722        }
723        if !edge.metadata.is_empty() {
724            input.metadata = edge.metadata.clone();
725        }
726        target.inputs.push(input);
727
728        let source = self
729            .nodes
730            .get_mut(&edge.from)
731            .ok_or(PlanGraphError::MissingNode(edge.from))?;
732        if !source.outputs.contains(&edge.to) {
733            source.outputs.push(edge.to);
734        }
735
736        self.edges.insert(key, edge);
737        Ok(())
738    }
739
740    pub fn add_root(&mut self, node_id: PlanNodeId) -> PlanGraphResult<()> {
741        if !self.nodes.contains_key(&node_id) {
742            return Err(PlanGraphError::UnknownRoot(node_id));
743        }
744        self.roots.insert(node_id);
745        Ok(())
746    }
747
748    pub fn annotations_mut(&mut self) -> &mut PlanAnnotations {
749        &mut self.annotations
750    }
751
752    pub fn finish(self) -> PlanGraphResult<PlanGraph> {
753        let nodes: Vec<PlanNode> = self.nodes.into_values().collect();
754        let edges: Vec<PlanEdge> = self.edges.into_values().collect();
755        let root_nodes: Vec<PlanNodeId> = self.roots.into_iter().collect();
756
757        let mut graph = PlanGraph {
758            version: self.version,
759            nodes,
760            edges,
761            root_nodes,
762            annotations: self.annotations,
763        };
764        graph.normalize();
765        graph.validate()?;
766        Ok(graph)
767    }
768}
769
770impl Default for PlanGraphBuilder {
771    fn default() -> Self {
772        Self {
773            version: PLAN_GRAPH_VERSION,
774            nodes: BTreeMap::new(),
775            edges: BTreeMap::new(),
776            roots: BTreeSet::new(),
777            annotations: PlanAnnotations::default(),
778        }
779    }
780}
781
782fn metadata_as_vec(metadata: &PlanEdgeMetadata) -> Vec<(String, String)> {
783    metadata
784        .properties
785        .iter()
786        .map(|(k, v)| (k.clone(), v.clone()))
787        .collect()
788}
789
790fn build_node_label(node: &PlanNode) -> String {
791    let mut lines = Vec::new();
792    lines.push(node.kind.to_string());
793    if !node.schema.is_empty() {
794        let fields: Vec<String> = node
795            .schema
796            .iter()
797            .map(|field| {
798                if field.nullable {
799                    format!("{}:{}?", field.name, field.data_type)
800                } else {
801                    format!("{}:{}", field.name, field.data_type)
802                }
803            })
804            .collect();
805        lines.push(format!("schema: {}", fields.join(", ")));
806    }
807    if let Some(card) = node.cardinality {
808        lines.push(format!("card: {}", card));
809    }
810    if let Some(cost) = node.cost {
811        lines.push(format!("cost: {:.4}", cost));
812    }
813    if let Some(idx) = &node.chosen_index {
814        lines.push(format!("index: {idx}"));
815    }
816    for expr in &node.predicates {
817        lines.push(format!("pred: {}", expr.display));
818    }
819    for expr in &node.projections {
820        lines.push(format!("proj: {}", expr.display));
821    }
822    for (key, value) in &node.metadata.properties {
823        lines.push(format!("{key}: {value}"));
824    }
825    lines.join("\n")
826}
827
828fn format_edge_metadata(metadata: &PlanEdgeMetadata) -> String {
829    metadata
830        .properties
831        .iter()
832        .map(|(k, v)| format!("{k}={v}"))
833        .collect::<Vec<_>>()
834        .join(", ")
835}
836
/// Escapes text for use inside a double-quoted DOT label.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`; every other character is copied as is.
fn escape_dot_label(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}