elif_orm/loading/
eager_loader.rs

1use crate::{
2    error::OrmResult,
3    loading::{
4        batch_loader::BatchLoader,
5        optimizer::{OptimizationStrategy, PlanExecutor, QueryNode, QueryOptimizer, QueryPlan},
6        query_deduplicator::QueryDeduplicator,
7    },
8    relationships::RelationshipType,
9};
10use serde_json::Value as JsonValue;
11use std::collections::HashMap;
12
/// Tunable settings for an eager-loading run.
///
/// Construct with [`Default::default`] for the stock values and override
/// individual fields as needed.
#[derive(Debug, Clone)]
pub struct EagerLoadConfig {
    /// Upper bound on the number of records handled in a single batch
    pub max_batch_size: usize,
    /// Whether duplicate queries should be collapsed
    pub deduplicate_queries: bool,
    /// Deepest level of nested relationships that will be followed
    pub max_depth: usize,
    /// Whether queries may be executed in parallel
    pub enable_parallelism: bool,
    /// Per-query timeout, expressed in milliseconds
    pub query_timeout_ms: u64,
}

impl Default for EagerLoadConfig {
    /// Stock configuration: batches of 100, deduplication and parallelism
    /// switched on, at most 10 relationship levels, 30 second timeout.
    fn default() -> Self {
        const DEFAULT_MAX_BATCH_SIZE: usize = 100;
        const DEFAULT_MAX_DEPTH: usize = 10;
        const DEFAULT_QUERY_TIMEOUT_MS: u64 = 30_000;

        EagerLoadConfig {
            max_batch_size: DEFAULT_MAX_BATCH_SIZE,
            max_depth: DEFAULT_MAX_DEPTH,
            query_timeout_ms: DEFAULT_QUERY_TIMEOUT_MS,
            deduplicate_queries: true,
            enable_parallelism: true,
        }
    }
}
39
/// Result of an eager loading operation
///
/// Bundles the loaded data with the statistics gathered while loading it and
/// the list of optimization strategies reported for the run.
#[derive(Debug)]
pub struct EagerLoadResult {
    /// Loaded data grouped by entity ID (the key is the root ID as a JSON
    /// value, the value is the JSON document assembled for that root)
    pub data: HashMap<JsonValue, JsonValue>,
    /// Performance statistics
    pub stats: EagerLoadStats,
    /// Applied optimizations
    pub optimizations: Vec<OptimizationStrategy>,
}
50
/// Statistics about the eager loading operation
///
/// `Default` is derived: every counter starts at `0` and the cache hit ratio
/// at `0.0`, which is exactly what the previous hand-written impl produced.
#[derive(Debug, Clone, Default)]
pub struct EagerLoadStats {
    /// Total execution time in milliseconds
    pub execution_time_ms: u64,
    /// Number of database queries executed
    pub query_count: usize,
    /// Total records loaded
    pub records_loaded: usize,
    /// Number of relationship levels loaded
    pub depth_loaded: usize,
    /// Cache hit ratio (0.0 to 1.0)
    pub cache_hit_ratio: f64,
}
77
/// Optimized eager loader for relationship loading with advanced optimization strategies
///
/// Owns the collaborators needed to turn a relationship specification into a
/// query plan, optimize that plan and execute it.
pub struct OptimizedEagerLoader {
    /// Batch loader; a clone is handed to the plan executor, and this copy is
    /// used for cache statistics and cache clearing
    batch_loader: BatchLoader,
    /// Optimizer applied to query plans before execution
    query_optimizer: QueryOptimizer,
    /// Executor that runs query plans against the database connection
    plan_executor: PlanExecutor,
    /// Constructed alongside the loader but not read by any method in this
    /// file (hence the leading underscore)
    _query_deduplicator: QueryDeduplicator,
    /// Active loader configuration
    config: EagerLoadConfig,
}
86
87impl OptimizedEagerLoader {
88    /// Create a new optimized eager loader with default configuration
89    pub fn new() -> Self {
90        let config = EagerLoadConfig::default();
91        let batch_loader = BatchLoader::new();
92        Self::with_config(config, batch_loader)
93    }
94
95    /// Create an optimized eager loader with custom configuration
96    pub fn with_config(config: EagerLoadConfig, batch_loader: BatchLoader) -> Self {
97        let query_optimizer = QueryOptimizer::new();
98        let plan_executor = PlanExecutor::with_config(
99            batch_loader.clone(),
100            if config.enable_parallelism { 10 } else { 1 },
101            std::time::Duration::from_millis(config.query_timeout_ms),
102        );
103        let query_deduplicator = QueryDeduplicator::new();
104
105        Self {
106            batch_loader,
107            query_optimizer,
108            plan_executor,
109            _query_deduplicator: query_deduplicator,
110            config,
111        }
112    }
113
114    /// Load relationships eagerly with optimization
115    pub async fn load_with_relationships(
116        &mut self,
117        root_table: &str,
118        root_ids: Vec<JsonValue>,
119        relationships: &str,
120        connection: &sqlx::PgPool,
121    ) -> OrmResult<EagerLoadResult> {
122        let start_time = std::time::Instant::now();
123
124        // Parse and build query plan
125        let mut plan = self.build_query_plan(root_table, &root_ids, relationships)?;
126
127        // Optimize the plan
128        let optimization_strategies = self.query_optimizer.optimize_plan(&mut plan)?;
129
130        // Execute the optimized plan
131        let execution_result = self.plan_executor.execute_plan(&plan, connection).await?;
132
133        // Process results into the expected format
134        let processed_data = self.process_execution_results(execution_result.results, &root_ids)?;
135
136        // Calculate statistics
137        let execution_time = start_time.elapsed();
138        let stats = EagerLoadStats {
139            execution_time_ms: execution_time.as_millis() as u64,
140            query_count: execution_result.stats.query_count,
141            records_loaded: execution_result.stats.rows_fetched,
142            depth_loaded: plan.max_depth,
143            cache_hit_ratio: self.calculate_cache_hit_ratio().await,
144        };
145
146        Ok(EagerLoadResult {
147            data: processed_data,
148            stats,
149            optimizations: optimization_strategies,
150        })
151    }
152
153    /// Load with a specific optimization strategy
154    pub async fn load_with_strategy(
155        &mut self,
156        root_table: &str,
157        root_ids: Vec<JsonValue>,
158        relationships: &str,
159        strategy: OptimizationStrategy,
160        connection: &sqlx::PgPool,
161    ) -> OrmResult<EagerLoadResult> {
162        // Build plan
163        let mut plan = self.build_query_plan(root_table, &root_ids, relationships)?;
164
165        // Apply specific strategy
166        match strategy {
167            OptimizationStrategy::IncreaseParallelism => {
168                self.apply_parallel_optimization(&mut plan)?;
169            }
170            OptimizationStrategy::ReduceBatchSize => {
171                self.apply_batch_size_optimization(&mut plan)?;
172            }
173            OptimizationStrategy::ReorderPhases => {
174                plan.build_execution_phases()?;
175            }
176            _ => {
177                // Apply through optimizer
178                let _strategies = self.query_optimizer.optimize_plan(&mut plan)?;
179            }
180        }
181
182        // Execute with the applied strategy
183        let execution_result = self.plan_executor.execute_plan(&plan, connection).await?;
184        let processed_data = self.process_execution_results(execution_result.results, &root_ids)?;
185
186        let stats = EagerLoadStats {
187            execution_time_ms: 0, // Will be calculated
188            query_count: execution_result.stats.query_count,
189            records_loaded: execution_result.stats.rows_fetched,
190            depth_loaded: plan.max_depth,
191            cache_hit_ratio: self.calculate_cache_hit_ratio().await,
192        };
193
194        Ok(EagerLoadResult {
195            data: processed_data,
196            stats,
197            optimizations: vec![strategy],
198        })
199    }
200
201    /// Build a query plan from relationship specification
202    fn build_query_plan(
203        &self,
204        root_table: &str,
205        root_ids: &[JsonValue],
206        relationships: &str,
207    ) -> OrmResult<QueryPlan> {
208        let mut plan = QueryPlan::new();
209        let mut node_counter = 0;
210
211        // Create root node
212        let root_node_id = format!("root_{}", node_counter);
213        node_counter += 1;
214
215        let mut root_node = QueryNode::root(root_node_id.clone(), root_table.to_string());
216        root_node.set_estimated_rows(root_ids.len());
217        plan.add_node(root_node);
218
219        // Parse relationships and build plan tree
220        if !relationships.is_empty() {
221            self.build_relationship_nodes(
222                &mut plan,
223                &root_node_id,
224                relationships,
225                1, // Start at depth 1
226                &mut node_counter,
227            )?;
228        }
229
230        // Build execution phases
231        plan.build_execution_phases()?;
232
233        Ok(plan)
234    }
235
236    /// Recursively build relationship nodes
237    fn build_relationship_nodes(
238        &self,
239        plan: &mut QueryPlan,
240        parent_node_id: &str,
241        relationships: &str,
242        depth: usize,
243        node_counter: &mut usize,
244    ) -> OrmResult<()> {
245        if depth > self.config.max_depth {
246            return Ok(()); // Prevent infinite recursion
247        }
248
249        // Parse relationship path (e.g., "posts.comments,profile")
250        let parts: Vec<&str> = relationships.split(',').collect();
251
252        for part in parts {
253            let relation_chain: Vec<&str> = part.split('.').collect();
254            self.build_relation_chain(plan, parent_node_id, &relation_chain, depth, node_counter)?;
255        }
256
257        Ok(())
258    }
259
260    /// Build a chain of relationships (e.g., posts.comments.user)
261    fn build_relation_chain(
262        &self,
263        plan: &mut QueryPlan,
264        parent_node_id: &str,
265        chain: &[&str],
266        depth: usize,
267        node_counter: &mut usize,
268    ) -> OrmResult<()> {
269        if chain.is_empty() || depth > self.config.max_depth {
270            return Ok(());
271        }
272
273        let relation_name = chain[0];
274        let node_id = format!("{}_{}", relation_name, *node_counter);
275        *node_counter += 1;
276
277        // Determine relationship type and table mapping
278        let (table_name, relationship_type, foreign_key) =
279            self.get_relationship_info(relation_name)?;
280
281        // Create relationship node
282        let mut node = QueryNode::child(
283            node_id.clone(),
284            table_name,
285            parent_node_id.to_string(),
286            relationship_type,
287            foreign_key,
288        );
289        node.set_depth(depth);
290        node.set_estimated_rows(std::cmp::min(1000, self.config.max_batch_size)); // Reasonable default
291
292        plan.add_node(node);
293
294        // Continue with rest of chain
295        if chain.len() > 1 {
296            self.build_relation_chain(plan, &node_id, &chain[1..], depth + 1, node_counter)?;
297        }
298
299        Ok(())
300    }
301
302    /// Get relationship information for a relation name
303    fn get_relationship_info(
304        &self,
305        relation: &str,
306    ) -> OrmResult<(String, RelationshipType, String)> {
307        // This would normally use metadata from the relationship registry
308        // For now, use convention-based mapping
309        match relation {
310            "posts" => Ok((
311                "posts".to_string(),
312                RelationshipType::HasMany,
313                "user_id".to_string(),
314            )),
315            "comments" => Ok((
316                "comments".to_string(),
317                RelationshipType::HasMany,
318                "post_id".to_string(),
319            )),
320            "user" => Ok((
321                "users".to_string(),
322                RelationshipType::BelongsTo,
323                "user_id".to_string(),
324            )),
325            "profile" => Ok((
326                "profiles".to_string(),
327                RelationshipType::HasOne,
328                "user_id".to_string(),
329            )),
330            _ => {
331                // Default convention: relation name -> table name + _id
332                Ok((
333                    format!("{}s", relation),
334                    RelationshipType::HasMany,
335                    format!("{}_id", relation),
336                ))
337            }
338        }
339    }
340
341    /// Process execution results into the expected eager loading format
342    fn process_execution_results(
343        &self,
344        results: HashMap<String, Vec<JsonValue>>,
345        root_ids: &[JsonValue],
346    ) -> OrmResult<HashMap<JsonValue, JsonValue>> {
347        let mut processed = HashMap::new();
348
349        // For now, create a simplified mapping
350        // In a real implementation, this would properly hydrate relationships
351        for root_id in root_ids.iter() {
352            let mut entity_data = serde_json::json!({
353                "id": root_id,
354                "relationships": {}
355            });
356
357            // Merge in relationship data
358            for (node_id, node_results) in &results {
359                if node_id.starts_with("root_") {
360                    continue; // Skip root nodes
361                }
362
363                // Simple relationship assignment - in reality this would be more complex
364                if let Some(obj) = entity_data.as_object_mut() {
365                    if let Some(relationships) =
366                        obj.get_mut("relationships").and_then(|r| r.as_object_mut())
367                    {
368                        relationships.insert(node_id.clone(), serde_json::json!(node_results));
369                    }
370                }
371            }
372
373            processed.insert(root_id.clone(), entity_data);
374        }
375
376        Ok(processed)
377    }
378
379    /// Apply parallel optimization to the plan
380    fn apply_parallel_optimization(&self, plan: &mut QueryPlan) -> OrmResult<()> {
381        for node in plan.nodes.values_mut() {
382            if node.constraints.is_empty() {
383                node.set_parallel_safe(true);
384            }
385        }
386        plan.build_execution_phases()?;
387        Ok(())
388    }
389
390    /// Apply batch size optimization
391    fn apply_batch_size_optimization(&self, plan: &mut QueryPlan) -> OrmResult<()> {
392        // Reduce estimated rows for large nodes
393        for node in plan.nodes.values_mut() {
394            if node.estimated_rows > 5000 {
395                node.set_estimated_rows(node.estimated_rows / 2);
396            }
397        }
398        Ok(())
399    }
400
401    /// Calculate cache hit ratio
402    async fn calculate_cache_hit_ratio(&self) -> f64 {
403        let stats = self.batch_loader.cache_stats().await;
404        if stats.total_cached_records > 0 {
405            0.75 // Mock hit ratio - in reality would track hits vs misses
406        } else {
407            0.0
408        }
409    }
410
    /// Get loader configuration
    ///
    /// Returns a shared reference to the configuration currently stored on
    /// this loader.
    pub fn config(&self) -> &EagerLoadConfig {
        &self.config
    }
415
416    /// Update configuration
417    pub fn update_config(&mut self, config: EagerLoadConfig) {
418        self.config = config;
419    }
420
    /// Clear all caches
    ///
    /// Delegates to the batch loader's `clear_cache` and waits for it to
    /// complete.
    pub async fn clear_caches(&self) {
        self.batch_loader.clear_cache().await;
    }
425}
426
impl Default for OptimizedEagerLoader {
    /// Equivalent to [`OptimizedEagerLoader::new`].
    fn default() -> Self {
        Self::new()
    }
}
432
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The default configuration carries the stock values.
    #[test]
    fn test_eager_load_config_default() {
        let EagerLoadConfig {
            max_batch_size,
            deduplicate_queries,
            max_depth,
            enable_parallelism,
            ..
        } = EagerLoadConfig::default();

        assert_eq!(max_batch_size, 100);
        assert!(deduplicate_queries);
        assert_eq!(max_depth, 10);
        assert!(enable_parallelism);
    }

    /// A two-level chain yields a single-rooted plan of depth two.
    #[test]
    fn test_build_query_plan() {
        let root_ids = [json!(1), json!(2)];

        let plan = OptimizedEagerLoader::new()
            .build_query_plan("users", &root_ids, "posts.comments")
            .unwrap();

        assert_eq!(plan.roots.len(), 1);
        assert!(!plan.nodes.is_empty()); // At least the root node
        assert_eq!(plan.max_depth, 2); // users -> posts -> comments
    }

    /// The hard-wired `posts` relation resolves to its table, type and key.
    #[test]
    fn test_relationship_info_mapping() {
        let info = OptimizedEagerLoader::new()
            .get_relationship_info("posts")
            .unwrap();

        assert_eq!(info.0, "posts");
        assert_eq!(info.1, RelationshipType::HasMany);
        assert_eq!(info.2, "user_id");
    }
}