1use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use thiserror::Error;
14use tokio::sync::mpsc;
15
16#[derive(Error, Debug, Clone)]
18pub enum QueryBatcherError {
19 #[error("Batch queue is full")]
20 QueueFull,
21
22 #[error("Query rate limit exceeded")]
23 RateLimitExceeded,
24
25 #[error("Invalid configuration: {0}")]
26 InvalidConfig(String),
27}
28
/// Tunable parameters for the query batcher.
#[derive(Debug, Clone)]
pub struct QueryBatcherConfig {
    /// Pending-query count at which a batch is considered ready to send.
    pub max_batch_size: usize,

    /// Time since the last batch was taken after which a non-empty batch is
    /// considered ready to send.
    pub batch_window: Duration,

    /// Base number of queries accepted per one-second window, before the
    /// adaptive rate multiplier is applied.
    pub max_queries_per_second: u64,

    /// Whether a query repeated within `dedup_window` is suppressed.
    pub enable_deduplication: bool,

    /// How long an accepted query suppresses identical follow-up queries.
    pub dedup_window: Duration,

    /// Upper bound on the number of queries held in the pending batch.
    pub max_pending_queries: usize,

    /// Whether recorded results adjust the rate multiplier.
    pub enable_adaptive_rate: bool,

    /// Success fraction the adaptive rate logic compares against; validation
    /// requires it to be within `[0.0, 1.0]`.
    pub target_success_rate: f64,
}

impl Default for QueryBatcherConfig {
    /// General-purpose settings: batches of 10, a 100 ms window, 100 queries
    /// per second, with deduplication and adaptive rate limiting enabled.
    fn default() -> Self {
        Self {
            // Feature toggles.
            enable_deduplication: true,
            enable_adaptive_rate: true,
            // Batching.
            max_batch_size: 10,
            batch_window: Duration::from_millis(100),
            max_pending_queries: 1000,
            // Rate limiting and deduplication.
            max_queries_per_second: 100,
            dedup_window: Duration::from_secs(5),
            target_success_rate: 0.8,
        }
    }
}
71
72impl QueryBatcherConfig {
73 pub fn low_power() -> Self {
75 Self {
76 max_batch_size: 20,
77 batch_window: Duration::from_millis(500),
78 max_queries_per_second: 10,
79 enable_deduplication: true,
80 dedup_window: Duration::from_secs(10),
81 max_pending_queries: 100,
82 enable_adaptive_rate: true,
83 target_success_rate: 0.7,
84 }
85 }
86
87 pub fn mobile() -> Self {
89 Self {
90 max_batch_size: 15,
91 batch_window: Duration::from_millis(200),
92 max_queries_per_second: 50,
93 enable_deduplication: true,
94 dedup_window: Duration::from_secs(5),
95 max_pending_queries: 500,
96 enable_adaptive_rate: true,
97 target_success_rate: 0.75,
98 }
99 }
100
101 pub fn high_performance() -> Self {
103 Self {
104 max_batch_size: 5,
105 batch_window: Duration::from_millis(50),
106 max_queries_per_second: 500,
107 enable_deduplication: false,
108 dedup_window: Duration::from_secs(1),
109 max_pending_queries: 5000,
110 enable_adaptive_rate: false,
111 target_success_rate: 0.9,
112 }
113 }
114
115 pub fn validate(&self) -> Result<(), QueryBatcherError> {
117 if self.max_batch_size == 0 {
118 return Err(QueryBatcherError::InvalidConfig(
119 "max_batch_size must be > 0".to_string(),
120 ));
121 }
122
123 if self.max_queries_per_second == 0 {
124 return Err(QueryBatcherError::InvalidConfig(
125 "max_queries_per_second must be > 0".to_string(),
126 ));
127 }
128
129 if self.target_success_rate < 0.0 || self.target_success_rate > 1.0 {
130 return Err(QueryBatcherError::InvalidConfig(
131 "target_success_rate must be in [0.0, 1.0]".to_string(),
132 ));
133 }
134
135 Ok(())
136 }
137}
138
/// A query the batcher can queue, tagged with the string it targets.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// Provider lookup; the payload is presumably a content identifier.
    FindProviders(String),
    /// Peer lookup; the payload is presumably a peer identifier.
    FindPeer(String),
    /// Value read for the given key.
    GetValue(String),
    /// Value write for the given key.
    PutValue(String),
}

