1use ipfrs_core::Cid;
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23use std::time::Duration;
24
/// An execution plan produced by a query optimizer describing how a batch
/// operation should be carried out.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
    /// Estimated wall-clock duration of the operation, in microseconds.
    pub estimated_duration_us: u64,
    /// Recommended number of items to process per batch.
    pub batch_size: usize,
    /// Whether the plan recommends parallel execution.
    pub use_parallel: bool,
    /// Estimated peak memory footprint of the operation, in bytes.
    pub estimated_memory_bytes: usize,
    /// The execution strategy chosen for this operation.
    pub strategy: QueryStrategy,
    /// Human-readable optimization hints for the caller.
    pub hints: Vec<String>,
}
41
/// The execution strategy selected for a batch operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueryStrategy {
    /// Process items one at a time.
    Sequential,
    /// Process items in parallel batches.
    ParallelBatch,
    /// Stream items to bound memory usage.
    Streaming,
    /// Consult the cache before the backing store (chosen on high hit rates).
    CacheFirst,
    /// Combination of strategies (not currently produced by the planner).
    Hybrid,
}
56
/// Plans batch get/put operations based on observed runtime statistics
/// and a static configuration.
#[derive(Debug, Clone)]
pub struct QueryOptimizer {
    // Rolling statistics reported via `update_stats`.
    stats: QueryStats,
    // Tunable thresholds controlling strategy selection and batch sizing.
    config: OptimizerConfig,
}
65
/// Observed runtime statistics that inform planning decisions.
#[derive(Debug, Clone, Default)]
struct QueryStats {
    // Average block size in bytes, as last reported by the caller.
    avg_block_size: usize,
    // Fraction of lookups served from cache; expected in 0.0..=1.0.
    cache_hit_rate: f64,
    // Average batch latency in microseconds; recorded but not yet consulted.
    #[allow(dead_code)]
    avg_batch_latency_us: u64,
    // Number of times `update_stats` has been called.
    query_count: u64,
}
79
/// Tunable thresholds that control how the optimizer selects strategies
/// and sizes batches.
#[derive(Debug, Clone)]
pub struct OptimizerConfig {
    /// Upper bound on the recommended batch size.
    pub max_batch_size: usize,
    /// Lower bound on the recommended batch size.
    pub min_batch_size: usize,
    /// Item count at or above which parallel execution is considered.
    pub parallel_threshold: usize,
    /// Estimated byte size above which streaming is preferred.
    pub streaming_threshold_bytes: usize,
    /// Soft memory budget, in bytes, used when sizing batches.
    pub memory_limit_bytes: usize,
}
94
95impl Default for OptimizerConfig {
96 fn default() -> Self {
97 Self {
98 max_batch_size: 1000,
99 min_batch_size: 10,
100 parallel_threshold: 100,
101 streaming_threshold_bytes: 100 * 1024 * 1024, memory_limit_bytes: 1024 * 1024 * 1024, }
104 }
105}
106
107impl QueryOptimizer {
108 pub fn new() -> Self {
110 Self::with_config(OptimizerConfig::default())
111 }
112
113 pub fn with_config(config: OptimizerConfig) -> Self {
115 Self {
116 stats: QueryStats::default(),
117 config,
118 }
119 }
120
121 pub fn update_stats(&mut self, avg_block_size: usize, cache_hit_rate: f64) {
123 self.stats.avg_block_size = avg_block_size;
124 self.stats.cache_hit_rate = cache_hit_rate;
125 self.stats.query_count += 1;
126 }
127
128 pub fn optimize_batch_get(&self, cids: &[Cid]) -> QueryPlan {
130 let count = cids.len();
131
132 if count == 0 {
133 return QueryPlan {
134 estimated_duration_us: 0,
135 batch_size: 0,
136 use_parallel: false,
137 estimated_memory_bytes: 0,
138 strategy: QueryStrategy::Sequential,
139 hints: vec!["Empty query".to_string()],
140 };
141 }
142
143 let estimated_memory_bytes = count * self.stats.avg_block_size;
145
146 let strategy = if estimated_memory_bytes > self.config.streaming_threshold_bytes {
148 QueryStrategy::Streaming
149 } else if self.stats.cache_hit_rate > 0.8 {
150 QueryStrategy::CacheFirst
151 } else if count >= self.config.parallel_threshold {
152 QueryStrategy::ParallelBatch
153 } else {
154 QueryStrategy::Sequential
155 };
156
157 let batch_size = self.calculate_batch_size(count, estimated_memory_bytes);
159
160 let base_latency_us = 500; let cache_speedup = 1.0 - (self.stats.cache_hit_rate * 0.7);
163 let parallel_speedup = if strategy == QueryStrategy::ParallelBatch {
164 0.3
165 } else {
166 1.0
167 };
168 let estimated_duration_us =
169 ((count as f64) * base_latency_us as f64 * cache_speedup * parallel_speedup) as u64;
170
171 let mut hints = Vec::new();
172 if estimated_memory_bytes > self.config.memory_limit_bytes / 2 {
173 hints.push("High memory usage - consider streaming".to_string());
174 }
175 if count > self.config.max_batch_size {
176 hints.push(format!(
177 "Large query - split into {} batches",
178 count.div_ceil(self.config.max_batch_size)
179 ));
180 }
181 if self.stats.cache_hit_rate < 0.3 {
182 hints.push("Low cache hit rate - consider cache warming".to_string());
183 }
184
185 QueryPlan {
186 estimated_duration_us,
187 batch_size,
188 use_parallel: strategy == QueryStrategy::ParallelBatch,
189 estimated_memory_bytes,
190 strategy,
191 hints,
192 }
193 }
194
195 pub fn optimize_batch_put(&self, block_count: usize, total_bytes: usize) -> QueryPlan {
197 if block_count == 0 {
198 return QueryPlan {
199 estimated_duration_us: 0,
200 batch_size: 0,
201 use_parallel: false,
202 estimated_memory_bytes: 0,
203 strategy: QueryStrategy::Sequential,
204 hints: vec!["Empty operation".to_string()],
205 };
206 }
207
208 let strategy = if total_bytes > self.config.streaming_threshold_bytes {
210 QueryStrategy::Streaming
211 } else if block_count >= self.config.parallel_threshold {
212 QueryStrategy::ParallelBatch
213 } else {
214 QueryStrategy::Sequential
215 };
216
217 let batch_size = self.calculate_batch_size(block_count, total_bytes);
219
220 let base_latency_us = 1000; let parallel_speedup = if strategy == QueryStrategy::ParallelBatch {
223 0.4
224 } else {
225 1.0
226 };
227 let estimated_duration_us =
228 ((block_count as f64) * base_latency_us as f64 * parallel_speedup) as u64;
229
230 let mut hints = Vec::new();
231 if total_bytes > self.config.memory_limit_bytes {
232 hints.push("Very large write - use streaming".to_string());
233 }
234 if block_count > self.config.max_batch_size * 2 {
235 hints.push("Consider write coalescing".to_string());
236 }
237
238 QueryPlan {
239 estimated_duration_us,
240 batch_size,
241 use_parallel: strategy == QueryStrategy::ParallelBatch,
242 estimated_memory_bytes: total_bytes,
243 strategy,
244 hints,
245 }
246 }
247
248 fn calculate_batch_size(&self, item_count: usize, estimated_bytes: usize) -> usize {
250 let mut batch_size = self.config.max_batch_size;
252
253 if estimated_bytes > 0 {
255 let bytes_per_item = estimated_bytes / item_count;
256 let memory_based_limit = self.config.memory_limit_bytes / bytes_per_item;
257 batch_size = batch_size.min(memory_based_limit);
258 }
259
260 batch_size = batch_size.max(self.config.min_batch_size);
262
263 batch_size.min(item_count)
265 }
266
267 pub fn analyze_patterns(&self, query_log: &[QueryLogEntry]) -> Vec<Recommendation> {
269 let mut recommendations = Vec::new();
270
271 if query_log.is_empty() {
272 return recommendations;
273 }
274
275 let mut cid_access_count: HashMap<String, usize> = HashMap::new();
277 let mut total_items = 0;
278 let mut large_queries = 0;
279
280 for entry in query_log {
281 for cid in &entry.cids {
282 *cid_access_count.entry(cid.to_string()).or_insert(0) += 1;
283 }
284 total_items += entry.cids.len();
285 if entry.cids.len() > self.config.parallel_threshold {
286 large_queries += 1;
287 }
288 }
289
290 let hot_threshold = query_log.len() / 4; let hot_cids: Vec<_> = cid_access_count
293 .iter()
294 .filter(|(_, &count)| count >= hot_threshold)
295 .collect();
296
297 if !hot_cids.is_empty() {
298 recommendations.push(Recommendation {
299 priority: RecommendationPriority::High,
300 category: RecommendationCategory::Caching,
301 description: format!(
302 "Detected {} hot CIDs (accessed {}+ times). Consider pinning or caching.",
303 hot_cids.len(),
304 hot_threshold
305 ),
306 impact: "Improved cache hit rate by 20-40%".to_string(),
307 });
308 }
309
310 if large_queries > query_log.len() / 2 {
312 recommendations.push(Recommendation {
313 priority: RecommendationPriority::Medium,
314 category: RecommendationCategory::Performance,
315 description: format!(
316 "{}% of queries are large (>{} items). Enable parallel execution.",
317 (large_queries * 100) / query_log.len(),
318 self.config.parallel_threshold
319 ),
320 impact: "Reduced query latency by 30-50%".to_string(),
321 });
322 }
323
324 let avg_query_size = total_items / query_log.len();
326 if avg_query_size < self.config.min_batch_size {
327 recommendations.push(Recommendation {
328 priority: RecommendationPriority::Low,
329 category: RecommendationCategory::Efficiency,
330 description: format!(
331 "Average query size is {avg_query_size} items. Consider batching small queries."
332 ),
333 impact: "Reduced overhead by 10-20%".to_string(),
334 });
335 }
336
337 recommendations
338 }
339}
340
341impl Default for QueryOptimizer {
342 fn default() -> Self {
343 Self::new()
344 }
345}
346
/// One recorded query, used as input to `QueryOptimizer::analyze_patterns`.
#[derive(Debug, Clone)]
pub struct QueryLogEntry {
    /// The CIDs requested by this query.
    pub cids: Vec<Cid>,
    /// How long the query took to complete.
    pub duration: Duration,
    /// Whether the query was served from cache.
    pub cache_hit: bool,
}
357
/// A tuning suggestion derived from query-log analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    /// How urgent the recommendation is.
    pub priority: RecommendationPriority,
    /// The area of the system the recommendation concerns.
    pub category: RecommendationCategory,
    /// Human-readable description of the finding.
    pub description: String,
    /// Expected benefit of applying the recommendation.
    pub impact: String,
}
370
/// Urgency level of a [`Recommendation`], from most to least pressing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecommendationPriority {
    /// Requires immediate attention.
    Critical,
    /// Should be addressed soon.
    High,
    /// Worth addressing when convenient.
    Medium,
    /// Minor improvement.
    Low,
}
383
/// Area of the system a [`Recommendation`] concerns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecommendationCategory {
    /// Query latency and throughput.
    Performance,
    /// Cache configuration and warming.
    Caching,
    /// Overhead reduction (e.g. batching).
    Efficiency,
    /// Robustness concerns (not currently produced by the analyzer).
    Reliability,
}
396
#[cfg(test)]
mod tests {
    use super::*;
    use ipfrs_core::Block;

