ipfrs_network/
query_optimizer.rs

1//! DHT Query Optimization Module
2//!
3//! Provides query optimization features including:
4//! - Early termination based on result quality
5//! - Query pipelining for sequential operations
6//! - Query performance tracking
7//! - Adaptive query strategies
8
9use libp2p::PeerId;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
/// Query optimization configuration.
///
/// Tunables consumed by `QueryOptimizer`. The `Default` implementation
/// supplies the stock values.
#[derive(Debug, Clone)]
pub struct QueryOptimizerConfig {
    /// Enable early termination. When `false`,
    /// `QueryOptimizer::should_terminate_early` always returns `false`.
    pub enable_early_termination: bool,
    /// Minimum acceptable result quality (0.0-1.0). A query is eligible for
    /// early termination once its computed quality score is greater than or
    /// equal to this threshold.
    pub min_result_quality: f64,
    /// Enable query pipelining. When `false`,
    /// `QueryOptimizer::can_pipeline_query` always returns `false`.
    pub enable_pipelining: bool,
    /// Maximum pipelined queries: a new query may be pipelined only while the
    /// number of active (not yet completed) tracked queries is below this value.
    pub max_pipelined_queries: usize,
    /// Query timeout. Within this module it is used to normalise a query's
    /// elapsed time when scoring result quality; nothing here enforces it.
    pub query_timeout: Duration,
}
29
30impl Default for QueryOptimizerConfig {
31    fn default() -> Self {
32        Self {
33            enable_early_termination: true,
34            min_result_quality: 0.7,
35            enable_pipelining: true,
36            max_pipelined_queries: 5,
37            query_timeout: Duration::from_secs(10),
38        }
39    }
40}
41
/// Query result with quality score.
///
/// Supplied by the caller to `QueryOptimizer::complete_query`, which records
/// `quality`, `duration` and `peers_queried`; `peers` is not read by that method.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Result peers returned by the query.
    pub peers: Vec<PeerId>,
    /// Quality score (0.0-1.0) as reported by the caller; folded into the
    /// running average quality.
    pub quality: f64,
    /// Query duration as reported by the caller; folded into the running
    /// average duration.
    pub duration: Duration,
    /// Number of peers queried while producing this result.
    pub peers_queried: usize,
}
54
/// Query performance metrics.
///
/// Aggregate counters maintained by `QueryOptimizer`. All fields start at
/// their zero value via the derived `Default`.
#[derive(Debug, Clone, Default)]
pub struct QueryMetrics {
    /// Total queries completed through `complete_query` (only queries that
    /// were being tracked are counted; timeouts are not included here).
    pub total_queries: u64,
    /// Completed queries that had been marked as early terminated.
    pub early_terminated: u64,
    /// Average query duration, kept as a running mean over completed queries.
    pub avg_duration: Duration,
    /// Average result quality, kept as a running mean over completed queries.
    pub avg_quality: f64,
    /// Queries that timed out, incremented once per `record_timeout` call.
    pub timeouts: u64,
}
69
70/// Query optimizer for DHT operations
71pub struct QueryOptimizer {
72    config: QueryOptimizerConfig,
73    /// Query performance history (query_id -> metrics)
74    query_history: Arc<RwLock<HashMap<String, QueryPerformance>>>,
75    /// Global metrics
76    metrics: Arc<RwLock<QueryMetrics>>,
77}
78
/// Performance data for a single query.
///
/// Created by `QueryOptimizer::start_query` and filled in by
/// `QueryOptimizer::complete_query`.
#[derive(Debug, Clone)]
struct QueryPerformance {
    /// Moment the query started being tracked; used for elapsed-time scoring
    /// and for ageing entries out of the history.
    started_at: Instant,
    /// Reported duration; `None` while the query is still active.
    duration: Option<Duration>,
    /// Reported result quality; `None` until the query completes.
    quality: Option<f64>,
    /// Number of peers queried, as reported on completion (0 until then).
    peers_queried: usize,
    /// Set by `QueryOptimizer::mark_early_terminated`; counted in the global
    /// metrics when the query completes.
    early_terminated: bool,
}
88
89impl QueryOptimizer {
90    /// Create a new query optimizer
91    pub fn new(config: QueryOptimizerConfig) -> Self {
92        Self {
93            config,
94            query_history: Arc::new(RwLock::new(HashMap::new())),
95            metrics: Arc::new(RwLock::new(QueryMetrics::default())),
96        }
97    }
98
99    /// Start tracking a new query
100    pub fn start_query(&self, query_id: String) {
101        let mut history = self.query_history.write();
102        history.insert(
103            query_id,
104            QueryPerformance {
105                started_at: Instant::now(),
106                duration: None,
107                quality: None,
108                peers_queried: 0,
109                early_terminated: false,
110            },
111        );
112    }
113
114    /// Check if query should terminate early based on result quality
115    pub fn should_terminate_early(&self, query_id: &str, current_results: &[PeerId]) -> bool {
116        if !self.config.enable_early_termination {
117            return false;
118        }
119
120        // Calculate result quality based on number of results and response time
121        let quality = self.calculate_result_quality(query_id, current_results);
122
123        quality >= self.config.min_result_quality
124    }
125
126    /// Calculate result quality score
127    fn calculate_result_quality(&self, query_id: &str, results: &[PeerId]) -> f64 {
128        let history = self.query_history.read();
129
130        if let Some(perf) = history.get(query_id) {
131            let elapsed = perf.started_at.elapsed();
132            let timeout = self.config.query_timeout;
133
134            // Quality factors:
135            // 1. Number of results (more is better, up to a point)
136            let result_score = (results.len() as f64 / 20.0).min(1.0);
137
138            // 2. Response time (faster is better)
139            let time_score = 1.0 - (elapsed.as_secs_f64() / timeout.as_secs_f64()).min(1.0);
140
141            // Weighted average
142            (result_score * 0.7) + (time_score * 0.3)
143        } else {
144            0.0
145        }
146    }
147
148    /// Complete a query and record metrics
149    pub fn complete_query(&self, query_id: &str, result: QueryResult) {
150        let mut history = self.query_history.write();
151        let mut metrics = self.metrics.write();
152
153        if let Some(perf) = history.get_mut(query_id) {
154            perf.duration = Some(result.duration);
155            perf.quality = Some(result.quality);
156            perf.peers_queried = result.peers_queried;
157
158            // Update global metrics
159            metrics.total_queries += 1;
160            if perf.early_terminated {
161                metrics.early_terminated += 1;
162            }
163
164            // Update average duration (running average)
165            if metrics.total_queries == 1 {
166                metrics.avg_duration = result.duration;
167                metrics.avg_quality = result.quality;
168            } else {
169                let count = metrics.total_queries as f64;
170                let old_avg = metrics.avg_duration.as_secs_f64();
171                let new_avg = (old_avg * (count - 1.0) + result.duration.as_secs_f64()) / count;
172                metrics.avg_duration = Duration::from_secs_f64(new_avg);
173
174                metrics.avg_quality =
175                    (metrics.avg_quality * (count - 1.0) + result.quality) / count;
176            }
177        }
178
179        // Clean up old history (keep last 1000 queries)
180        if history.len() > 1000 {
181            let oldest_keys: Vec<String> = history
182                .iter()
183                .filter_map(|(k, v)| {
184                    if v.started_at.elapsed() > Duration::from_secs(3600) {
185                        Some(k.clone())
186                    } else {
187                        None
188                    }
189                })
190                .collect();
191
192            for key in oldest_keys {
193                history.remove(&key);
194            }
195        }
196    }
197
198    /// Mark a query as early terminated
199    pub fn mark_early_terminated(&self, query_id: &str) {
200        let mut history = self.query_history.write();
201        if let Some(perf) = history.get_mut(query_id) {
202            perf.early_terminated = true;
203        }
204    }
205
206    /// Record a query timeout
207    pub fn record_timeout(&self, query_id: &str) {
208        let mut metrics = self.metrics.write();
209        metrics.timeouts += 1;
210
211        // Also remove from history
212        let mut history = self.query_history.write();
213        history.remove(query_id);
214    }
215
216    /// Get query metrics
217    pub fn get_metrics(&self) -> QueryMetrics {
218        self.metrics.read().clone()
219    }
220
221    /// Get early termination rate
222    pub fn early_termination_rate(&self) -> f64 {
223        let metrics = self.metrics.read();
224        if metrics.total_queries == 0 {
225            0.0
226        } else {
227            metrics.early_terminated as f64 / metrics.total_queries as f64
228        }
229    }
230
231    /// Check if we can pipeline a new query
232    pub fn can_pipeline_query(&self) -> bool {
233        if !self.config.enable_pipelining {
234            return false;
235        }
236
237        let history = self.query_history.read();
238        let active_queries = history.values().filter(|p| p.duration.is_none()).count();
239
240        active_queries < self.config.max_pipelined_queries
241    }
242
243    /// Get active query count
244    pub fn active_query_count(&self) -> usize {
245        let history = self.query_history.read();
246        history.values().filter(|p| p.duration.is_none()).count()
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::*;

    /// Optimizer built from the default configuration.
    fn default_optimizer() -> QueryOptimizer {
        QueryOptimizer::new(QueryOptimizerConfig::default())
    }

    /// Completed-query fixture with no peers attached.
    fn finished(quality: f64, millis: u64, peers_queried: usize) -> QueryResult {
        QueryResult {
            peers: Vec::new(),
            quality,
            duration: Duration::from_millis(millis),
            peers_queried,
        }
    }

    /// `count` freshly generated random peer ids.
    fn random_peers(count: usize) -> Vec<PeerId> {
        (0..count).map(|_| PeerId::random()).collect()
    }

    #[test]
    fn test_query_optimizer_creation() {
        let optimizer = default_optimizer();

        // A fresh optimizer tracks nothing and has recorded nothing.
        assert_eq!(optimizer.active_query_count(), 0);
        assert_eq!(optimizer.get_metrics().total_queries, 0);
    }

    #[test]
    fn test_start_query() {
        let optimizer = default_optimizer();
        optimizer.start_query("test_query".to_string());

        assert_eq!(optimizer.active_query_count(), 1);
    }

    #[test]
    fn test_complete_query() {
        let optimizer = default_optimizer();
        optimizer.start_query("test_query".to_string());

        optimizer.complete_query("test_query", finished(0.8, 100, 5));

        let metrics = optimizer.get_metrics();
        assert_eq!(metrics.total_queries, 1);
        assert_eq!(metrics.avg_quality, 0.8);
    }

    #[test]
    fn test_early_termination() {
        let optimizer = QueryOptimizer::new(QueryOptimizerConfig {
            min_result_quality: 0.5,
            ..Default::default()
        });
        optimizer.start_query("test_query".to_string());

        // Many results arriving quickly should score above the threshold.
        let peers = random_peers(20);

        assert!(optimizer.should_terminate_early("test_query", &peers));
    }

    #[test]
    fn test_pipelining() {
        let optimizer = QueryOptimizer::new(QueryOptimizerConfig {
            max_pipelined_queries: 3,
            ..Default::default()
        });

        // Fill the pipeline up to its limit.
        for id in ["query1", "query2", "query3"] {
            optimizer.start_query(id.to_string());
        }

        assert_eq!(optimizer.active_query_count(), 3);
        assert!(!optimizer.can_pipeline_query()); // At limit

        // Finishing one query frees a slot.
        optimizer.complete_query("query1", finished(0.8, 100, 5));

        assert!(optimizer.can_pipeline_query()); // Now we can pipeline
    }

    #[test]
    fn test_metrics_tracking() {
        let optimizer = default_optimizer();

        optimizer.start_query("query1".to_string());
        optimizer.mark_early_terminated("query1");
        optimizer.complete_query("query1", finished(0.9, 50, 10));

        let metrics = optimizer.get_metrics();
        assert_eq!(metrics.total_queries, 1);
        assert_eq!(metrics.early_terminated, 1);
        assert_eq!(optimizer.early_termination_rate(), 1.0);
    }

    #[test]
    fn test_timeout_recording() {
        let optimizer = default_optimizer();

        optimizer.start_query("slow_query".to_string());
        optimizer.record_timeout("slow_query");

        assert_eq!(optimizer.get_metrics().timeouts, 1);
    }

    #[test]
    fn test_query_quality_calculation() {
        let optimizer = default_optimizer();
        optimizer.start_query("test_query".to_string());

        // Many results should score high ...
        let quality_high = optimizer.calculate_result_quality("test_query", &random_peers(20));

        // ... and only a few results should score lower.
        let quality_low = optimizer.calculate_result_quality("test_query", &random_peers(2));

        assert!(quality_high > quality_low);
    }

    #[test]
    fn test_pipelining_disabled() {
        let optimizer = QueryOptimizer::new(QueryOptimizerConfig {
            enable_pipelining: false,
            ..Default::default()
        });

        assert!(!optimizer.can_pipeline_query());
    }

    #[test]
    fn test_early_termination_disabled() {
        let optimizer = QueryOptimizer::new(QueryOptimizerConfig {
            enable_early_termination: false,
            ..Default::default()
        });
        optimizer.start_query("test_query".to_string());

        let peers = random_peers(20);
        assert!(!optimizer.should_terminate_early("test_query", &peers));
    }
}
411}