elif_orm/loading/optimizer/
plan.rs

1use crate::{
2    error::{OrmError, OrmResult},
3    relationships::{metadata::RelationshipMetadata, RelationshipType},
4};
5use std::collections::{HashMap, HashSet};
6
7/// Represents a node in the query execution plan
8#[derive(Debug, Clone)]
9pub struct QueryNode {
10    /// Unique identifier for this node
11    pub id: String,
12    /// Table to query
13    pub table: String,
14    /// Type of relationship from parent
15    pub relationship_type: Option<RelationshipType>,
16    /// Full relationship metadata if available
17    pub relationship_metadata: Option<RelationshipMetadata>,
18    /// Parent node ID (None for root)
19    pub parent_id: Option<String>,
20    /// Child node IDs
21    pub children: Vec<String>,
22    /// Depth in the relationship tree
23    pub depth: usize,
24    /// Estimated row count (can be updated from metadata)
25    pub estimated_rows: usize,
26    /// Whether this node can be executed in parallel with siblings
27    pub parallel_safe: bool,
28    /// Foreign key used to join with parent
29    pub foreign_key: Option<String>,
30    /// Additional constraints for optimization
31    pub constraints: Vec<String>,
32    /// Column names available in this table (for better query construction)
33    pub available_columns: Vec<String>,
34    /// Index hints for the optimizer
35    pub index_hints: Vec<String>,
36}
37
38impl QueryNode {
39    /// Create a new query node
40    pub fn new(id: String, table: String) -> Self {
41        Self {
42            id,
43            table,
44            relationship_type: None,
45            relationship_metadata: None,
46            parent_id: None,
47            children: Vec::new(),
48            depth: 0,
49            estimated_rows: 1000, // Default estimate
50            parallel_safe: true,
51            foreign_key: None,
52            constraints: Vec::new(),
53            available_columns: Vec::new(),
54            index_hints: Vec::new(),
55        }
56    }
57
58    /// Create a root node (no parent)
59    pub fn root(id: String, table: String) -> Self {
60        let mut node = Self::new(id, table);
61        node.depth = 0;
62        node
63    }
64
65    /// Create a child node with parent relationship
66    pub fn child(
67        id: String,
68        table: String,
69        parent_id: String,
70        relationship_type: RelationshipType,
71        foreign_key: String,
72    ) -> Self {
73        let mut node = Self::new(id, table);
74        node.parent_id = Some(parent_id);
75        node.relationship_type = Some(relationship_type);
76        node.foreign_key = Some(foreign_key);
77        node
78    }
79
80    /// Create a child node with full relationship metadata
81    pub fn child_with_metadata(
82        id: String,
83        table: String,
84        parent_id: String,
85        metadata: RelationshipMetadata,
86    ) -> Self {
87        let mut node = Self::new(id, table);
88        node.parent_id = Some(parent_id);
89        node.relationship_type = Some(metadata.relationship_type);
90        node.relationship_metadata = Some(metadata.clone());
91        node.foreign_key = Some(metadata.foreign_key.primary_column().to_string());
92
93        // Set estimated rows based on relationship type
94        node.estimated_rows = match metadata.relationship_type {
95            RelationshipType::HasMany
96            | RelationshipType::ManyToMany
97            | RelationshipType::MorphMany => 10000,
98            _ => 1, // HasOne, BelongsTo, MorphOne, MorphTo
99        };
100
101        // Set parallel safety based on relationship characteristics
102        node.parallel_safe = !metadata.relationship_type.requires_pivot();
103
104        node
105    }
106
107    /// Add a child node ID
108    pub fn add_child(&mut self, child_id: String) {
109        if !self.children.contains(&child_id) {
110            self.children.push(child_id);
111        }
112    }
113
114    /// Set the depth for this node
115    pub fn set_depth(&mut self, depth: usize) {
116        self.depth = depth;
117    }
118
119    /// Set row count estimate
120    pub fn set_estimated_rows(&mut self, rows: usize) {
121        self.estimated_rows = rows;
122    }
123
124    /// Set parallel safety
125    pub fn set_parallel_safe(&mut self, safe: bool) {
126        self.parallel_safe = safe;
127    }
128
129    /// Add a constraint
130    pub fn add_constraint(&mut self, constraint: String) {
131        self.constraints.push(constraint);
132    }
133
134    /// Check if this node is a root node
135    pub fn is_root(&self) -> bool {
136        self.parent_id.is_none()
137    }
138
139    /// Check if this node is a leaf node
140    pub fn is_leaf(&self) -> bool {
141        self.children.is_empty()
142    }
143
144    /// Update metadata for this node
145    pub fn set_metadata(&mut self, metadata: RelationshipMetadata) {
146        self.relationship_type = Some(metadata.relationship_type);
147        self.relationship_metadata = Some(metadata.clone());
148        self.foreign_key = Some(metadata.foreign_key.primary_column().to_string());
149
150        // Update estimates based on metadata
151        self.estimated_rows = match metadata.relationship_type {
152            RelationshipType::HasMany
153            | RelationshipType::ManyToMany
154            | RelationshipType::MorphMany => 10000,
155            _ => 1,
156        };
157
158        self.parallel_safe = !metadata.relationship_type.requires_pivot();
159    }
160
161    /// Set column information for better query construction
162    pub fn set_columns(&mut self, columns: Vec<String>) {
163        self.available_columns = columns;
164    }
165
166    /// Add index hints for optimization
167    pub fn add_index_hint(&mut self, index: String) {
168        if !self.index_hints.contains(&index) {
169            self.index_hints.push(index);
170        }
171    }
172
173    /// Get the primary key column name (defaults to "id")
174    pub fn primary_key(&self) -> &str {
175        if let Some(metadata) = &self.relationship_metadata {
176            &metadata.local_key
177        } else {
178            "id"
179        }
180    }
181
182    /// Get the foreign key column name for relationships
183    pub fn get_foreign_key(&self) -> Option<&str> {
184        self.foreign_key.as_deref()
185    }
186
187    /// Check if this node represents a collection relationship
188    pub fn is_collection(&self) -> bool {
189        self.relationship_type
190            .map(|rt| rt.is_collection())
191            .unwrap_or(false)
192    }
193
194    /// Check if this node requires a pivot table
195    pub fn requires_pivot(&self) -> bool {
196        self.relationship_type
197            .map(|rt| rt.requires_pivot())
198            .unwrap_or(false)
199    }
200}
201
202/// Query execution plan for optimized loading
203#[derive(Debug)]
204pub struct QueryPlan {
205    /// All nodes in the plan, keyed by ID
206    pub nodes: HashMap<String, QueryNode>,
207    /// Root node IDs (entry points)
208    pub roots: Vec<String>,
209    /// Execution phases (nodes that can run in parallel)
210    pub execution_phases: Vec<Vec<String>>,
211    /// Maximum depth of the plan
212    pub max_depth: usize,
213    /// Total estimated rows
214    pub total_estimated_rows: usize,
215    /// Plan metadata
216    pub metadata: HashMap<String, serde_json::Value>,
217}
218
219impl QueryPlan {
220    /// Create an empty query plan
221    pub fn new() -> Self {
222        Self {
223            nodes: HashMap::new(),
224            roots: Vec::new(),
225            execution_phases: Vec::new(),
226            max_depth: 0,
227            total_estimated_rows: 0,
228            metadata: HashMap::new(),
229        }
230    }
231
232    /// Add a node to the plan
233    pub fn add_node(&mut self, node: QueryNode) {
234        self.max_depth = self.max_depth.max(node.depth);
235        self.total_estimated_rows += node.estimated_rows;
236
237        if node.parent_id.is_none() {
238            self.roots.push(node.id.clone());
239        }
240
241        // Update parent-child relationships
242        if let Some(parent_id) = &node.parent_id {
243            if let Some(parent) = self.nodes.get_mut(parent_id) {
244                parent.add_child(node.id.clone());
245            }
246        }
247
248        self.nodes.insert(node.id.clone(), node);
249    }
250
251    /// Build execution phases (groups of nodes that can run in parallel)
252    pub fn build_execution_phases(&mut self) -> OrmResult<()> {
253        let mut phases = Vec::new();
254        let mut visited = HashSet::new();
255        let mut current_depth = 0;
256
257        // Validate plan before building phases
258        self.validate()?;
259
260        while visited.len() < self.nodes.len() && current_depth <= self.max_depth {
261            let mut phase_nodes = Vec::new();
262
263            // Find all nodes at the current depth that haven't been visited
264            for (id, node) in &self.nodes {
265                if !visited.contains(id) && node.depth == current_depth {
266                    // Check if all parent nodes have been visited
267                    let ready = if let Some(parent_id) = &node.parent_id {
268                        visited.contains(parent_id)
269                    } else {
270                        true // Root nodes are always ready
271                    };
272
273                    if ready && node.parallel_safe {
274                        phase_nodes.push(id.clone());
275                    }
276                }
277            }
278
279            // If no parallel-safe nodes found, add sequential nodes
280            if phase_nodes.is_empty() {
281                for (id, node) in &self.nodes {
282                    if !visited.contains(id) && node.depth == current_depth {
283                        let ready = if let Some(parent_id) = &node.parent_id {
284                            visited.contains(parent_id)
285                        } else {
286                            true
287                        };
288
289                        if ready {
290                            phase_nodes.push(id.clone());
291                            break; // Only one sequential node per phase
292                        }
293                    }
294                }
295            }
296
297            if phase_nodes.is_empty() {
298                current_depth += 1;
299                continue;
300            }
301
302            // Mark phase nodes as visited
303            for id in &phase_nodes {
304                visited.insert(id.clone());
305            }
306
307            if !phase_nodes.is_empty() {
308                phases.push(phase_nodes);
309            }
310
311            current_depth += 1;
312        }
313
314        self.execution_phases = phases;
315        Ok(())
316    }
317
318    /// Validate the query plan for consistency
319    pub fn validate(&self) -> OrmResult<()> {
320        // Check for circular dependencies
321        for root_id in &self.roots {
322            let mut visited = HashSet::new();
323            let mut path = Vec::new();
324
325            if self.has_cycle_from(root_id, &mut visited, &mut path) {
326                return Err(OrmError::Query(
327                    "Circular dependency detected in query plan".into(),
328                ));
329            }
330        }
331
332        // Validate parent-child relationships
333        for (id, node) in &self.nodes {
334            if let Some(parent_id) = &node.parent_id {
335                if !self.nodes.contains_key(parent_id) {
336                    return Err(OrmError::Query(format!(
337                        "Parent node '{}' not found for node '{}'",
338                        parent_id, id
339                    )));
340                }
341            }
342
343            for child_id in &node.children {
344                if !self.nodes.contains_key(child_id) {
345                    return Err(OrmError::Query(format!(
346                        "Child node '{}' not found for node '{}'",
347                        child_id, id
348                    )));
349                }
350            }
351        }
352
353        Ok(())
354    }
355
356    /// DFS helper for cycle detection
357    fn has_cycle_from(
358        &self,
359        node_id: &str,
360        visited: &mut HashSet<String>,
361        path: &mut Vec<String>,
362    ) -> bool {
363        if path.contains(&node_id.to_string()) {
364            return true; // Found a cycle
365        }
366
367        if visited.contains(node_id) {
368            return false; // Already processed this subtree
369        }
370
371        path.push(node_id.to_string());
372        visited.insert(node_id.to_string());
373
374        if let Some(node) = self.nodes.get(node_id) {
375            for child_id in &node.children {
376                if self.has_cycle_from(child_id, visited, path) {
377                    return true;
378                }
379            }
380        }
381
382        path.pop();
383        false
384    }
385
386    /// Get nodes at a specific depth
387    pub fn nodes_at_depth(&self, depth: usize) -> Vec<&QueryNode> {
388        self.nodes
389            .values()
390            .filter(|node| node.depth == depth)
391            .collect()
392    }
393
394    /// Get all leaf nodes
395    pub fn leaf_nodes(&self) -> Vec<&QueryNode> {
396        self.nodes.values().filter(|node| node.is_leaf()).collect()
397    }
398
399    /// Calculate plan complexity score
400    pub fn complexity_score(&self) -> f64 {
401        let depth_penalty = self.max_depth as f64 * 1.5;
402        let node_penalty = self.nodes.len() as f64 * 0.5;
403        let row_penalty = (self.total_estimated_rows as f64).log10() * 2.0;
404
405        depth_penalty + node_penalty + row_penalty
406    }
407
408    /// Add metadata to the plan
409    pub fn add_metadata(&mut self, key: String, value: serde_json::Value) {
410        self.metadata.insert(key, value);
411    }
412
413    /// Get plan statistics
414    pub fn statistics(&self) -> PlanStatistics {
415        PlanStatistics {
416            total_nodes: self.nodes.len(),
417            root_nodes: self.roots.len(),
418            leaf_nodes: self.leaf_nodes().len(),
419            max_depth: self.max_depth,
420            total_estimated_rows: self.total_estimated_rows,
421            execution_phases: self.execution_phases.len(),
422            complexity_score: self.complexity_score(),
423            parallel_nodes: self.nodes.values().filter(|n| n.parallel_safe).count(),
424        }
425    }
426}
427
428impl Default for QueryPlan {
429    fn default() -> Self {
430        Self::new()
431    }
432}
433
/// Summary figures describing a query plan.
#[derive(Debug, Clone)]
pub struct PlanStatistics {
    /// Number of nodes in the plan
    pub total_nodes: usize,
    /// Number of root node entries
    pub root_nodes: usize,
    /// Number of nodes without children
    pub leaf_nodes: usize,
    /// Largest node depth recorded by the plan
    pub max_depth: usize,
    /// Total estimated row count recorded by the plan
    pub total_estimated_rows: usize,
    /// Number of execution phases currently built
    pub execution_phases: usize,
    /// Complexity score of the plan
    pub complexity_score: f64,
    /// Number of nodes flagged as parallel-safe
    pub parallel_nodes: usize,
}