    /// Returns `n` copies of the CID of a 1 KiB zero-filled block.
    fn sample_cids(n: usize) -> Vec<Cid> {
        let block = Block::new(vec![0u8; 1024].into()).unwrap();
        vec![block.cid().clone(); n]
    }

    #[test]
    fn test_query_optimizer_basic() {
        let optimizer = QueryOptimizer::new();

        let plan = optimizer.optimize_batch_get(&sample_cids(100));
        assert!(plan.batch_size > 0);
        assert!(plan.estimated_duration_us > 0);
    }

    #[test]
    fn test_optimize_empty_query() {
        let plan = QueryOptimizer::new().optimize_batch_get(&[]);

        assert_eq!(plan.batch_size, 0);
        assert_eq!(plan.estimated_duration_us, 0);
        assert_eq!(plan.strategy, QueryStrategy::Sequential);
    }

    #[test]
    fn test_optimize_large_query() {
        let optimizer = QueryOptimizer::new();

        let plan = optimizer.optimize_batch_get(&sample_cids(1000));
        assert_eq!(plan.strategy, QueryStrategy::ParallelBatch);
        assert!(plan.use_parallel);
    }

    #[test]
    fn test_optimize_streaming_query() {
        // Force streaming by dropping the threshold below the query footprint.
        let config = OptimizerConfig {
            streaming_threshold_bytes: 1024,
            ..OptimizerConfig::default()
        };
        let mut optimizer = QueryOptimizer::with_config(config);
        optimizer.update_stats(2048, 0.5);

        let plan = optimizer.optimize_batch_get(&sample_cids(100));
        assert_eq!(plan.strategy, QueryStrategy::Streaming);
    }

