// ipfrs_network/query_batcher.rs

1//! DHT query batching and frequency optimization
2//!
3//! This module provides batching and rate limiting for DHT queries to:
4//! - Reduce network traffic by batching similar queries
5//! - Control query frequency to prevent network flooding
6//! - Merge duplicate queries
7//! - Implement adaptive query delays based on network conditions
8
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use thiserror::Error;
14use tokio::sync::mpsc;
15
/// Errors that can occur during query batching.
#[derive(Error, Debug, Clone)]
pub enum QueryBatcherError {
    /// The pending-query queue has reached `max_pending_queries`.
    #[error("Batch queue is full")]
    QueueFull,

    /// The per-second query budget (possibly scaled down by the adaptive
    /// rate limiter) is exhausted for the current one-second window.
    #[error("Query rate limit exceeded")]
    RateLimitExceeded,

    /// The supplied `QueryBatcherConfig` failed `validate()`.
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),
}
28
/// Configuration for query batching.
///
/// Use `Default::default()` for balanced settings, or the `low_power` /
/// `mobile` / `high_performance` presets. Always check `validate()` before
/// use (`QueryBatcher::new` does this for you).
#[derive(Debug, Clone)]
pub struct QueryBatcherConfig {
    /// Maximum batch size (queries per batch); a full batch is flushed
    /// without waiting for the batch window.
    pub max_batch_size: usize,

    /// Batch window duration (wait time before sending a partial batch).
    pub batch_window: Duration,

    /// Maximum queries per second (rate limit), before adaptive scaling.
    pub max_queries_per_second: u64,

    /// Enable query deduplication by `QueryType::key()`.
    pub enable_deduplication: bool,

    /// Deduplication window (queries with the same key within this window
    /// are silently merged).
    pub dedup_window: Duration,

    /// Maximum pending queries in the queue; beyond this, queries are dropped.
    pub max_pending_queries: usize,

    /// Enable adaptive rate limiting (scales the rate by 0.1x–2.0x based on
    /// observed success rate).
    pub enable_adaptive_rate: bool,

