
//! Query optimizer for storage operations
//!
//! This module provides query optimization and planning for complex storage operations.
//! It analyzes query patterns and suggests optimal execution strategies.
//!
//! # Example
//!
//! ```rust
//! use ipfrs_storage::QueryOptimizer;
//!
//! let optimizer = QueryOptimizer::new();
//!
//! // Optimize a batch get operation
//! let cids = vec![/* ... */];
//! let plan = optimizer.optimize_batch_get(&cids);
//! println!("Optimal batch size: {}", plan.batch_size);
//! ```

use ipfrs_core::Cid;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

/// Query execution plan
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
    /// Estimated execution time in microseconds
    pub estimated_duration_us: u64,
    /// Recommended batch size for operations
    pub batch_size: usize,
    /// Whether to use parallel execution
    pub use_parallel: bool,
    /// Estimated memory usage in bytes
    pub estimated_memory_bytes: usize,
    /// Strategy to use
    pub strategy: QueryStrategy,
    /// Additional optimization hints
    pub hints: Vec<String>,
}

/// Query execution strategy
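///
/// `optimize_batch_get` selects a strategy in precedence order: streaming
/// when the estimated payload exceeds the streaming threshold, cache-first
/// when the observed cache hit rate is above 0.8, parallel batching at or
/// above the parallel threshold, and sequential otherwise. `Hybrid` is
/// reserved and is not currently produced by the optimizer.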
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueryStrategy {
    /// Sequential execution
    Sequential,
    /// Parallel batch execution
    ParallelBatch,
    /// Streaming execution
    Streaming,
    /// Cache-first strategy
    CacheFirst,
    /// Hybrid approach
    Hybrid,
}

/// Query optimizer for storage operations
#[derive(Debug, Clone)]
pub struct QueryOptimizer {
    /// Historical query statistics
    stats: QueryStats,
    /// Configuration
    config: OptimizerConfig,
}

/// Query statistics for optimization
#[derive(Debug, Clone, Default)]
struct QueryStats {
    /// Average block size in bytes
    avg_block_size: usize,
    /// Cache hit rate (0.0 to 1.0)
    cache_hit_rate: f64,
    /// Average batch operation latency
    #[allow(dead_code)]
    avg_batch_latency_us: u64,
    /// Number of queries analyzed
    query_count: u64,
}

/// Optimizer configuration
#[derive(Debug, Clone)]
pub struct OptimizerConfig {
    /// Maximum batch size
    pub max_batch_size: usize,
    /// Minimum batch size
    pub min_batch_size: usize,
    /// Parallel execution threshold (number of items)
    pub parallel_threshold: usize,
    /// Streaming threshold (total bytes)
    pub streaming_threshold_bytes: usize,
    /// Memory limit for operations
    pub memory_limit_bytes: usize,
}

impl Default for OptimizerConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 1000,
            min_batch_size: 10,
            parallel_threshold: 100,
            streaming_threshold_bytes: 100 * 1024 * 1024, // 100MB
            memory_limit_bytes: 1024 * 1024 * 1024,       // 1GB
        }
    }
}

impl QueryOptimizer {
    /// Create a new query optimizer with default configuration
    pub fn new() -> Self {
        Self::with_config(OptimizerConfig::default())
    }

    /// Create a new query optimizer with custom configuration
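    ///
    /// # Example
    ///
    /// A minimal sketch of overriding one threshold (this assumes
    /// `OptimizerConfig` is re-exported at the crate root alongside
    /// `QueryOptimizer`; the value is illustrative, not a recommendation):
    ///
    /// ```rust
    /// use ipfrs_storage::{OptimizerConfig, QueryOptimizer};
    ///
    /// let config = OptimizerConfig {
    ///     parallel_threshold: 50,
    ///     ..Default::default()
    /// };
    /// let optimizer = QueryOptimizer::with_config(config);
    /// ```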
    pub fn with_config(config: OptimizerConfig) -> Self {
        Self {
            stats: QueryStats::default(),
            config,
        }
    }

    /// Update statistics with query feedback
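    ///
    /// Until this is called at least once, `avg_block_size` defaults to zero,
    /// so memory estimates come out as zero and the streaming strategy is
    /// never selected for reads.
    ///
    /// # Example
    ///
    /// A sketch of feeding observed metrics back in (numbers are illustrative):
    ///
    /// ```rust
    /// use ipfrs_storage::QueryOptimizer;
    ///
    /// let mut optimizer = QueryOptimizer::new();
    /// // Observed: blocks average 4 KiB and 60% of reads hit the cache.
    /// optimizer.update_stats(4096, 0.6);
    /// ```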
    pub fn update_stats(&mut self, avg_block_size: usize, cache_hit_rate: f64) {
        self.stats.avg_block_size = avg_block_size;
        self.stats.cache_hit_rate = cache_hit_rate;
        self.stats.query_count += 1;
    }

    /// Optimize a batch get operation
    pub fn optimize_batch_get(&self, cids: &[Cid]) -> QueryPlan {
        let count = cids.len();

        if count == 0 {
            return QueryPlan {
                estimated_duration_us: 0,
                batch_size: 0,
                use_parallel: false,
                estimated_memory_bytes: 0,
                strategy: QueryStrategy::Sequential,
                hints: vec!["Empty query".to_string()],
            };
        }

        // Estimate memory usage
        let estimated_memory_bytes = count * self.stats.avg_block_size;

        // Determine strategy based on size and cache hit rate
        let strategy = if estimated_memory_bytes > self.config.streaming_threshold_bytes {
            QueryStrategy::Streaming
        } else if self.stats.cache_hit_rate > 0.8 {
            QueryStrategy::CacheFirst
        } else if count >= self.config.parallel_threshold {
            QueryStrategy::ParallelBatch
        } else {
            QueryStrategy::Sequential
        };

        // Calculate optimal batch size
        let batch_size = self.calculate_batch_size(count, estimated_memory_bytes);

        // Estimate duration (simplified model)
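        // Both multipliers below scale the estimate down; smaller is faster.
        // Worked example: 1000 items at a 0.5 hit rate with parallel batching:
        // 1000 * 500 us * (1.0 - 0.5 * 0.7) * 0.3 = 97_500 us, about 98 ms.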
        let base_latency_us = 500; // Base per-item latency
        let cache_factor = 1.0 - (self.stats.cache_hit_rate * 0.7);
        let parallel_factor = if strategy == QueryStrategy::ParallelBatch {
            0.3
        } else {
            1.0
        };
        let estimated_duration_us =
            ((count as f64) * base_latency_us as f64 * cache_factor * parallel_factor) as u64;

        let mut hints = Vec::new();
        if estimated_memory_bytes > self.config.memory_limit_bytes / 2 {
            hints.push("High memory usage - consider streaming".to_string());
        }
        if count > self.config.max_batch_size {
            hints.push(format!(
                "Large query - split into {} batches",
                count.div_ceil(self.config.max_batch_size)
            ));
        }
        if self.stats.cache_hit_rate < 0.3 {
            hints.push("Low cache hit rate - consider cache warming".to_string());
        }

        QueryPlan {
            estimated_duration_us,
            batch_size,
            use_parallel: strategy == QueryStrategy::ParallelBatch,
            estimated_memory_bytes,
            strategy,
            hints,
        }
    }

    /// Optimize a batch put operation
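    ///
    /// # Example
    ///
    /// A minimal sketch (the block count and byte total are illustrative):
    ///
    /// ```rust
    /// use ipfrs_storage::QueryOptimizer;
    ///
    /// let optimizer = QueryOptimizer::new();
    /// // Plan a write of 500 blocks totalling about 64 MiB
    /// let plan = optimizer.optimize_batch_put(500, 64 * 1024 * 1024);
    /// assert!(plan.batch_size <= 500);
    /// ```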
    pub fn optimize_batch_put(&self, block_count: usize, total_bytes: usize) -> QueryPlan {
        if block_count == 0 {
            return QueryPlan {
                estimated_duration_us: 0,
                batch_size: 0,
                use_parallel: false,
                estimated_memory_bytes: 0,
                strategy: QueryStrategy::Sequential,
                hints: vec!["Empty operation".to_string()],
            };
        }

        // Determine strategy
        let strategy = if total_bytes > self.config.streaming_threshold_bytes {
            QueryStrategy::Streaming
        } else if block_count >= self.config.parallel_threshold {
            QueryStrategy::ParallelBatch
        } else {
            QueryStrategy::Sequential
        };

        // Calculate optimal batch size
        let batch_size = self.calculate_batch_size(block_count, total_bytes);

        // Estimate duration (writes are typically slower than reads)
        let base_latency_us = 1000; // Base per-item latency for writes
        let parallel_factor = if strategy == QueryStrategy::ParallelBatch {
            0.4
        } else {
            1.0
        };
        let estimated_duration_us =
            ((block_count as f64) * base_latency_us as f64 * parallel_factor) as u64;

        let mut hints = Vec::new();
        if total_bytes > self.config.memory_limit_bytes {
            hints.push("Very large write - use streaming".to_string());
        }
        if block_count > self.config.max_batch_size * 2 {
            hints.push("Consider write coalescing".to_string());
        }

        QueryPlan {
            estimated_duration_us,
            batch_size,
            use_parallel: strategy == QueryStrategy::ParallelBatch,
            estimated_memory_bytes: total_bytes,
            strategy,
            hints,
        }
    }

    /// Calculate optimal batch size
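    ///
    /// Starts from `max_batch_size`, shrinks the batch so a single batch fits
    /// within `memory_limit_bytes`, then clamps to `min_batch_size` and the
    /// item count. For example, with the default 1 GiB memory limit and 2 MiB
    /// blocks, the memory-based limit is 1 GiB / 2 MiB = 512 items per batch.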
    fn calculate_batch_size(&self, item_count: usize, estimated_bytes: usize) -> usize {
        // Start with max batch size
        let mut batch_size = self.config.max_batch_size;

        // Adjust based on memory constraints; clamp the per-item estimate to
        // at least one byte so integer division cannot produce a zero divisor
        if estimated_bytes > 0 {
            let bytes_per_item = (estimated_bytes / item_count).max(1);
            let memory_based_limit = self.config.memory_limit_bytes / bytes_per_item;
            batch_size = batch_size.min(memory_based_limit);
        }

        // Ensure minimum
        batch_size = batch_size.max(self.config.min_batch_size);

        // Don't exceed item count
        batch_size.min(item_count)
    }

    /// Analyze query patterns and provide recommendations
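    ///
    /// # Example
    ///
    /// A sketch of analyzing a small access log (this assumes `QueryLogEntry`
    /// is re-exported at the crate root, and uses `Block` only to obtain a CID):
    ///
    /// ```rust
    /// use std::time::Duration;
    /// use ipfrs_core::Block;
    /// use ipfrs_storage::{QueryLogEntry, QueryOptimizer};
    ///
    /// let optimizer = QueryOptimizer::new();
    /// let block = Block::new(vec![0u8; 64].into()).unwrap();
    /// let log = vec![
    ///     QueryLogEntry {
    ///         cids: vec![block.cid().clone()],
    ///         duration: Duration::from_millis(5),
    ///         cache_hit: true,
    ///     };
    ///     8
    /// ];
    /// for rec in optimizer.analyze_patterns(&log) {
    ///     println!("[{:?}] {}", rec.priority, rec.description);
    /// }
    /// ```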
    pub fn analyze_patterns(&self, query_log: &[QueryLogEntry]) -> Vec<Recommendation> {
        let mut recommendations = Vec::new();

        if query_log.is_empty() {
            return recommendations;
        }

        // Analyze access patterns
        let mut cid_access_count: HashMap<String, usize> = HashMap::new();
        let mut total_items = 0;
        let mut large_queries = 0;

        for entry in query_log {
            for cid in &entry.cids {
                *cid_access_count.entry(cid.to_string()).or_insert(0) += 1;
            }
            total_items += entry.cids.len();
            if entry.cids.len() > self.config.parallel_threshold {
                large_queries += 1;
            }
        }

        // Hot data detection: a CID is "hot" if it appears in at least 25% of
        // queries, and in any case more than once (the floor keeps tiny logs
        // from marking every CID as hot)
        let hot_threshold = (query_log.len() / 4).max(2);
        let hot_cids: Vec<_> = cid_access_count
            .iter()
            .filter(|(_, &count)| count >= hot_threshold)
            .collect();

        if !hot_cids.is_empty() {
            recommendations.push(Recommendation {
                priority: RecommendationPriority::High,
                category: RecommendationCategory::Caching,
                description: format!(
                    "Detected {} hot CIDs (accessed {}+ times). Consider pinning or caching.",
                    hot_cids.len(),
                    hot_threshold
                ),
                impact: "Can improve cache hit rate by 20-40%".to_string(),
            });
        }

        // Large query detection
        if large_queries > query_log.len() / 2 {
            recommendations.push(Recommendation {
                priority: RecommendationPriority::Medium,
                category: RecommendationCategory::Performance,
                description: format!(
                    "{}% of queries are large (>{} items). Enable parallel execution.",
                    (large_queries * 100) / query_log.len(),
                    self.config.parallel_threshold
                ),
                impact: "Can reduce query latency by 30-50%".to_string(),
            });
        }

        // Average query size
        let avg_query_size = total_items / query_log.len();
        if avg_query_size < self.config.min_batch_size {
            recommendations.push(Recommendation {
                priority: RecommendationPriority::Low,
                category: RecommendationCategory::Efficiency,
                description: format!(
                    "Average query size is {avg_query_size} items. Consider batching small queries."
                ),
                impact: "Can reduce overhead by 10-20%".to_string(),
            });
        }

        recommendations
    }
}

impl Default for QueryOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

/// Query log entry for pattern analysis
#[derive(Debug, Clone)]
pub struct QueryLogEntry {
    /// CIDs accessed in this query
    pub cids: Vec<Cid>,
    /// Duration of the query
    pub duration: Duration,
    /// Whether the query hit the cache
    pub cache_hit: bool,
}

/// Optimization recommendation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    /// Priority of this recommendation
    pub priority: RecommendationPriority,
    /// Category
    pub category: RecommendationCategory,
    /// Description
    pub description: String,
    /// Estimated impact
    pub impact: String,
}

/// Recommendation priority
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecommendationPriority {
    /// Critical - address immediately
    Critical,
    /// High priority
    High,
    /// Medium priority
    Medium,
    /// Low priority
    Low,
}

/// Recommendation category
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecommendationCategory {
    /// Performance optimization
    Performance,
    /// Caching strategy
    Caching,
    /// Resource efficiency
    Efficiency,
    /// Reliability
    Reliability,
}

#[cfg(test)]
mod tests {
    use super::*;
    use ipfrs_core::Block;

    #[test]
    fn test_query_optimizer_basic() {
        let optimizer = QueryOptimizer::new();

        let block = Block::new(vec![0u8; 1024].into()).unwrap();
        let cids = vec![block.cid().clone(); 100];

        let plan = optimizer.optimize_batch_get(&cids);
        assert!(plan.batch_size > 0);
        assert!(plan.estimated_duration_us > 0);
    }

    #[test]
    fn test_optimize_empty_query() {
        let optimizer = QueryOptimizer::new();
        let plan = optimizer.optimize_batch_get(&[]);

        assert_eq!(plan.batch_size, 0);
        assert_eq!(plan.estimated_duration_us, 0);
        assert_eq!(plan.strategy, QueryStrategy::Sequential);
    }

    #[test]
    fn test_optimize_large_query() {
        let optimizer = QueryOptimizer::new();
        let block = Block::new(vec![0u8; 1024].into()).unwrap();
        let cids = vec![block.cid().clone(); 1000];

        let plan = optimizer.optimize_batch_get(&cids);
        assert_eq!(plan.strategy, QueryStrategy::ParallelBatch);
        assert!(plan.use_parallel);
    }

    #[test]
    fn test_optimize_streaming_query() {
        // Very low streaming threshold for testing
        let config = OptimizerConfig {
            streaming_threshold_bytes: 1024,
            ..Default::default()
        };

        let mut optimizer = QueryOptimizer::with_config(config);
        optimizer.update_stats(2048, 0.5); // Set avg block size so the streaming threshold is exceeded

        let block = Block::new(vec![0u8; 1024].into()).unwrap();
        let cids = vec![block.cid().clone(); 100];

        let plan = optimizer.optimize_batch_get(&cids);
        assert_eq!(plan.strategy, QueryStrategy::Streaming);
    }

    #[test]
    fn test_optimize_batch_put() {
        let optimizer = QueryOptimizer::new();
        let plan = optimizer.optimize_batch_put(100, 100 * 1024);

        assert!(plan.batch_size > 0);
        assert!(plan.estimated_duration_us > 0);
    }

    #[test]
    fn test_pattern_analysis() {
        let optimizer = QueryOptimizer::new();
        let block = Block::new(vec![0u8; 1024].into()).unwrap();
        let cid = block.cid().clone();

        // Create a log with repeated accesses
        let log = vec![
            QueryLogEntry {
                cids: vec![cid.clone()],
                duration: Duration::from_millis(10),
                cache_hit: false,
            };
            10
        ];

        let recommendations = optimizer.analyze_patterns(&log);
        assert!(!recommendations.is_empty());
    }

    #[test]
    fn test_update_stats() {
        let mut optimizer = QueryOptimizer::new();
        optimizer.update_stats(1024, 0.9);

        assert_eq!(optimizer.stats.avg_block_size, 1024);
        assert_eq!(optimizer.stats.cache_hit_rate, 0.9);
        assert_eq!(optimizer.stats.query_count, 1);
    }

    #[test]
    fn test_cache_first_strategy() {
        let mut optimizer = QueryOptimizer::new();
        optimizer.update_stats(1024, 0.95); // High cache hit rate

        let block = Block::new(vec![0u8; 1024].into()).unwrap();
        let cids = vec![block.cid().clone(); 50]; // Below parallel threshold

        let plan = optimizer.optimize_batch_get(&cids);
        assert_eq!(plan.strategy, QueryStrategy::CacheFirst);
    }
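
    // Regression sketch for the divide-by-zero guard in calculate_batch_size:
    // a payload smaller than the item count makes the per-item estimate round
    // down to zero, which the .max(1) clamp must absorb
    #[test]
    fn test_batch_size_with_tiny_payload() {
        let optimizer = QueryOptimizer::new();
        // 100 blocks totalling only 50 bytes
        let plan = optimizer.optimize_batch_put(100, 50);
        assert!(plan.batch_size >= 1);
        assert!(plan.batch_size <= 100);
    }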
}