    #[test]
    fn test_optimize_batch_put() {
        let plan = QueryOptimizer::new().optimize_batch_put(100, 100 * 1024);

        assert!(plan.batch_size > 0);
        assert!(plan.estimated_duration_us > 0);
    }

    #[test]
    fn test_pattern_analysis() {
        let optimizer = QueryOptimizer::new();
        let cid = sample_cids(1).pop().unwrap();

        // Ten identical single-CID queries: the CID should register as hot.
        let entry = QueryLogEntry {
            cids: vec![cid],
            duration: Duration::from_millis(10),
            cache_hit: false,
        };
        let log = vec![entry; 10];

        let recommendations = optimizer.analyze_patterns(&log);
        assert!(!recommendations.is_empty());
    }

    #[test]
    fn test_update_stats() {
        let mut optimizer = QueryOptimizer::new();
        optimizer.update_stats(1024, 0.9);

        assert_eq!(optimizer.stats.avg_block_size, 1024);
        assert_eq!(optimizer.stats.cache_hit_rate, 0.9);
        assert_eq!(optimizer.stats.query_count, 1);
    }

    #[test]
    fn test_cache_first_strategy() {
        // A hit rate above 0.8 should trigger the cache-first strategy.
        let mut optimizer = QueryOptimizer::new();
        optimizer.update_stats(1024, 0.95);

        let plan = optimizer.optimize_batch_get(&sample_cids(50));
        assert_eq!(plan.strategy, QueryStrategy::CacheFirst);
    }
}