1use libp2p::PeerId;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15#[derive(Debug, Clone)]
17pub struct QueryOptimizerConfig {
18 pub enable_early_termination: bool,
20 pub min_result_quality: f64,
22 pub enable_pipelining: bool,
24 pub max_pipelined_queries: usize,
26 pub query_timeout: Duration,
28}
29
30impl Default for QueryOptimizerConfig {
31 fn default() -> Self {
32 Self {
33 enable_early_termination: true,
34 min_result_quality: 0.7,
35 enable_pipelining: true,
36 max_pipelined_queries: 5,
37 query_timeout: Duration::from_secs(10),
38 }
39 }
40}
41
42#[derive(Debug, Clone)]
44pub struct QueryResult {
45 pub peers: Vec<PeerId>,
47 pub quality: f64,
49 pub duration: Duration,
51 pub peers_queried: usize,
53}
54
55#[derive(Debug, Clone, Default)]
57pub struct QueryMetrics {
58 pub total_queries: u64,
60 pub early_terminated: u64,
62 pub avg_duration: Duration,
64 pub avg_quality: f64,
66 pub timeouts: u64,
68}
69
70pub struct QueryOptimizer {
72 config: QueryOptimizerConfig,
73 query_history: Arc<RwLock<HashMap<String, QueryPerformance>>>,
75 metrics: Arc<RwLock<QueryMetrics>>,
77}
78
79#[derive(Debug, Clone)]
81struct QueryPerformance {
82 started_at: Instant,
83 duration: Option<Duration>,
84 quality: Option<f64>,
85 peers_queried: usize,
86 early_terminated: bool,
87}
88
89impl QueryOptimizer {
90 pub fn new(config: QueryOptimizerConfig) -> Self {
92 Self {
93 config,
94 query_history: Arc::new(RwLock::new(HashMap::new())),
95 metrics: Arc::new(RwLock::new(QueryMetrics::default())),
96 }
97 }
98
99 pub fn start_query(&self, query_id: String) {
101 let mut history = self.query_history.write();
102 history.insert(
103 query_id,
104 QueryPerformance {
105 started_at: Instant::now(),
106 duration: None,
107 quality: None,
108 peers_queried: 0,
109 early_terminated: false,
110 },
111 );
112 }
113
114 pub fn should_terminate_early(&self, query_id: &str, current_results: &[PeerId]) -> bool {
116 if !self.config.enable_early_termination {
117 return false;
118 }
119
120 let quality = self.calculate_result_quality(query_id, current_results);
122
123 quality >= self.config.min_result_quality
124 }
125
126 fn calculate_result_quality(&self, query_id: &str, results: &[PeerId]) -> f64 {
128 let history = self.query_history.read();
129
130 if let Some(perf) = history.get(query_id) {
131 let elapsed = perf.started_at.elapsed();
132 let timeout = self.config.query_timeout;
133
134 let result_score = (results.len() as f64 / 20.0).min(1.0);
137
138 let time_score = 1.0 - (elapsed.as_secs_f64() / timeout.as_secs_f64()).min(1.0);
140
141 (result_score * 0.7) + (time_score * 0.3)
143 } else {
144 0.0
145 }
146 }
147
148 pub fn complete_query(&self, query_id: &str, result: QueryResult) {
150 let mut history = self.query_history.write();
151 let mut metrics = self.metrics.write();
152
153 if let Some(perf) = history.get_mut(query_id) {
154 perf.duration = Some(result.duration);
155 perf.quality = Some(result.quality);
156 perf.peers_queried = result.peers_queried;
157
158 metrics.total_queries += 1;
160 if perf.early_terminated {
161 metrics.early_terminated += 1;
162 }
163
164 if metrics.total_queries == 1 {
166 metrics.avg_duration = result.duration;
167 metrics.avg_quality = result.quality;
168 } else {
169 let count = metrics.total_queries as f64;
170 let old_avg = metrics.avg_duration.as_secs_f64();
171 let new_avg = (old_avg * (count - 1.0) + result.duration.as_secs_f64()) / count;
172 metrics.avg_duration = Duration::from_secs_f64(new_avg);
173
174 metrics.avg_quality =
175 (metrics.avg_quality * (count - 1.0) + result.quality) / count;
176 }
177 }
178
179 if history.len() > 1000 {
181 let oldest_keys: Vec<String> = history
182 .iter()
183 .filter_map(|(k, v)| {
184 if v.started_at.elapsed() > Duration::from_secs(3600) {
185 Some(k.clone())
186 } else {
187 None
188 }
189 })
190 .collect();
191
192 for key in oldest_keys {
193 history.remove(&key);
194 }
195 }
196 }
197
198 pub fn mark_early_terminated(&self, query_id: &str) {
200 let mut history = self.query_history.write();
201 if let Some(perf) = history.get_mut(query_id) {
202 perf.early_terminated = true;
203 }
204 }
205
206 pub fn record_timeout(&self, query_id: &str) {
208 let mut metrics = self.metrics.write();
209 metrics.timeouts += 1;
210
211 let mut history = self.query_history.write();
213 history.remove(query_id);
214 }
215
216 pub fn get_metrics(&self) -> QueryMetrics {
218 self.metrics.read().clone()
219 }
220
221 pub fn early_termination_rate(&self) -> f64 {
223 let metrics = self.metrics.read();
224 if metrics.total_queries == 0 {
225 0.0
226 } else {
227 metrics.early_terminated as f64 / metrics.total_queries as f64
228 }
229 }
230
231 pub fn can_pipeline_query(&self) -> bool {
233 if !self.config.enable_pipelining {
234 return false;
235 }
236
237 let history = self.query_history.read();
238 let active_queries = history.values().filter(|p| p.duration.is_none()).count();
239
240 active_queries < self.config.max_pipelined_queries
241 }
242
243 pub fn active_query_count(&self) -> usize {
245 let history = self.query_history.read();
246 history.values().filter(|p| p.duration.is_none()).count()
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use super::*;
253
254 #[test]
255 fn test_query_optimizer_creation() {
256 let config = QueryOptimizerConfig::default();
257 let optimizer = QueryOptimizer::new(config);
258
259 assert_eq!(optimizer.active_query_count(), 0);
260 assert_eq!(optimizer.get_metrics().total_queries, 0);
261 }
262
263 #[test]
264 fn test_start_query() {
265 let optimizer = QueryOptimizer::new(QueryOptimizerConfig::default());
266 optimizer.start_query("test_query".to_string());
267
268 assert_eq!(optimizer.active_query_count(), 1);
269 }
270
271 #[test]
272 fn test_complete_query() {
273 let optimizer = QueryOptimizer::new(QueryOptimizerConfig::default());
274 optimizer.start_query("test_query".to_string());
275
276 let result = QueryResult {
277 peers: vec![],
278 quality: 0.8,
279 duration: Duration::from_millis(100),
280 peers_queried: 5,
281 };
282
283 optimizer.complete_query("test_query", result);
284
285 let metrics = optimizer.get_metrics();
286 assert_eq!(metrics.total_queries, 1);
287 assert_eq!(metrics.avg_quality, 0.8);
288 }
289
290 #[test]
291 fn test_early_termination() {
292 let config = QueryOptimizerConfig {
293 min_result_quality: 0.5,
294 ..Default::default()
295 };
296
297 let optimizer = QueryOptimizer::new(config);
298 optimizer.start_query("test_query".to_string());
299
300 let peers: Vec<PeerId> = (0..20).map(|_| PeerId::random()).collect();
302
303 let should_terminate = optimizer.should_terminate_early("test_query", &peers);
304 assert!(should_terminate);
305 }
306
307 #[test]
308 fn test_pipelining() {
309 let config = QueryOptimizerConfig {
310 max_pipelined_queries: 3,
311 ..Default::default()
312 };
313
314 let optimizer = QueryOptimizer::new(config);
315
316 optimizer.start_query("query1".to_string());
318 optimizer.start_query("query2".to_string());
319 optimizer.start_query("query3".to_string());
320
321 assert_eq!(optimizer.active_query_count(), 3);
322 assert!(!optimizer.can_pipeline_query()); optimizer.complete_query(
326 "query1",
327 QueryResult {
328 peers: vec![],
329 quality: 0.8,
330 duration: Duration::from_millis(100),
331 peers_queried: 5,
332 },
333 );
334
335 assert!(optimizer.can_pipeline_query()); }
337
338 #[test]
339 fn test_metrics_tracking() {
340 let optimizer = QueryOptimizer::new(QueryOptimizerConfig::default());
341
342 optimizer.start_query("query1".to_string());
343 optimizer.mark_early_terminated("query1");
344 optimizer.complete_query(
345 "query1",
346 QueryResult {
347 peers: vec![],
348 quality: 0.9,
349 duration: Duration::from_millis(50),
350 peers_queried: 10,
351 },
352 );
353
354 let metrics = optimizer.get_metrics();
355 assert_eq!(metrics.total_queries, 1);
356 assert_eq!(metrics.early_terminated, 1);
357 assert_eq!(optimizer.early_termination_rate(), 1.0);
358 }
359
360 #[test]
361 fn test_timeout_recording() {
362 let optimizer = QueryOptimizer::new(QueryOptimizerConfig::default());
363
364 optimizer.start_query("slow_query".to_string());
365 optimizer.record_timeout("slow_query");
366
367 let metrics = optimizer.get_metrics();
368 assert_eq!(metrics.timeouts, 1);
369 }
370
371 #[test]
372 fn test_query_quality_calculation() {
373 let optimizer = QueryOptimizer::new(QueryOptimizerConfig::default());
374 optimizer.start_query("test_query".to_string());
375
376 let many_peers: Vec<PeerId> = (0..20).map(|_| PeerId::random()).collect();
378 let quality_high = optimizer.calculate_result_quality("test_query", &many_peers);
379
380 let few_peers: Vec<PeerId> = (0..2).map(|_| PeerId::random()).collect();
382 let quality_low = optimizer.calculate_result_quality("test_query", &few_peers);
383
384 assert!(quality_high > quality_low);
385 }
386
387 #[test]
388 fn test_pipelining_disabled() {
389 let config = QueryOptimizerConfig {
390 enable_pipelining: false,
391 ..Default::default()
392 };
393
394 let optimizer = QueryOptimizer::new(config);
395 assert!(!optimizer.can_pipeline_query());
396 }
397
398 #[test]
399 fn test_early_termination_disabled() {
400 let config = QueryOptimizerConfig {
401 enable_early_termination: false,
402 ..Default::default()
403 };
404
405 let optimizer = QueryOptimizer::new(config);
406 optimizer.start_query("test_query".to_string());
407
408 let peers: Vec<PeerId> = (0..20).map(|_| PeerId::random()).collect();
409 assert!(!optimizer.should_terminate_early("test_query", &peers));
410 }
411}