llkv_plan/
plan_graph.rs

1//! Planner graph intermediate representation (IR).
2//!
3//! This module defines a directed acyclic graph (DAG) description that the
4//! query planner can emit after it has applied logical and physical
5//! optimizations. The IR is designed to serve multiple audiences:
6//!
7//! * **Executors** can materialize runtime operators from the in-memory
8//!   [`PlanGraph`] structure.
9//! * **Tests and tooling** can capture deterministic [`serde_json`] or DOT
10//!   artifacts produced via [`PlanGraph::to_json`] and [`PlanGraph::to_dot`].
11//! * **Developers** gain visibility into planner decisions (chosen indices,
12//!   predicates, costing) that can be rendered with Graphviz or other
13//!   visualizers.
14//!
15//! The representation concentrates on a few key goals:
16//!
17//! * Deterministic ordering for reproducible snapshot tests.
18//! * Explicit metadata for schema, expressions, and physical properties.
19//! * Validation helpers that guarantee the structure is a proper DAG with
20//!   internally consistent edges.
21//!
22//! The `PlanGraph` intentionally avoids referencing heavy Arrow or execution
23//! types directly; instead, it records human-readable summaries (for example
24//! `DataType` names) so that serialization stays lightweight.
25
26use std::collections::{BTreeMap, BTreeSet, VecDeque};
27use std::fmt;
28
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32/// Semantic version identifier for the plan graph payload.
33#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
34pub struct PlanGraphVersion {
35    pub major: u16,
36    pub minor: u16,
37}
38
39impl PlanGraphVersion {
40    pub const fn new(major: u16, minor: u16) -> Self {
41        Self { major, minor }
42    }
43}
44
45impl fmt::Display for PlanGraphVersion {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        write!(f, "{}.{:02}", self.major, self.minor)
48    }
49}
50
51/// Current version of the planner IR.
52pub const PLAN_GRAPH_VERSION: PlanGraphVersion = PlanGraphVersion::new(0, 1);
53
54/// Unique identifier for a planner node.
55#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56#[serde(transparent)]
57pub struct PlanNodeId(pub u32);
58
59impl PlanNodeId {
60    pub const fn new(value: u32) -> Self {
61        Self(value)
62    }
63
64    #[inline]
65    pub const fn as_u32(self) -> u32 {
66        self.0
67    }
68}
69
70impl fmt::Display for PlanNodeId {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        write!(f, "n{}", self.0)
73    }
74}
75
76/// Planner node operator kind.
77#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
78#[serde(rename_all = "snake_case")]
79pub enum PlanOperator {
80    TableScan,
81    IndexScan,
82    Filter,
83    Project,
84    Aggregate,
85    Sort,
86    Limit,
87    TopK,
88    HashJoin,
89    NestedLoopJoin,
90    MergeJoin,
91    Window,
92    Union,
93    Intersect,
94    Difference,
95    Values,
96    Materialize,
97    Output,
98    Explain,
99}
100
101impl fmt::Display for PlanOperator {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        write!(f, "{:?}", self)
104    }
105}
106
107/// Column description carried with each node's output schema.
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109pub struct PlanField {
110    pub name: String,
111    pub data_type: String,
112    #[serde(default)]
113    pub nullable: bool,
114}
115
116impl PlanField {
117    pub fn new<N: Into<String>, T: Into<String>>(name: N, data_type: T) -> Self {
118        Self {
119            name: name.into(),
120            data_type: data_type.into(),
121            nullable: true,
122        }
123    }
124
125    pub fn with_nullability(mut self, nullable: bool) -> Self {
126        self.nullable = nullable;
127        self
128    }
129}
130
131/// Expression or projection annotations associated with a node.
132#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
133pub struct PlanExpression {
134    pub display: String,
135    #[serde(default, skip_serializing_if = "Option::is_none")]
136    pub fingerprint: Option<String>,
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    pub slot: Option<String>,
139}
140
141impl PlanExpression {
142    pub fn new<S: Into<String>>(display: S) -> Self {
143        Self {
144            display: display.into(),
145            fingerprint: None,
146            slot: None,
147        }
148    }
149
150    pub fn with_fingerprint<S: Into<String>>(mut self, fingerprint: S) -> Self {
151        self.fingerprint = Some(fingerprint.into());
152        self
153    }
154
155    pub fn with_slot<S: Into<String>>(mut self, slot: S) -> Self {
156        self.slot = Some(slot.into());
157        self
158    }
159}
160
161/// Free-form metadata map for nodes.
162#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
163pub struct PlanNodeMetadata {
164    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
165    pub properties: BTreeMap<String, String>,
166}
167
168impl PlanNodeMetadata {
169    #[inline]
170    pub fn is_empty(&self) -> bool {
171        self.properties.is_empty()
172    }
173
174    pub fn insert<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
175        self.properties.insert(key.into(), value.into());
176    }
177}
178
179/// Metadata associated with plan edges or inputs.
180#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
181pub struct PlanEdgeMetadata {
182    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
183    pub properties: BTreeMap<String, String>,
184}
185
186impl PlanEdgeMetadata {
187    #[inline]
188    pub fn is_empty(&self) -> bool {
189        self.properties.is_empty()
190    }
191
192    pub fn insert<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
193        self.properties.insert(key.into(), value.into());
194    }
195}
196
197/// Incoming connection for a node.
198#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
199pub struct PlanInput {
200    pub source: PlanNodeId,
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub label: Option<String>,
203    #[serde(default, skip_serializing_if = "PlanEdgeMetadata::is_empty")]
204    pub metadata: PlanEdgeMetadata,
205}
206
207impl PlanInput {
208    pub fn new(source: PlanNodeId) -> Self {
209        Self {
210            source,
211            label: None,
212            metadata: PlanEdgeMetadata::default(),
213        }
214    }
215
216    pub fn with_label<S: Into<String>>(mut self, label: S) -> Self {
217        self.label = Some(label.into());
218        self
219    }
220
221    pub fn with_metadata(mut self, metadata: PlanEdgeMetadata) -> Self {
222        self.metadata = metadata;
223        self
224    }
225}
226
227/// Edge between two planner nodes.
228#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
229pub struct PlanEdge {
230    pub from: PlanNodeId,
231    pub to: PlanNodeId,
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub label: Option<String>,
234    #[serde(default, skip_serializing_if = "PlanEdgeMetadata::is_empty")]
235    pub metadata: PlanEdgeMetadata,
236}
237
238impl PlanEdge {
239    pub fn new(from: PlanNodeId, to: PlanNodeId) -> Self {
240        Self {
241            from,
242            to,
243            label: None,
244            metadata: PlanEdgeMetadata::default(),
245        }
246    }
247
248    pub fn with_label<S: Into<String>>(mut self, label: S) -> Self {
249        self.label = Some(label.into());
250        self
251    }
252
253    pub fn with_metadata(mut self, metadata: PlanEdgeMetadata) -> Self {
254        self.metadata = metadata;
255        self
256    }
257}
258
259/// Aggregate annotations that apply to the whole plan.
260#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
261pub struct PlanAnnotations {
262    #[serde(default, skip_serializing_if = "Option::is_none")]
263    pub logical_plan_fingerprint: Option<String>,
264    #[serde(default, skip_serializing_if = "Option::is_none")]
265    pub description: Option<String>,
266    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
267    pub properties: BTreeMap<String, String>,
268}
269
270impl PlanAnnotations {
271    #[inline]
272    pub fn is_empty(&self) -> bool {
273        self.logical_plan_fingerprint.is_none()
274            && self.description.is_none()
275            && self.properties.is_empty()
276    }
277}
278
279/// Planner node with annotations used to describe how operators compose.
280#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
281pub struct PlanNode {
282    pub id: PlanNodeId,
283    pub kind: PlanOperator,
284    #[serde(default, skip_serializing_if = "Vec::is_empty")]
285    pub inputs: Vec<PlanInput>,
286    #[serde(default, skip_serializing_if = "Vec::is_empty")]
287    pub outputs: Vec<PlanNodeId>,
288    #[serde(default, skip_serializing_if = "Vec::is_empty")]
289    pub schema: Vec<PlanField>,
290    #[serde(default, skip_serializing_if = "Vec::is_empty")]
291    pub predicates: Vec<PlanExpression>,
292    #[serde(default, skip_serializing_if = "Vec::is_empty")]
293    pub projections: Vec<PlanExpression>,
294    #[serde(default, skip_serializing_if = "Option::is_none")]
295    pub cost: Option<f64>,
296    #[serde(default, skip_serializing_if = "Option::is_none")]
297    pub cardinality: Option<u64>,
298    #[serde(default, skip_serializing_if = "Option::is_none")]
299    pub chosen_index: Option<String>,
300    #[serde(default, skip_serializing_if = "PlanNodeMetadata::is_empty")]
301    pub metadata: PlanNodeMetadata,
302}
303
304impl PlanNode {
305    pub fn new(id: PlanNodeId, kind: PlanOperator) -> Self {
306        Self {
307            id,
308            kind,
309            inputs: Vec::new(),
310            outputs: Vec::new(),
311            schema: Vec::new(),
312            predicates: Vec::new(),
313            projections: Vec::new(),
314            cost: None,
315            cardinality: None,
316            chosen_index: None,
317            metadata: PlanNodeMetadata::default(),
318        }
319    }
320
321    pub fn add_field(&mut self, field: PlanField) {
322        self.schema.push(field);
323    }
324
325    pub fn add_predicate(&mut self, expr: PlanExpression) {
326        self.predicates.push(expr);
327    }
328
329    pub fn add_projection(&mut self, expr: PlanExpression) {
330        self.projections.push(expr);
331    }
332}
333
334/// Errors raised while constructing or validating a plan graph.
335#[derive(Error, Debug)]
336pub enum PlanGraphError {
337    #[error("duplicate node id {0}")]
338    DuplicateNode(PlanNodeId),
339    #[error("edge references missing node {0}")]
340    MissingNode(PlanNodeId),
341    #[error("edge creates self-loop on node {0}")]
342    SelfLoop(PlanNodeId),
343    #[error("duplicate edge {from:?} -> {to:?}")]
344    DuplicateEdge { from: PlanNodeId, to: PlanNodeId },
345    #[error("node {0} already contains wired edges; use the builder to manage connections")]
346    NodeAlreadyConnected(PlanNodeId),
347    #[error("root node {0} not present in graph")]
348    UnknownRoot(PlanNodeId),
349    #[error("root node {0} receives inputs")]
350    RootHasInputs(PlanNodeId),
351    #[error("node {node} inputs are inconsistent with edge set")]
352    InputsDoNotMatch { node: PlanNodeId },
353    #[error("node {node} outputs are inconsistent with edge set")]
354    OutputsDoNotMatch { node: PlanNodeId },
355    #[error("cycle detected in plan graph")]
356    CycleDetected,
357    #[error("serialization error: {0}")]
358    Serde(#[from] serde_json::Error),
359}
360
361pub type PlanGraphResult<T> = Result<T, PlanGraphError>;
362
363/// Immutable DAG describing the planner output.
364#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
365pub struct PlanGraph {
366    pub version: PlanGraphVersion,
367    pub nodes: Vec<PlanNode>,
368    #[serde(default, skip_serializing_if = "Vec::is_empty")]
369    pub edges: Vec<PlanEdge>,
370    #[serde(default, skip_serializing_if = "Vec::is_empty")]
371    pub root_nodes: Vec<PlanNodeId>,
372    #[serde(default, skip_serializing_if = "PlanAnnotations::is_empty")]
373    pub annotations: PlanAnnotations,
374}
375
376impl PlanGraph {
377    pub fn builder() -> PlanGraphBuilder {
378        PlanGraphBuilder::default()
379    }
380
381    pub fn is_empty(&self) -> bool {
382        self.nodes.is_empty()
383    }
384
385    pub fn version(&self) -> PlanGraphVersion {
386        self.version
387    }
388
389    pub fn root_nodes(&self) -> &[PlanNodeId] {
390        &self.root_nodes
391    }
392
393    pub fn validate(&self) -> PlanGraphResult<()> {
394        let mut index_by_id: BTreeMap<PlanNodeId, usize> = BTreeMap::new();
395        for (idx, node) in self.nodes.iter().enumerate() {
396            if index_by_id.insert(node.id, idx).is_some() {
397                return Err(PlanGraphError::DuplicateNode(node.id));
398            }
399        }
400
401        let mut indegree = vec![0usize; self.nodes.len()];
402        let mut outgoing: Vec<Vec<usize>> = vec![Vec::new(); self.nodes.len()];
403        let mut seen_edges: BTreeSet<(PlanNodeId, PlanNodeId, Option<String>)> = BTreeSet::new();
404
405        for edge in &self.edges {
406            if edge.from == edge.to {
407                return Err(PlanGraphError::SelfLoop(edge.from));
408            }
409            let from_idx = index_by_id
410                .get(&edge.from)
411                .copied()
412                .ok_or(PlanGraphError::MissingNode(edge.from))?;
413            let to_idx = index_by_id
414                .get(&edge.to)
415                .copied()
416                .ok_or(PlanGraphError::MissingNode(edge.to))?;
417
418            let key = (edge.from, edge.to, edge.label.clone());
419            if !seen_edges.insert(key) {
420                return Err(PlanGraphError::DuplicateEdge {
421                    from: edge.from,
422                    to: edge.to,
423                });
424            }
425
426            indegree[to_idx] += 1;
427            outgoing[from_idx].push(to_idx);
428        }
429
430        // Validate inputs and outputs line up with edges.
431        #[allow(clippy::type_complexity)] // TODO: Refactor
432        let mut expected_inputs: BTreeMap<
433            PlanNodeId,
434            Vec<(PlanNodeId, Option<String>, Vec<(String, String)>)>,
435        > = BTreeMap::new();
436        let mut expected_outputs: BTreeMap<PlanNodeId, BTreeSet<PlanNodeId>> = BTreeMap::new();
437
438        for edge in &self.edges {
439            let metadata_vec = metadata_as_vec(&edge.metadata);
440            expected_inputs.entry(edge.to).or_default().push((
441                edge.from,
442                edge.label.clone(),
443                metadata_vec.clone(),
444            ));
445            expected_outputs
446                .entry(edge.from)
447                .or_default()
448                .insert(edge.to);
449        }
450
451        for node in &self.nodes {
452            #[allow(clippy::type_complexity)] // TODO: Refactor
453            let mut actual_inputs: Vec<(
454                PlanNodeId,
455                Option<String>,
456                Vec<(String, String)>,
457            )> = node
458                .inputs
459                .iter()
460                .map(|input| {
461                    (
462                        input.source,
463                        input.label.clone(),
464                        metadata_as_vec(&input.metadata),
465                    )
466                })
467                .collect();
468            let mut expected = expected_inputs.remove(&node.id).unwrap_or_default();
469            actual_inputs.sort();
470            expected.sort();
471            if actual_inputs != expected {
472                return Err(PlanGraphError::InputsDoNotMatch { node: node.id });
473            }
474
475            let mut actual_outputs: BTreeSet<PlanNodeId> = node.outputs.iter().copied().collect();
476            let expected_outputs = expected_outputs.remove(&node.id).unwrap_or_default();
477            if actual_outputs != expected_outputs {
478                return Err(PlanGraphError::OutputsDoNotMatch { node: node.id });
479            }
480
481            actual_outputs.clear();
482        }
483
484        // Remaining expected inputs implies an edge pointed to a missing node entry.
485        if let Some((node, _)) = expected_inputs.iter().next() {
486            return Err(PlanGraphError::MissingNode(*node));
487        }
488
489        // Validate roots.
490        let mut root_set: BTreeSet<PlanNodeId> = BTreeSet::new();
491        for root in &self.root_nodes {
492            if !root_set.insert(*root) {
493                return Err(PlanGraphError::DuplicateNode(*root));
494            }
495            if !index_by_id.contains_key(root) {
496                return Err(PlanGraphError::UnknownRoot(*root));
497            }
498        }
499        for root in &self.root_nodes {
500            let idx = index_by_id[root];
501            if indegree[idx] > 0 {
502                return Err(PlanGraphError::RootHasInputs(*root));
503            }
504        }
505
506        // Cycle detection via Kahn's algorithm.
507        let mut queue: VecDeque<usize> = indegree
508            .iter()
509            .enumerate()
510            .filter_map(|(idx, &deg)| if deg == 0 { Some(idx) } else { None })
511            .collect();
512        let mut visited = 0usize;
513        let mut indegree_mut = indegree.clone();
514        while let Some(node_idx) = queue.pop_front() {
515            visited += 1;
516            for &child in &outgoing[node_idx] {
517                indegree_mut[child] -= 1;
518                if indegree_mut[child] == 0 {
519                    queue.push_back(child);
520                }
521            }
522        }
523
524        if visited != self.nodes.len() {
525            return Err(PlanGraphError::CycleDetected);
526        }
527
528        Ok(())
529    }
530
531    pub fn to_dot(&self) -> PlanGraphResult<String> {
532        self.validate()?;
533
534        let mut dot = String::new();
535        dot.push_str("digraph PlanGraph {\n");
536        dot.push_str("  graph [rankdir=LR];\n");
537        dot.push_str("  node [shape=box, fontname=\"Helvetica\"];\n");
538
539        let root_set: BTreeSet<PlanNodeId> = self.root_nodes.iter().copied().collect();
540        for node in &self.nodes {
541            let shape_attr = if root_set.contains(&node.id) {
542                "shape=doublecircle, style=bold"
543            } else {
544                "shape=box"
545            };
546            let label = escape_dot_label(&build_node_label(node));
547            dot.push_str(&format!(
548                "  \"{id}\" [{shape} label=\"{label}\"];\n",
549                id = node.id,
550                shape = shape_attr,
551                label = label
552            ));
553        }
554
555        for edge in &self.edges {
556            let label = edge
557                .label
558                .as_ref()
559                .map(|label| escape_dot_label(label))
560                .or_else(|| {
561                    if edge.metadata.is_empty() {
562                        None
563                    } else {
564                        Some(escape_dot_label(&format_edge_metadata(&edge.metadata)))
565                    }
566                });
567            match label {
568                Some(text) => {
569                    dot.push_str(&format!(
570                        "  \"{from}\" -> \"{to}\" [label=\"{label}\"];\n",
571                        from = edge.from,
572                        to = edge.to,
573                        label = text
574                    ));
575                }
576                None => {
577                    dot.push_str(&format!(
578                        "  \"{from}\" -> \"{to}\";\n",
579                        from = edge.from,
580                        to = edge.to
581                    ));
582                }
583            }
584        }
585
586        dot.push('}');
587        Ok(dot)
588    }
589
590    pub fn to_json(&self) -> PlanGraphResult<String> {
591        self.validate()?;
592        let json = serde_json::to_string_pretty(self)?;
593        Ok(json)
594    }
595
596    pub fn from_json(json: &str) -> PlanGraphResult<Self> {
597        let mut graph: PlanGraph = serde_json::from_str(json)?;
598        graph.normalize();
599        graph.validate()?;
600        Ok(graph)
601    }
602
603    pub fn topological_order(&self) -> PlanGraphResult<Vec<PlanNodeId>> {
604        self.validate()?;
605
606        let mut index_by_id: BTreeMap<PlanNodeId, usize> = BTreeMap::new();
607        for (idx, node) in self.nodes.iter().enumerate() {
608            index_by_id.insert(node.id, idx);
609        }
610
611        let mut indegree = vec![0usize; self.nodes.len()];
612        let mut outgoing: Vec<Vec<usize>> = vec![Vec::new(); self.nodes.len()];
613        for edge in &self.edges {
614            let from_idx = index_by_id[&edge.from];
615            let to_idx = index_by_id[&edge.to];
616            indegree[to_idx] += 1;
617            outgoing[from_idx].push(to_idx);
618        }
619
620        let mut queue: VecDeque<usize> = indegree
621            .iter()
622            .enumerate()
623            .filter_map(|(idx, &deg)| if deg == 0 { Some(idx) } else { None })
624            .collect();
625        let mut order = Vec::with_capacity(self.nodes.len());
626        let mut indegree_mut = indegree;
627        while let Some(idx) = queue.pop_front() {
628            order.push(self.nodes[idx].id);
629            for &child in &outgoing[idx] {
630                indegree_mut[child] -= 1;
631                if indegree_mut[child] == 0 {
632                    queue.push_back(child);
633                }
634            }
635        }
636
637        if order.len() != self.nodes.len() {
638            return Err(PlanGraphError::CycleDetected);
639        }
640
641        Ok(order)
642    }
643
644    fn normalize(&mut self) {
645        self.nodes.sort_by(|a, b| a.id.cmp(&b.id));
646        for node in &mut self.nodes {
647            node.inputs.sort_by(|left, right| {
648                left.source
649                    .cmp(&right.source)
650                    .then(left.label.cmp(&right.label))
651                    .then_with(|| {
652                        metadata_as_vec(&left.metadata).cmp(&metadata_as_vec(&right.metadata))
653                    })
654            });
655            node.outputs.sort();
656        }
657        self.edges.sort_by(|a, b| {
658            a.from
659                .cmp(&b.from)
660                .then(a.to.cmp(&b.to))
661                .then(a.label.cmp(&b.label))
662        });
663        self.root_nodes.sort();
664    }
665}
666
667/// Builder for `PlanGraph` that enforces DAG invariants while allowing
668/// incremental construction.
669pub struct PlanGraphBuilder {
670    version: PlanGraphVersion,
671    nodes: BTreeMap<PlanNodeId, PlanNode>,
672    edges: BTreeMap<(PlanNodeId, PlanNodeId, Option<String>), PlanEdge>,
673    roots: BTreeSet<PlanNodeId>,
674    annotations: PlanAnnotations,
675}
676
677impl PlanGraphBuilder {
678    pub fn new() -> Self {
679        Self::default()
680    }
681
682    pub fn with_version(version: PlanGraphVersion) -> Self {
683        Self {
684            version,
685            ..Self::default()
686        }
687    }
688
689    pub fn add_node(&mut self, node: PlanNode) -> PlanGraphResult<()> {
690        let node_id = node.id;
691        if !node.inputs.is_empty() || !node.outputs.is_empty() {
692            return Err(PlanGraphError::NodeAlreadyConnected(node_id));
693        }
694        if self.nodes.insert(node_id, node).is_some() {
695            return Err(PlanGraphError::DuplicateNode(node_id));
696        }
697        Ok(())
698    }
699
700    pub fn add_edge(&mut self, edge: PlanEdge) -> PlanGraphResult<()> {
701        if edge.from == edge.to {
702            return Err(PlanGraphError::SelfLoop(edge.from));
703        }
704        let key = (edge.from, edge.to, edge.label.clone());
705        if self.edges.contains_key(&key) {
706            return Err(PlanGraphError::DuplicateEdge {
707                from: edge.from,
708                to: edge.to,
709            });
710        }
711
712        let target = self
713            .nodes
714            .get_mut(&edge.to)
715            .ok_or(PlanGraphError::MissingNode(edge.to))?;
716        let mut input = PlanInput::new(edge.from);
717        if let Some(label) = &edge.label {
718            input.label = Some(label.clone());
719        }
720        if !edge.metadata.is_empty() {
721            input.metadata = edge.metadata.clone();
722        }
723        target.inputs.push(input);
724
725        let source = self
726            .nodes
727            .get_mut(&edge.from)
728            .ok_or(PlanGraphError::MissingNode(edge.from))?;
729        if !source.outputs.contains(&edge.to) {
730            source.outputs.push(edge.to);
731        }
732
733        self.edges.insert(key, edge);
734        Ok(())
735    }
736
737    pub fn add_root(&mut self, node_id: PlanNodeId) -> PlanGraphResult<()> {
738        if !self.nodes.contains_key(&node_id) {
739            return Err(PlanGraphError::UnknownRoot(node_id));
740        }
741        self.roots.insert(node_id);
742        Ok(())
743    }
744
745    pub fn annotations_mut(&mut self) -> &mut PlanAnnotations {
746        &mut self.annotations
747    }
748
749    pub fn finish(self) -> PlanGraphResult<PlanGraph> {
750        let nodes: Vec<PlanNode> = self.nodes.into_values().collect();
751        let edges: Vec<PlanEdge> = self.edges.into_values().collect();
752        let root_nodes: Vec<PlanNodeId> = self.roots.into_iter().collect();
753
754        let mut graph = PlanGraph {
755            version: self.version,
756            nodes,
757            edges,
758            root_nodes,
759            annotations: self.annotations,
760        };
761        graph.normalize();
762        graph.validate()?;
763        Ok(graph)
764    }
765}
766
767impl Default for PlanGraphBuilder {
768    fn default() -> Self {
769        Self {
770            version: PLAN_GRAPH_VERSION,
771            nodes: BTreeMap::new(),
772            edges: BTreeMap::new(),
773            roots: BTreeSet::new(),
774            annotations: PlanAnnotations::default(),
775        }
776    }
777}
778
779fn metadata_as_vec(metadata: &PlanEdgeMetadata) -> Vec<(String, String)> {
780    metadata
781        .properties
782        .iter()
783        .map(|(k, v)| (k.clone(), v.clone()))
784        .collect()
785}
786
787fn build_node_label(node: &PlanNode) -> String {
788    let mut lines = Vec::new();
789    lines.push(node.kind.to_string());
790    if !node.schema.is_empty() {
791        let fields: Vec<String> = node
792            .schema
793            .iter()
794            .map(|field| {
795                if field.nullable {
796                    format!("{}:{}?", field.name, field.data_type)
797                } else {
798                    format!("{}:{}", field.name, field.data_type)
799                }
800            })
801            .collect();
802        lines.push(format!("schema: {}", fields.join(", ")));
803    }
804    if let Some(card) = node.cardinality {
805        lines.push(format!("card: {}", card));
806    }
807    if let Some(cost) = node.cost {
808        lines.push(format!("cost: {:.4}", cost));
809    }
810    if let Some(idx) = &node.chosen_index {
811        lines.push(format!("index: {idx}"));
812    }
813    for expr in &node.predicates {
814        lines.push(format!("pred: {}", expr.display));
815    }
816    for expr in &node.projections {
817        lines.push(format!("proj: {}", expr.display));
818    }
819    for (key, value) in &node.metadata.properties {
820        lines.push(format!("{key}: {value}"));
821    }
822    lines.join("\n")
823}
824
825fn format_edge_metadata(metadata: &PlanEdgeMetadata) -> String {
826    metadata
827        .properties
828        .iter()
829        .map(|(k, v)| format!("{k}={v}"))
830        .collect::<Vec<_>>()
831        .join(", ")
832}
833
/// Escapes text for use inside a double-quoted DOT string.
///
/// Backslashes and double quotes are backslash-escaped, and newlines become
/// the two-character `\n` escape. Every other character passes through.
fn escape_dot_label(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}