impl QueryType {
    /// Returns the string used to recognise repeated queries.
    ///
    /// The result is `"<kind>:<payload>"`, where `<kind>` is one of
    /// `providers`, `peer`, `get` or `put`, so equal payloads under
    /// different variants never produce the same key.
    pub fn key(&self) -> String {
        let (prefix, target) = match self {
            Self::FindProviders(cid) => ("providers", cid),
            Self::FindPeer(peer) => ("peer", peer),
            Self::GetValue(key) => ("get", key),
            Self::PutValue(key) => ("put", key),
        };
        format!("{}:{}", prefix, target)
    }
}
163
164#[derive(Debug, Clone)]
166pub struct PendingQuery {
167 pub query_type: QueryType,
169 pub added_at: Instant,
171 pub response_tx: Option<mpsc::UnboundedSender<QueryBatchResult>>,
173}
174
/// Outcome of an executed query, as reported back to the batcher.
#[derive(Clone, Debug)]
pub struct QueryBatchResult {
    /// Whether the query succeeded; this feeds the success/failure counters.
    pub success: bool,
    /// Number of results the query produced (carried along, not interpreted
    /// by the batcher in this file).
    pub result_count: usize,
    /// How long the query took (carried along, not interpreted by the
    /// batcher in this file).
    pub duration: Duration,
}
185
186#[derive(Debug)]
188struct BatcherState {
189 current_batch: Vec<PendingQuery>,
191 last_batch_sent: Instant,
193 queries_this_second: u64,
195 second_start: Instant,
197 recent_queries: HashMap<String, Instant>,
199 rate_multiplier: f64,
201 recent_success_rate: f64,
203}
204
205impl BatcherState {
206 fn new() -> Self {
207 let now = Instant::now();
208 Self {
209 current_batch: Vec::new(),
210 last_batch_sent: now,
211 queries_this_second: 0,
212 second_start: now,
213 recent_queries: HashMap::new(),
214 rate_multiplier: 1.0,
215 recent_success_rate: 1.0,
216 }
217 }
218}
219
220pub struct QueryBatcher {
222 config: QueryBatcherConfig,
223 state: Arc<RwLock<BatcherState>>,
224 stats: Arc<RwLock<QueryBatcherStats>>,
225}
226
227impl QueryBatcher {
228 pub fn new(config: QueryBatcherConfig) -> Result<Self, QueryBatcherError> {
230 config.validate()?;
231
232 Ok(Self {
233 config,
234 state: Arc::new(RwLock::new(BatcherState::new())),
235 stats: Arc::new(RwLock::new(QueryBatcherStats::default())),
236 })
237 }
238
239 pub fn add_query(&self, query: QueryType) -> Result<(), QueryBatcherError> {
241 let mut state = self.state.write();
242 let mut stats = self.stats.write();
243
244 if state.current_batch.len() >= self.config.max_pending_queries {
246 stats.queries_dropped += 1;
247 return Err(QueryBatcherError::QueueFull);
248 }
249
250 let now = Instant::now();
252 if now.duration_since(state.second_start) >= Duration::from_secs(1) {
253 state.queries_this_second = 0;
254 state.second_start = now;
255 }
256
257 let effective_rate_limit =
258 (self.config.max_queries_per_second as f64 * state.rate_multiplier) as u64;
259
260 if state.queries_this_second >= effective_rate_limit {
261 stats.queries_rate_limited += 1;
262 return Err(QueryBatcherError::RateLimitExceeded);
263 }
264
265 if self.config.enable_deduplication {
267 let key = query.key();
268 if let Some(&last_query) = state.recent_queries.get(&key) {
269 if now.duration_since(last_query) < self.config.dedup_window {
270 stats.queries_deduplicated += 1;
271 return Ok(()); }
273 }
274 state.recent_queries.insert(key, now);
275 }
276
277 let pending = PendingQuery {
279 query_type: query,
280 added_at: now,
281 response_tx: None,
282 };
283
284 state.current_batch.push(pending);
285 state.queries_this_second += 1;
286 stats.queries_batched += 1;
287
288 Ok(())
289 }
290
291 pub fn should_send_batch(&self) -> bool {
293 let state = self.state.read();
294
295 if state.current_batch.is_empty() {
296 return false;
297 }
298
299 if state.current_batch.len() >= self.config.max_batch_size {
301 return true;
302 }
303
304 let now = Instant::now();
306 if now.duration_since(state.last_batch_sent) >= self.config.batch_window {
307 return true;
308 }
309
310 false
311 }
312
313 pub fn take_batch(&self) -> Vec<PendingQuery> {
315 let mut state = self.state.write();
316 let mut stats = self.stats.write();
317
318 let batch = std::mem::take(&mut state.current_batch);
319 state.last_batch_sent = Instant::now();
320
321 if !batch.is_empty() {
322 stats.batches_sent += 1;
323 stats.total_queries_sent += batch.len() as u64;
324 }
325
326 batch
327 }
328
329 pub fn record_result(&self, result: QueryBatchResult) {
331 let mut state = self.state.write();
332 let mut stats = self.stats.write();
333
334 if result.success {
335 stats.successful_queries += 1;
336 } else {
337 stats.failed_queries += 1;
338 }
339
340 if self.config.enable_adaptive_rate {
342 let total = stats.successful_queries + stats.failed_queries;
343 if total > 0 {
344 state.recent_success_rate = stats.successful_queries as f64 / total as f64;
345
346 if state.recent_success_rate < self.config.target_success_rate {
348 state.rate_multiplier = (state.rate_multiplier * 0.9).max(0.1);
350 stats.rate_adjustments += 1;
351 } else if state.recent_success_rate > self.config.target_success_rate + 0.1 {
352 state.rate_multiplier = (state.rate_multiplier * 1.1).min(2.0);
354 stats.rate_adjustments += 1;
355 }
356 }
357 }
358 }
359
360 pub fn stats(&self) -> QueryBatcherStats {
362 self.stats.read().clone()
363 }
364
365 pub fn rate_multiplier(&self) -> f64 {
367 self.state.read().rate_multiplier
368 }
369
370 pub fn success_rate(&self) -> f64 {
372 self.state.read().recent_success_rate
373 }
374
375 pub fn cleanup_dedup_cache(&self) {
377 let mut state = self.state.write();
378 let now = Instant::now();
379
380 state.recent_queries.retain(|_, &mut last_query| {
381 now.duration_since(last_query) < self.config.dedup_window * 2
382 });
383 }
384
385 pub fn reset_stats(&self) {
387 *self.stats.write() = QueryBatcherStats::default();
388 }
389}
390
/// Counters describing the batcher's activity since creation or the last
/// statistics reset.
#[derive(Debug, Clone, Default)]
pub struct QueryBatcherStats {
    /// Queries accepted into a batch.
    pub queries_batched: u64,
    /// Queries rejected because the pending batch was full.
    pub queries_dropped: u64,
    /// Queries rejected by the per-second rate limit.
    pub queries_rate_limited: u64,
    /// Queries suppressed as duplicates (not included in `queries_batched`).
    pub queries_deduplicated: u64,
    /// Non-empty batches handed out.
    pub batches_sent: u64,
    /// Total number of queries contained in the batches handed out.
    pub total_queries_sent: u64,
    /// Recorded results flagged as successful.
    pub successful_queries: u64,
    /// Recorded results flagged as failed.
    pub failed_queries: u64,
    /// Number of adaptive rate-multiplier adjustment steps applied.
    pub rate_adjustments: u64,
}