    /// Target success rate for adaptive limiting (0.0-1.0).
    pub target_success_rate: f64,
}
56
57impl Default for QueryBatcherConfig {
58    fn default() -> Self {
59        Self {
60            max_batch_size: 10,
61            batch_window: Duration::from_millis(100),
62            max_queries_per_second: 100,
63            enable_deduplication: true,
64            dedup_window: Duration::from_secs(5),
65            max_pending_queries: 1000,
66            enable_adaptive_rate: true,
67            target_success_rate: 0.8,
68        }
69    }
70}
71
72impl QueryBatcherConfig {
73    /// Configuration for low-power mode (minimal queries)
74    pub fn low_power() -> Self {
75        Self {
76            max_batch_size: 20,
77            batch_window: Duration::from_millis(500),
78            max_queries_per_second: 10,
79            enable_deduplication: true,
80            dedup_window: Duration::from_secs(10),
81            max_pending_queries: 100,
82            enable_adaptive_rate: true,
83            target_success_rate: 0.7,
84        }
85    }
86
87    /// Configuration for mobile devices
88    pub fn mobile() -> Self {
89        Self {
90            max_batch_size: 15,
91            batch_window: Duration::from_millis(200),
92            max_queries_per_second: 50,
93            enable_deduplication: true,
94            dedup_window: Duration::from_secs(5),
95            max_pending_queries: 500,
96            enable_adaptive_rate: true,
97            target_success_rate: 0.75,
98        }
99    }
100
101    /// Configuration for high-performance mode
102    pub fn high_performance() -> Self {
103        Self {
104            max_batch_size: 5,
105            batch_window: Duration::from_millis(50),
106            max_queries_per_second: 500,
107            enable_deduplication: false,
108            dedup_window: Duration::from_secs(1),
109            max_pending_queries: 5000,
110            enable_adaptive_rate: false,
111            target_success_rate: 0.9,
112        }
113    }
114
115    /// Validate the configuration
116    pub fn validate(&self) -> Result<(), QueryBatcherError> {
117        if self.max_batch_size == 0 {
118            return Err(QueryBatcherError::InvalidConfig(
119                "max_batch_size must be > 0".to_string(),
120            ));
121        }
122
123        if self.max_queries_per_second == 0 {
124            return Err(QueryBatcherError::InvalidConfig(
125                "max_queries_per_second must be > 0".to_string(),
126            ));
127        }
128
129        if self.target_success_rate < 0.0 || self.target_success_rate > 1.0 {
130            return Err(QueryBatcherError::InvalidConfig(
131                "target_success_rate must be in [0.0, 1.0]".to_string(),
132            ));
133        }
134
135        Ok(())
136    }
137}
138
/// Type of DHT query.
///
/// The payload string identifies the lookup target and, together with the
/// variant, forms the deduplication identity (see `QueryType::key`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// Find providers for a CID.
    FindProviders(String),
    /// Find a specific peer by its ID.
    FindPeer(String),
    /// Get the value stored under a key.
    GetValue(String),
    /// Put a value under a key.
    PutValue(String),
}
151
152impl QueryType {
153    /// Get the query key for deduplication
154    pub fn key(&self) -> String {
155        match self {
156            QueryType::FindProviders(cid) => format!("providers:{}", cid),
157            QueryType::FindPeer(peer) => format!("peer:{}", peer),
158            QueryType::GetValue(key) => format!("get:{}", key),
159            QueryType::PutValue(key) => format!("put:{}", key),
160        }
161    }
162}
163
/// A pending query in the batch queue.
#[derive(Debug, Clone)]
pub struct PendingQuery {
    /// Query type (carries the lookup target).
    pub query_type: QueryType,
    /// Timestamp when the query was added to the batch.
    pub added_at: Instant,
    /// Response channel (if needed).
    /// NOTE(review): `add_query` currently always sets this to `None`;
    /// presumably a future submit-with-response API will populate it —
    /// confirm before relying on it being delivered.
    pub response_tx: Option<mpsc::UnboundedSender<QueryBatchResult>>,
}
174
/// Result of a batched query, fed back via `QueryBatcher::record_result`
/// to drive adaptive rate limiting.
#[derive(Debug, Clone)]
pub struct QueryBatchResult {
    /// Whether the query succeeded.
    pub success: bool,
    /// Number of results found.
    pub result_count: usize,
    /// Time taken to execute the query.
    pub duration: Duration,
}
185
/// Internal, lock-protected query batching state.
#[derive(Debug)]
struct BatcherState {
    /// Current batch being assembled (flushed by `take_batch`).
    current_batch: Vec<PendingQuery>,
    /// Last batch send time; `should_send_batch` measures the batch window
    /// from this instant, not from the oldest queued query.
    last_batch_sent: Instant,
    /// Query count in the current one-second rate-limit window.
    queries_this_second: u64,
    /// Start of the current one-second rate-limit window.
    second_start: Instant,
    /// Query key -> last-seen time, for deduplication. Pruned by
    /// `cleanup_dedup_cache`; grows unboundedly if that is never called.
    recent_queries: HashMap<String, Instant>,
    /// Adaptive rate limit multiplier (1.0 = normal, clamped to [0.1, 2.0]).
    rate_multiplier: f64,
    /// Recent success rate (for adaptive limiting), starts optimistic at 1.0.
    recent_success_rate: f64,
}
204
205impl BatcherState {
206    fn new() -> Self {
207        let now = Instant::now();
208        Self {
209            current_batch: Vec::new(),
210            last_batch_sent: now,
211            queries_this_second: 0,
212            second_start: now,
213            recent_queries: HashMap::new(),
214            rate_multiplier: 1.0,
215            recent_success_rate: 1.0,
216        }
217    }
218}
219
/// DHT query batcher.
///
/// All mutable state lives behind `RwLock`s, so a single instance can be
/// shared across threads/tasks (e.g. wrapped in an `Arc`).
pub struct QueryBatcher {
    // Validated configuration; immutable after construction.
    config: QueryBatcherConfig,
    // Batch queue, rate-limit window, and dedup cache.
    state: Arc<RwLock<BatcherState>>,
    // Running counters exposed via `stats()`.
    stats: Arc<RwLock<QueryBatcherStats>>,
}
226
227impl QueryBatcher {
228    /// Create a new query batcher
229    pub fn new(config: QueryBatcherConfig) -> Result<Self, QueryBatcherError> {
230        config.validate()?;
231
232        Ok(Self {
233            config,
234            state: Arc::new(RwLock::new(BatcherState::new())),
235            stats: Arc::new(RwLock::new(QueryBatcherStats::default())),
236        })
237    }
238
239    /// Add a query to the batch
240    pub fn add_query(&self, query: QueryType) -> Result<(), QueryBatcherError> {
241        let mut state = self.state.write();
242        let mut stats = self.stats.write();
243
244        // Check queue size
245        if state.current_batch.len() >= self.config.max_pending_queries {
246            stats.queries_dropped += 1;
247            return Err(QueryBatcherError::QueueFull);
248        }
249
250        // Check rate limit
251        let now = Instant::now();
252        if now.duration_since(state.second_start) >= Duration::from_secs(1) {
253            state.queries_this_second = 0;
254            state.second_start = now;
255        }
256
257        let effective_rate_limit =
258            (self.config.max_queries_per_second as f64 * state.rate_multiplier) as u64;
259
260        if state.queries_this_second >= effective_rate_limit {
261            stats.queries_rate_limited += 1;
262            return Err(QueryBatcherError::RateLimitExceeded);
263        }
264
265        // Deduplication check
266        if self.config.enable_deduplication {
267            let key = query.key();
268            if let Some(&last_query) = state.recent_queries.get(&key) {
269                if now.duration_since(last_query) < self.config.dedup_window {
270                    stats.queries_deduplicated += 1;
271                    return Ok(()); // Skip duplicate
272                }
273            }
274            state.recent_queries.insert(key, now);
275        }
276
277        // Add to batch
278        let pending = PendingQuery {
279            query_type: query,
280            added_at: now,
281            response_tx: None,
282        };
283
284        state.current_batch.push(pending);
285        state.queries_this_second += 1;
286        stats.queries_batched += 1;
287
288        Ok(())
289    }
290
291    /// Check if batch is ready to send
292    pub fn should_send_batch(&self) -> bool {
293        let state = self.state.read();
294
295        if state.current_batch.is_empty() {
296            return false;
297        }
298
299        // Send if batch is full
300        if state.current_batch.len() >= self.config.max_batch_size {
301            return true;
302        }
303
304        // Send if batch window expired
305        let now = Instant::now();
306        if now.duration_since(state.last_batch_sent) >= self.config.batch_window {
307            return true;
308        }
309
310        false
311    }
312
313    /// Get the current batch and clear it
314    pub fn take_batch(&self) -> Vec<PendingQuery> {
315        let mut state = self.state.write();
316        let mut stats = self.stats.write();
317
318        let batch = std::mem::take(&mut state.current_batch);
319        state.last_batch_sent = Instant::now();
320
321        if !batch.is_empty() {
322            stats.batches_sent += 1;
323            stats.total_queries_sent += batch.len() as u64;
324        }
325
326        batch
327    }
328
329    /// Record query result for adaptive rate limiting
330    pub fn record_result(&self, result: QueryBatchResult) {
331        let mut state = self.state.write();
332        let mut stats = self.stats.write();
333
334        if result.success {
335            stats.successful_queries += 1;
336        } else {
337            stats.failed_queries += 1;
338        }
339
340        // Update adaptive rate limiter
341        if self.config.enable_adaptive_rate {
342            let total = stats.successful_queries + stats.failed_queries;
343            if total > 0 {
344                state.recent_success_rate = stats.successful_queries as f64 / total as f64;
345
346                // Adjust rate multiplier based on success rate
347                if state.recent_success_rate < self.config.target_success_rate {
348                    // Too many failures, slow down
349                    state.rate_multiplier = (state.rate_multiplier * 0.9).max(0.1);
350                    stats.rate_adjustments += 1;
351                } else if state.recent_success_rate > self.config.target_success_rate + 0.1 {
352                    // High success rate, can speed up
353                    state.rate_multiplier = (state.rate_multiplier * 1.1).min(2.0);
354                    stats.rate_adjustments += 1;
355                }
356            }
357        }
358    }
359
360    /// Get current statistics
361    pub fn stats(&self) -> QueryBatcherStats {
362        self.stats.read().clone()
363    }
364
365    /// Get current rate multiplier (for adaptive rate limiting)
366    pub fn rate_multiplier(&self) -> f64 {
367        self.state.read().rate_multiplier
368    }
369
370    /// Get current success rate
371    pub fn success_rate(&self) -> f64 {
372        self.state.read().recent_success_rate
373    }
374
375    /// Clean up old deduplication entries
376    pub fn cleanup_dedup_cache(&self) {
377        let mut state = self.state.write();
378        let now = Instant::now();
379
380        state.recent_queries.retain(|_, &mut last_query| {
381            now.duration_since(last_query) < self.config.dedup_window * 2
382        });
383    }
384
385    /// Reset statistics
386    pub fn reset_stats(&self) {
387        *self.stats.write() = QueryBatcherStats::default();
388    }
389}
390
/// Statistics for query batching. Counters are cumulative since creation
/// or the last `reset_stats`.
#[derive(Debug, Clone, Default)]
pub struct QueryBatcherStats {
    /// Total unique queries added to batches.
    pub queries_batched: u64,
    /// Queries dropped because the pending queue was full.
    pub queries_dropped: u64,
    /// Queries rejected by the rate limiter.
    pub queries_rate_limited: u64,
    /// Queries merged into an earlier identical query.
    pub queries_deduplicated: u64,
    /// Number of batches flushed via `take_batch`.
    pub batches_sent: u64,
    /// Total queries actually sent (after batching/dedup).
    pub total_queries_sent: u64,
    /// Queries reported successful via `record_result`.
    pub successful_queries: u64,
    /// Queries reported failed via `record_result`.
    pub failed_queries: u64,
    /// Number of adaptive rate-multiplier adjustments made.
    pub rate_adjustments: u64,
}
413
414impl QueryBatcherStats {
415    /// Calculate the deduplication ratio
416    pub fn dedup_ratio(&self) -> f64 {
417        if self.queries_batched == 0 {
418            return 0.0;
419        }
420        self.queries_deduplicated as f64 / self.queries_batched as f64
421    }
422
423    /// Calculate the batching efficiency (queries saved)
424    pub fn batching_efficiency(&self) -> f64 {
425        if self.queries_batched == 0 {
426            return 0.0;
427        }
428        let saved = self.queries_batched - self.total_queries_sent;
429        saved as f64 / self.queries_batched as f64
430    }
431
432    /// Calculate success rate
433    pub fn success_rate(&self) -> f64 {
434        let total = self.successful_queries + self.failed_queries;
435        if total == 0 {
436            return 0.0;
437        }
438        self.successful_queries as f64 / total as f64
439    }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    // --- Configuration presets and validation ---

    #[test]
    fn test_config_default() {
        let config = QueryBatcherConfig::default();
        assert!(config.validate().is_ok());
        assert_eq!(config.max_batch_size, 10);
        assert!(config.enable_deduplication);
    }

    #[test]
    fn test_config_low_power() {
        let config = QueryBatcherConfig::low_power();
        assert!(config.validate().is_ok());
        assert_eq!(config.max_queries_per_second, 10);
    }

    #[test]
    fn test_config_mobile() {
        let config = QueryBatcherConfig::mobile();
        assert!(config.validate().is_ok());
        assert_eq!(config.max_queries_per_second, 50);
    }

    #[test]
    fn test_config_high_performance() {
        let config = QueryBatcherConfig::high_performance();
        assert!(config.validate().is_ok());
        assert!(!config.enable_deduplication);
    }

    #[test]
    fn test_config_validation() {
        // Zero batch size must be rejected.
        let config = QueryBatcherConfig {
            max_batch_size: 0,
            ..Default::default()
        };
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_query_type_key() {
        // Identical queries must produce identical dedup keys.
        let q1 = QueryType::FindProviders("QmTest".to_string());
        let q2 = QueryType::FindProviders("QmTest".to_string());
        assert_eq!(q1.key(), q2.key());
    }

    // --- Batching behavior ---

    #[test]
    fn test_add_query() {
        let config = QueryBatcherConfig::default();
        let batcher = QueryBatcher::new(config).unwrap();

        let query = QueryType::FindProviders("QmTest".to_string());
        let result = batcher.add_query(query);
        assert!(result.is_ok());

        let stats = batcher.stats();
        assert_eq!(stats.queries_batched, 1);
    }

    #[test]
    fn test_deduplication() {
        let config = QueryBatcherConfig::default();
        let batcher = QueryBatcher::new(config).unwrap();

        let query = QueryType::FindProviders("QmTest".to_string());

        batcher.add_query(query.clone()).unwrap();
        batcher.add_query(query).unwrap(); // Duplicate

        let stats = batcher.stats();
        assert_eq!(stats.queries_deduplicated, 1);
        assert_eq!(stats.queries_batched, 1); // Only one unique query
    }

    #[test]
    fn test_batch_ready_when_full() {
        let config = QueryBatcherConfig {
            max_batch_size: 3,
            ..Default::default()
        };
        let batcher = QueryBatcher::new(config).unwrap();

        for i in 0..3 {
            let query = QueryType::FindProviders(format!("QmTest{}", i));
            batcher.add_query(query).unwrap();
        }

        // A full batch is flushable regardless of the batch window.
        assert!(batcher.should_send_batch());
    }

    #[test]
    fn test_take_batch() {
        let config = QueryBatcherConfig::default();
        let batcher = QueryBatcher::new(config).unwrap();

        for i in 0..5 {
            let query = QueryType::FindProviders(format!("QmTest{}", i));
            batcher.add_query(query).unwrap();
        }

        let batch = batcher.take_batch();
        assert_eq!(batch.len(), 5);

        // A second take yields nothing: the queue was drained.
        let batch2 = batcher.take_batch();
        assert_eq!(batch2.len(), 0);
    }

    // --- Rate limiting ---

    #[test]
    fn test_rate_limit() {
        let config = QueryBatcherConfig {
            max_queries_per_second: 5,
            ..Default::default()
        };
        let batcher = QueryBatcher::new(config).unwrap();

        // Add 5 queries (should succeed)
        for i in 0..5 {
            let query = QueryType::FindProviders(format!("QmTest{}", i));
            assert!(batcher.add_query(query).is_ok());
        }

        // 6th query should be rate limited
        let query = QueryType::FindProviders("QmTest6".to_string());
        let result = batcher.add_query(query);
        assert!(matches!(result, Err(QueryBatcherError::RateLimitExceeded)));
    }

    #[test]
    fn test_adaptive_rate_limiting() {
        let config = QueryBatcherConfig::default();
        let batcher = QueryBatcher::new(config).unwrap();

        let initial_rate = batcher.rate_multiplier();

        // Record failures; the success rate falls below target and the
        // multiplier must back off.
        for _ in 0..10 {
            batcher.record_result(QueryBatchResult {
                success: false,
                result_count: 0,
                duration: Duration::from_millis(100),
            });
        }

        let rate_after_failures = batcher.rate_multiplier();
        assert!(rate_after_failures < initial_rate);
    }

    // --- Statistics ---

    #[test]
    fn test_stats_dedup_ratio() {
        let stats = QueryBatcherStats {
            queries_batched: 100,
            queries_deduplicated: 20,
            ..Default::default()
        };

        assert_eq!(stats.dedup_ratio(), 0.2);
    }

    #[test]
    fn test_stats_batching_efficiency() {
        let stats = QueryBatcherStats {
            queries_batched: 100,
            total_queries_sent: 60,
            ..Default::default()
        };

        assert_eq!(stats.batching_efficiency(), 0.4);
    }

    #[test]
    fn test_cleanup_dedup_cache() {
        let config = QueryBatcherConfig::default();
        let batcher = QueryBatcher::new(config).unwrap();

        let query = QueryType::FindProviders("QmTest".to_string());
        batcher.add_query(query).unwrap();

        // Cache should have entry
        {
            let state = batcher.state.read();
            assert_eq!(state.recent_queries.len(), 1);
        }

        batcher.cleanup_dedup_cache();

        // Cache should still have entry (not old enough)
        {
            let state = batcher.state.read();
            assert_eq!(state.recent_queries.len(), 1);
        }
    }
}