impl QueryBatcherStats {
    /// Returns `queries_deduplicated / queries_batched`, or `0.0` when
    /// nothing has been batched.
    ///
    /// Because suppressed duplicates are not part of `queries_batched`, the
    /// value can exceed `1.0`.
    pub fn dedup_ratio(&self) -> f64 {
        if self.queries_batched == 0 {
            return 0.0;
        }
        self.queries_deduplicated as f64 / self.queries_batched as f64
    }

    /// Returns the fraction of batched queries that have not been sent,
    /// `(queries_batched - total_queries_sent) / queries_batched`, or `0.0`
    /// when nothing has been batched.
    ///
    /// The difference saturates at zero: the counters are public and can be
    /// reset while queries are still pending, so `total_queries_sent` may
    /// exceed `queries_batched`, and a plain subtraction would overflow.
    pub fn batching_efficiency(&self) -> f64 {
        if self.queries_batched == 0 {
            return 0.0;
        }
        let saved = self.queries_batched.saturating_sub(self.total_queries_sent);
        saved as f64 / self.queries_batched as f64
    }

    /// Returns the fraction of recorded results that succeeded, or `0.0`
    /// when no result has been recorded.
    pub fn success_rate(&self) -> f64 {
        let total = self.successful_queries + self.failed_queries;
        if total == 0 {
            return 0.0;
        }
        self.successful_queries as f64 / total as f64
    }
}
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a batcher, panicking if `config` does not validate.
    fn new_batcher(config: QueryBatcherConfig) -> QueryBatcher {
        QueryBatcher::new(config).unwrap()
    }

    /// Shorthand for a `FindProviders` query on `cid`.
    fn find_providers(cid: &str) -> QueryType {
        QueryType::FindProviders(cid.to_string())
    }

    #[test]
    fn test_config_default() {
        let defaults = QueryBatcherConfig::default();
        assert!(defaults.validate().is_ok());
        assert_eq!(defaults.max_batch_size, 10);
        assert!(defaults.enable_deduplication);
    }

    #[test]
    fn test_config_low_power() {
        let preset = QueryBatcherConfig::low_power();
        assert!(preset.validate().is_ok());
        assert_eq!(preset.max_queries_per_second, 10);
    }

    #[test]
    fn test_config_mobile() {
        let preset = QueryBatcherConfig::mobile();
        assert!(preset.validate().is_ok());
        assert_eq!(preset.max_queries_per_second, 50);
    }

    #[test]
    fn test_config_high_performance() {
        let preset = QueryBatcherConfig::high_performance();
        assert!(preset.validate().is_ok());
        assert!(!preset.enable_deduplication);
    }

    #[test]
    fn test_config_validation() {
        // A zero batch size must be rejected.
        let zero_batch = QueryBatcherConfig {
            max_batch_size: 0,
            ..Default::default()
        };
        assert!(zero_batch.validate().is_err());
    }

    #[test]
    fn test_query_type_key() {
        let first = find_providers("QmTest");
        let second = find_providers("QmTest");
        assert_eq!(first.key(), second.key());
    }

    #[test]
    fn test_add_query() {
        let batcher = new_batcher(QueryBatcherConfig::default());

        let outcome = batcher.add_query(find_providers("QmTest"));
        assert!(outcome.is_ok());

        assert_eq!(batcher.stats().queries_batched, 1);
    }

    #[test]
    fn test_deduplication() {
        let batcher = new_batcher(QueryBatcherConfig::default());
        let query = find_providers("QmTest");

        // The second, identical query is accepted but suppressed.
        batcher.add_query(query.clone()).unwrap();
        batcher.add_query(query).unwrap();

        let snapshot = batcher.stats();
        assert_eq!(snapshot.queries_deduplicated, 1);
        assert_eq!(snapshot.queries_batched, 1);
    }

    #[test]
    fn test_batch_ready_when_full() {
        let batcher = new_batcher(QueryBatcherConfig {
            max_batch_size: 3,
            ..Default::default()
        });

        (0..3)
            .map(|i| find_providers(&format!("QmTest{}", i)))
            .for_each(|query| batcher.add_query(query).unwrap());

        assert!(batcher.should_send_batch());
    }

    #[test]
    fn test_take_batch() {
        let batcher = new_batcher(QueryBatcherConfig::default());

        (0..5)
            .map(|i| find_providers(&format!("QmTest{}", i)))
            .for_each(|query| batcher.add_query(query).unwrap());

        let first = batcher.take_batch();
        assert_eq!(first.len(), 5);

        // Taking again yields nothing: the batch was drained.
        let second = batcher.take_batch();
        assert_eq!(second.len(), 0);
    }

    #[test]
    fn test_rate_limit() {
        let batcher = new_batcher(QueryBatcherConfig {
            max_queries_per_second: 5,
            ..Default::default()
        });

        // The first five distinct queries fit within the budget.
        for i in 0..5 {
            let accepted = batcher.add_query(find_providers(&format!("QmTest{}", i)));
            assert!(accepted.is_ok());
        }

        // The sixth exceeds it.
        let rejected = batcher.add_query(find_providers("QmTest6"));
        assert!(matches!(
            rejected,
            Err(QueryBatcherError::RateLimitExceeded)
        ));
    }

    #[test]
    fn test_adaptive_rate_limiting() {
        let batcher = new_batcher(QueryBatcherConfig::default());
        let before = batcher.rate_multiplier();

        // Ten consecutive failures should push the multiplier down.
        for _ in 0..10 {
            batcher.record_result(QueryBatchResult {
                success: false,
                result_count: 0,
                duration: Duration::from_millis(100),
            });
        }

        let after = batcher.rate_multiplier();
        assert!(after < before);
    }

    #[test]
    fn test_stats_dedup_ratio() {
        let counters = QueryBatcherStats {
            queries_batched: 100,
            queries_deduplicated: 20,
            ..Default::default()
        };

        assert_eq!(counters.dedup_ratio(), 0.2);
    }

    #[test]
    fn test_stats_batching_efficiency() {
        let counters = QueryBatcherStats {
            queries_batched: 100,
            total_queries_sent: 60,
            ..Default::default()
        };

        assert_eq!(counters.batching_efficiency(), 0.4);
    }

    #[test]
    fn test_cleanup_dedup_cache() {
        let batcher = new_batcher(QueryBatcherConfig::default());
        batcher.add_query(find_providers("QmTest")).unwrap();

        // The read guard lives only inside the closure call, so it is
        // released before `cleanup_dedup_cache` takes the write lock.
        let cached_entries = || batcher.state.read().recent_queries.len();
        assert_eq!(cached_entries(), 1);

        batcher.cleanup_dedup_cache();

        // The entry is fresh, so cleanup must keep it.
        assert_eq!(cached_entries(), 1);
    }
}