1use crossbeam_channel::{Receiver, Sender, bounded};
44use std::cmp::Ordering;
45use std::collections::BinaryHeap;
46use std::sync::Arc;
47use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
48#[cfg(test)]
49use std::thread;
50
/// A single edge record flowing through the merge pipeline.
///
/// Equality and ordering look only at `timestamp_us` and `edge_id`; the
/// tombstone flag, the payload and the source index never take part in a
/// comparison.
#[derive(Clone, Debug)]
pub struct MergeEdge {
    /// Edge identifier; second component of the comparison key.
    pub edge_id: u128,
    /// Timestamp (microseconds, going by the field name); first component of
    /// the comparison key.
    pub timestamp_us: u64,
    /// Whether this record is a tombstone.
    pub is_tombstone: bool,
    /// Fixed 128-byte payload carried alongside the key.
    pub data: [u8; 128],
    /// Index of the input this edge is associated with.
    pub source_idx: usize,
}

impl PartialEq for MergeEdge {
    /// Two edges are equal when their `(timestamp_us, edge_id)` keys match.
    fn eq(&self, other: &Self) -> bool {
        (self.timestamp_us, self.edge_id) == (other.timestamp_us, other.edge_id)
    }
}

impl Eq for MergeEdge {}

impl PartialOrd for MergeEdge {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for MergeEdge {
    /// Reversed key order: the edge with the smaller timestamp (then the
    /// smaller id) compares as *greater*, so a max-heap such as
    /// `BinaryHeap` pops the smallest key first.
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .timestamp_us
            .cmp(&self.timestamp_us)
            .then_with(|| other.edge_id.cmp(&self.edge_id))
    }
}
89
90struct HeapEntry {
92 edge: MergeEdge,
93 source_idx: usize,
94}
95
96impl PartialEq for HeapEntry {
97 fn eq(&self, other: &Self) -> bool {
98 self.edge == other.edge
99 }
100}
101
102impl Eq for HeapEntry {}
103
104impl PartialOrd for HeapEntry {
105 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
106 Some(self.cmp(other))
107 }
108}
109
110impl Ord for HeapEntry {
111 fn cmp(&self, other: &Self) -> Ordering {
112 self.edge.cmp(&other.edge)
113 }
114}
115
116#[derive(Debug, Clone)]
118pub struct ParallelMergeConfig {
119 pub reader_threads: usize,
121 pub merger_threads: usize,
123 pub channel_buffer_size: usize,
125 pub read_batch_size: usize,
127}
128
129impl Default for ParallelMergeConfig {
130 fn default() -> Self {
131 let num_cpus = num_cpus::get();
132 Self {
133 reader_threads: (num_cpus / 2).max(1),
134 merger_threads: (num_cpus / 2).max(1),
135 channel_buffer_size: 1024,
136 read_batch_size: 256,
137 }
138 }
139}
140
141impl ParallelMergeConfig {
142 pub fn for_inputs(num_inputs: usize) -> Self {
144 let num_cpus = num_cpus::get();
145 Self {
146 reader_threads: num_inputs.min(num_cpus),
147 merger_threads: (num_cpus / 2).max(1),
148 channel_buffer_size: 1024,
149 read_batch_size: 256,
150 }
151 }
152}
153
/// Shared counters updated while a merge runs.
///
/// Every counter is an atomic, so the struct can be updated through a shared
/// reference from several threads.
#[derive(Debug, Default)]
pub struct ParallelMergeStats {
    /// Edges handed to the pipeline by readers.
    pub edges_read: AtomicU64,
    /// Edges emitted by the merger.
    pub edges_written: AtomicU64,
    /// Tombstone records counted by the merger.
    pub tombstones_filtered: AtomicU64,
    /// Records dropped because their key repeated the previous one.
    pub duplicates_merged: AtomicU64,
}

impl ParallelMergeStats {
    /// Creates zeroed counters, already wrapped in an `Arc` for sharing.
    pub fn new() -> Arc<Self> {
        Arc::new(ParallelMergeStats::default())
    }

    /// Adds `count` to the number of edges read.
    pub fn record_read(&self, count: u64) {
        Self::bump(&self.edges_read, count);
    }

    /// Adds `count` to the number of edges written.
    pub fn record_written(&self, count: u64) {
        Self::bump(&self.edges_written, count);
    }

    /// Counts one tombstone.
    pub fn record_tombstone(&self) {
        Self::bump(&self.tombstones_filtered, 1);
    }

    /// Counts one merged duplicate.
    pub fn record_duplicate(&self) {
        Self::bump(&self.duplicates_merged, 1);
    }

    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is loaded independently with relaxed ordering, so the
    /// snapshot is not an atomic view across all four counters.
    pub fn snapshot(&self) -> ParallelMergeStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(AtomicOrdering::Relaxed);
        ParallelMergeStatsSnapshot {
            edges_read: read(&self.edges_read),
            edges_written: read(&self.edges_written),
            tombstones_filtered: read(&self.tombstones_filtered),
            duplicates_merged: read(&self.duplicates_merged),
        }
    }

    /// Relaxed increment shared by all `record_*` methods.
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, AtomicOrdering::Relaxed);
    }
}

/// Plain-value copy of [`ParallelMergeStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct ParallelMergeStatsSnapshot {
    /// Value of `edges_read` when the snapshot was taken.
    pub edges_read: u64,
    /// Value of `edges_written` when the snapshot was taken.
    pub edges_written: u64,
    /// Value of `tombstones_filtered` when the snapshot was taken.
    pub tombstones_filtered: u64,
    /// Value of `duplicates_merged` when the snapshot was taken.
    pub duplicates_merged: u64,
}
213
/// Token-bucket rate limiter for I/O, usable from several threads through a
/// shared reference.
///
/// One token stands for one byte. The bucket holds at most `capacity` tokens
/// and earns `refill_rate` tokens per second, measured on a monotonic clock.
#[derive(Debug)]
pub struct IoThrottler {
    /// Tokens currently available.
    tokens: AtomicU64,
    /// Maximum number of tokens the bucket can hold (always at least 1).
    capacity: u64,
    /// Tokens earned per second.
    refill_rate: u64,
    /// Time of the last credit, in microseconds since `origin`.
    last_refill: AtomicU64,
    /// Sum of the sizes of all requests that had to wait.
    total_throttled: AtomicU64,
    /// Total microseconds spent sleeping in `acquire`.
    total_wait_us: AtomicU64,
    /// When `false`, every acquisition succeeds immediately.
    enabled: std::sync::atomic::AtomicBool,
    /// Monotonic reference point for `last_refill`; immune to wall-clock jumps.
    origin: std::time::Instant,
}

impl IoThrottler {
    /// Shortest sleep `acquire` performs between retries, in microseconds.
    const MIN_WAIT_US: u64 = 100;
    /// Longest sleep `acquire` performs between retries, in microseconds.
    const MAX_WAIT_US: u64 = 100_000;

    /// Creates an enabled throttler that starts with `burst_bytes` tokens and
    /// earns `rate_bytes_per_sec` tokens per second.
    ///
    /// The bucket capacity is `burst_bytes`, raised to 1 if zero was passed:
    /// a bucket that can never hold a token could never grant any request.
    pub fn new(rate_bytes_per_sec: u64, burst_bytes: u64) -> Self {
        Self {
            tokens: AtomicU64::new(burst_bytes),
            capacity: burst_bytes.max(1),
            refill_rate: rate_bytes_per_sec,
            last_refill: AtomicU64::new(0),
            total_throttled: AtomicU64::new(0),
            total_wait_us: AtomicU64::new(0),
            enabled: std::sync::atomic::AtomicBool::new(true),
            origin: std::time::Instant::now(),
        }
    }

    /// Preset of 100 MiB/s with a 10 MiB burst.
    pub fn for_compaction() -> Self {
        Self::new(100 * 1024 * 1024, 10 * 1024 * 1024)
    }

    /// Creates a throttler that starts disabled, so nothing is ever delayed.
    pub fn unlimited() -> Self {
        let throttler = Self::new(u64::MAX, u64::MAX);
        throttler.enabled.store(false, AtomicOrdering::Relaxed);
        throttler
    }

    /// Microseconds elapsed since this throttler was created (saturating).
    fn now_us(&self) -> u64 {
        u64::try_from(self.origin.elapsed().as_micros()).unwrap_or(u64::MAX)
    }

    /// Credits the tokens earned since the last credit, capped at `capacity`.
    fn refill(&self) {
        let now = self.now_us();
        let last = self.last_refill.load(AtomicOrdering::Acquire);
        let elapsed_us = now.saturating_sub(last);

        let earned = u128::from(elapsed_us) * u128::from(self.refill_rate) / 1_000_000;
        let tokens_to_add = u64::try_from(earned).unwrap_or(u64::MAX);
        if tokens_to_add == 0 {
            // Leave `last_refill` alone so the elapsed time keeps growing;
            // otherwise frequent polling at a low rate would never earn a
            // whole token.
            return;
        }

        // Claim the interval [last, now). If another thread claimed an
        // overlapping interval first, it is the one that credits the tokens.
        if self
            .last_refill
            .compare_exchange(last, now, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
            .is_err()
        {
            return;
        }

        // Read-modify-write in one atomic step so that a concurrent
        // deduction is never overwritten.
        let _ = self.tokens.fetch_update(
            AtomicOrdering::AcqRel,
            AtomicOrdering::Acquire,
            |current| Some(current.saturating_add(tokens_to_add).min(self.capacity)),
        );
    }

    /// How long to sleep before `deficit` tokens could have been earned,
    /// clamped to `[MIN_WAIT_US, MAX_WAIT_US]`.
    fn wait_for_deficit_us(&self, deficit: u64) -> u64 {
        let rate = u128::from(self.refill_rate);
        let ideal_us = if rate == 0 {
            // Nothing is ever earned at rate zero; poll at the slowest pace.
            u128::from(Self::MAX_WAIT_US)
        } else {
            u128::from(deficit) * 1_000_000 / rate
        };
        let clamped = ideal_us.clamp(
            u128::from(Self::MIN_WAIT_US),
            u128::from(Self::MAX_WAIT_US),
        );
        // `clamped` is at most MAX_WAIT_US, so the cast cannot truncate.
        clamped as u64
    }

    /// Blocks until `bytes` tokens have been taken from the bucket and
    /// returns the number of microseconds spent sleeping.
    ///
    /// Requests larger than the bucket capacity are paid for in
    /// capacity-sized installments, so they complete instead of waiting for
    /// a token count the bucket can never reach. The enabled flag is checked
    /// on every iteration: disabling the throttler releases a waiting
    /// caller. With a refill rate of zero no tokens are ever earned, so once
    /// the initial burst is spent the call only returns after the throttler
    /// is disabled.
    pub fn acquire(&self, bytes: u64) -> u64 {
        let mut total_wait = 0u64;
        let mut remaining = bytes;

        while remaining > 0 && self.is_enabled() {
            self.refill();

            let installment = remaining.min(self.capacity);
            let current = self.tokens.load(AtomicOrdering::Acquire);

            if current >= installment {
                let taken = self.tokens.compare_exchange_weak(
                    current,
                    current - installment,
                    AtomicOrdering::AcqRel,
                    AtomicOrdering::Acquire,
                );
                if taken.is_ok() {
                    remaining -= installment;
                }
                // On failure another thread changed the count; retry at once.
                continue;
            }

            let wait_us = self.wait_for_deficit_us(installment - current);
            std::thread::sleep(std::time::Duration::from_micros(wait_us));
            total_wait = total_wait.saturating_add(wait_us);
        }

        if total_wait > 0 {
            self.total_wait_us
                .fetch_add(total_wait, AtomicOrdering::Relaxed);
            self.total_throttled
                .fetch_add(bytes, AtomicOrdering::Relaxed);
        }
        total_wait
    }

    /// Takes `bytes` tokens if they are available right now.
    ///
    /// Returns `true` on success or when throttling is disabled, and `false`
    /// when the bucket holds fewer than `bytes` tokens. Never blocks.
    pub fn try_acquire(&self, bytes: u64) -> bool {
        if !self.is_enabled() {
            return true;
        }

        self.refill();

        // `fetch_update` retries on contention, so the call only fails when
        // the tokens really are insufficient.
        self.tokens
            .fetch_update(
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
                |current| current.checked_sub(bytes),
            )
            .is_ok()
    }

    /// Turns throttling on or off.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, AtomicOrdering::Release);
    }

    /// Whether throttling is currently active.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(AtomicOrdering::Relaxed)
    }

    /// Changes the refill rate. Requires exclusive access.
    pub fn set_rate(&mut self, rate_bytes_per_sec: u64) {
        self.refill_rate = rate_bytes_per_sec;
    }

    /// Credits any earned tokens and returns the number available.
    pub fn available_tokens(&self) -> u64 {
        self.refill();
        self.tokens.load(AtomicOrdering::Relaxed)
    }

    /// Returns the current counters and settings.
    pub fn stats(&self) -> IoThrottlerStats {
        IoThrottlerStats {
            total_throttled_bytes: self.total_throttled.load(AtomicOrdering::Relaxed),
            total_wait_us: self.total_wait_us.load(AtomicOrdering::Relaxed),
            available_tokens: self.available_tokens(),
            rate_bytes_per_sec: self.refill_rate,
            enabled: self.is_enabled(),
        }
    }
}

/// Point-in-time view of an [`IoThrottler`].
#[derive(Debug, Clone)]
pub struct IoThrottlerStats {
    /// Sum of the sizes of all requests that had to wait.
    pub total_throttled_bytes: u64,
    /// Total microseconds spent sleeping in `acquire`.
    pub total_wait_us: u64,
    /// Tokens available when the stats were taken.
    pub available_tokens: u64,
    /// Configured refill rate in bytes per second.
    pub rate_bytes_per_sec: u64,
    /// Whether throttling was active.
    pub enabled: bool,
}
428
429#[derive(Debug)]
434pub struct AdaptiveIoController {
435 throttler: IoThrottler,
437 target_latency_us: u64,
439 min_rate: u64,
441 max_rate: u64,
443 rate_multiplier: std::sync::atomic::AtomicU64,
445}
446
447impl AdaptiveIoController {
448 pub fn new(base_rate: u64, target_latency_us: u64) -> Self {
450 Self {
451 throttler: IoThrottler::new(base_rate, base_rate / 10),
452 target_latency_us,
453 min_rate: base_rate / 10,
454 max_rate: base_rate,
455 rate_multiplier: std::sync::atomic::AtomicU64::new(1000), }
457 }
458
459 pub fn report_latency(&self, latency_us: u64) {
463 let current_mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
464
465 let new_mult = if latency_us > self.target_latency_us * 2 {
466 (current_mult * 8 / 10).max(100) } else if latency_us > self.target_latency_us {
469 (current_mult * 95 / 100).max(100) } else if latency_us < self.target_latency_us / 2 && current_mult < 1000 {
472 (current_mult * 105 / 100).min(1000) } else {
475 current_mult
476 };
477
478 self.rate_multiplier
479 .store(new_mult, AtomicOrdering::Relaxed);
480 }
481
482 pub fn effective_rate(&self) -> u64 {
484 let mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
485 let rate = (self.max_rate as u128 * mult as u128 / 1000) as u64;
486 rate.max(self.min_rate).min(self.max_rate)
487 }
488
489 pub fn acquire(&self, bytes: u64) -> u64 {
491 self.throttler.acquire(bytes)
492 }
493
494 pub fn throttler(&self) -> &IoThrottler {
496 &self.throttler
497 }
498}
499
500pub struct ParallelReader {
502 sender: Sender<MergeEdge>,
504 source_idx: usize,
506 stats: Arc<ParallelMergeStats>,
508}
509
510impl ParallelReader {
511 pub fn new(
513 sender: Sender<MergeEdge>,
514 source_idx: usize,
515 stats: Arc<ParallelMergeStats>,
516 ) -> Self {
517 Self {
518 sender,
519 source_idx,
520 stats,
521 }
522 }
523
524 #[allow(clippy::result_large_err)]
526 pub fn send(&self, edge: MergeEdge) -> Result<(), crossbeam_channel::SendError<MergeEdge>> {
527 self.stats.record_read(1);
528 self.sender.send(edge)
529 }
530
531 pub fn source_idx(&self) -> usize {
533 self.source_idx
534 }
535}
536
537pub struct ParallelMerger {
539 receivers: Vec<Receiver<MergeEdge>>,
541 #[allow(dead_code)]
543 config: ParallelMergeConfig,
544 stats: Arc<ParallelMergeStats>,
546}
547
548impl ParallelMerger {
549 pub fn create_channels(
551 num_inputs: usize,
552 config: &ParallelMergeConfig,
553 stats: Arc<ParallelMergeStats>,
554 ) -> (Vec<ParallelReader>, Self) {
555 let mut senders = Vec::with_capacity(num_inputs);
556 let mut receivers = Vec::with_capacity(num_inputs);
557
558 for i in 0..num_inputs {
559 let (tx, rx) = bounded(config.channel_buffer_size);
560 senders.push(ParallelReader::new(tx, i, stats.clone()));
561 receivers.push(rx);
562 }
563
564 let merger = Self {
565 receivers,
566 config: config.clone(),
567 stats,
568 };
569
570 (senders, merger)
571 }
572
573 pub fn merge(self) -> Vec<MergeEdge> {
577 let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
578 let mut result = Vec::new();
579 let mut last_key: Option<(u64, u128)> = None;
580
581 for (idx, rx) in self.receivers.iter().enumerate() {
583 if let Ok(edge) = rx.recv() {
584 heap.push(HeapEntry {
585 edge,
586 source_idx: idx,
587 });
588 }
589 }
590
591 while let Some(entry) = heap.pop() {
592 let edge = entry.edge;
593 let source_idx = entry.source_idx;
594
595 let key = (edge.timestamp_us, edge.edge_id);
597 let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
598
599 if is_duplicate {
600 self.stats.record_duplicate();
601 } else if edge.is_tombstone {
602 self.stats.record_tombstone();
604 result.push(edge.clone());
605 self.stats.record_written(1);
606 } else {
607 result.push(edge.clone());
608 self.stats.record_written(1);
609 }
610
611 last_key = Some(key);
612
613 if let Ok(next_edge) = self.receivers[source_idx].recv() {
615 heap.push(HeapEntry {
616 edge: next_edge,
617 source_idx,
618 });
619 }
620 }
621
622 result
623 }
624
625 pub fn merge_with_callback<F>(self, mut callback: F)
627 where
628 F: FnMut(MergeEdge),
629 {
630 let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
631 let mut last_key: Option<(u64, u128)> = None;
632
633 for (idx, rx) in self.receivers.iter().enumerate() {
635 if let Ok(edge) = rx.recv() {
636 heap.push(HeapEntry {
637 edge,
638 source_idx: idx,
639 });
640 }
641 }
642
643 while let Some(entry) = heap.pop() {
644 let edge = entry.edge;
645 let source_idx = entry.source_idx;
646
647 let key = (edge.timestamp_us, edge.edge_id);
648 let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
649
650 if is_duplicate {
651 self.stats.record_duplicate();
652 } else {
653 if edge.is_tombstone {
654 self.stats.record_tombstone();
655 }
656 callback(edge.clone());
657 self.stats.record_written(1);
658 }
659
660 last_key = Some(key);
661
662 if let Ok(next_edge) = self.receivers[source_idx].recv() {
664 heap.push(HeapEntry {
665 edge: next_edge,
666 source_idx,
667 });
668 }
669 }
670 }
671}
672
/// Half-open timestamp interval `[min_ts, max_ts)`.
#[derive(Debug, Clone)]
pub struct KeyRange {
    /// Inclusive lower bound.
    pub min_ts: u64,
    /// Exclusive upper bound.
    pub max_ts: u64,
}

impl KeyRange {
    /// Creates the range `[min_ts, max_ts)`.
    pub fn new(min_ts: u64, max_ts: u64) -> Self {
        Self { min_ts, max_ts }
    }

    /// Whether `ts` lies in the range; `min_ts` is included, `max_ts` is not.
    pub fn contains(&self, ts: u64) -> bool {
        ts >= self.min_ts && ts < self.max_ts
    }
}

/// Splits `[min_ts, max_ts)` into `num_partitions` contiguous ranges.
///
/// Boundary `i` sits at `min_ts + span * i / num_partitions`, so partition
/// widths differ by at most one and any remainder is spread across the
/// partitions instead of all landing in the last one. The first range starts
/// at `min_ts` and the last ends at `max_ts`.
///
/// When `num_partitions` is zero or the interval is empty or inverted
/// (`max_ts <= min_ts`), a single range covering the input bounds unchanged
/// is returned.
pub fn partition_key_space(min_ts: u64, max_ts: u64, num_partitions: usize) -> Vec<KeyRange> {
    if num_partitions == 0 || max_ts <= min_ts {
        return vec![KeyRange::new(min_ts, max_ts)];
    }

    // Widened to u128 so `span * i` cannot overflow.
    let span = u128::from(max_ts - min_ts);
    let parts = num_partitions as u128;

    // `span * i / parts` never exceeds `span`, so it fits in u64 and the
    // addition stays at or below `max_ts`.
    let boundary = |i: usize| -> u64 { min_ts + (span * i as u128 / parts) as u64 };

    (0..num_partitions)
        .map(|i| KeyRange::new(boundary(i), boundary(i + 1)))
        .collect()
}
715
716pub struct ParallelMergeBuilder {
718 config: ParallelMergeConfig,
719 stats: Arc<ParallelMergeStats>,
720}
721
722impl ParallelMergeBuilder {
723 pub fn new() -> Self {
725 Self {
726 config: ParallelMergeConfig::default(),
727 stats: ParallelMergeStats::new(),
728 }
729 }
730
731 pub fn config(mut self, config: ParallelMergeConfig) -> Self {
733 self.config = config;
734 self
735 }
736
737 pub fn stats(mut self, stats: Arc<ParallelMergeStats>) -> Self {
739 self.stats = stats;
740 self
741 }
742
743 pub fn build(self, num_inputs: usize) -> (Vec<ParallelReader>, ParallelMerger) {
745 ParallelMerger::create_channels(num_inputs, &self.config, self.stats)
746 }
747}
748
749impl Default for ParallelMergeBuilder {
750 fn default() -> Self {
751 Self::new()
752 }
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    /// Builds an edge with an all-zero payload and source index 0.
    fn create_test_edge(edge_id: u128, timestamp_us: u64, is_tombstone: bool) -> MergeEdge {
        MergeEdge {
            edge_id,
            timestamp_us,
            is_tombstone,
            data: [0u8; 128],
            source_idx: 0,
        }
    }

    #[test]
    fn test_merge_edge_ordering() {
        let e1 = create_test_edge(1, 1000, false);
        let e2 = create_test_edge(2, 1000, false);
        let e3 = create_test_edge(1, 2000, false);

        // Ordering is reversed: the smaller key compares as greater.
        assert!(e1 > e2);
        assert!(e1 > e3);
    }

    #[test]
    fn test_parallel_merge_basic() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig {
            reader_threads: 2,
            merger_threads: 1,
            channel_buffer_size: 100,
            read_batch_size: 10,
        };

        let (readers, merger) = ParallelMerger::create_channels(3, &config, stats.clone());

        // Each reader sends ten edges in ascending timestamp order and then
        // drops its sender when the thread finishes.
        let handles: Vec<_> = readers
            .into_iter()
            .enumerate()
            .map(|(idx, reader)| {
                thread::spawn(move || {
                    for i in 0..10u64 {
                        let edge = MergeEdge {
                            edge_id: (idx * 100 + i as usize) as u128,
                            timestamp_us: i * 100 + idx as u64,
                            is_tombstone: false,
                            data: [0u8; 128],
                            source_idx: idx,
                        };
                        reader.send(edge).unwrap();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let result = merger.merge();

        assert_eq!(result.len(), 30);

        for pair in result.windows(2) {
            let (prev, curr) = (&pair[0], &pair[1]);
            assert!(
                prev.timestamp_us < curr.timestamp_us
                    || (prev.timestamp_us == curr.timestamp_us && prev.edge_id < curr.edge_id),
                "Results should be sorted"
            );
        }

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 30);
        assert_eq!(snapshot.edges_written, 30);
    }

    #[test]
    fn test_parallel_merge_duplicates() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        // Both inputs send an edge with the same key.
        let handles: Vec<_> = readers
            .into_iter()
            .map(|reader| {
                thread::spawn(move || {
                    let edge = create_test_edge(42, 1000, false);
                    reader.send(edge).unwrap();
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let result = merger.merge();

        assert_eq!(result.len(), 1);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 2);
        assert_eq!(snapshot.duplicates_merged, 1);
    }

    #[test]
    fn test_partition_key_space() {
        let partitions = partition_key_space(0, 1000, 4);

        assert_eq!(partitions.len(), 4);
        assert_eq!(partitions[0].min_ts, 0);
        assert_eq!(partitions[0].max_ts, 250);
        assert_eq!(partitions[1].min_ts, 250);
        assert_eq!(partitions[1].max_ts, 500);
        assert_eq!(partitions[2].min_ts, 500);
        assert_eq!(partitions[2].max_ts, 750);
        assert_eq!(partitions[3].min_ts, 750);
        assert_eq!(partitions[3].max_ts, 1000);
    }

    #[test]
    fn test_key_range_contains() {
        let range = KeyRange::new(100, 200);

        assert!(!range.contains(99));
        assert!(range.contains(100));
        assert!(range.contains(150));
        assert!(!range.contains(200));
    }

    #[test]
    fn test_merge_with_callback() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        let handles: Vec<_> = readers
            .into_iter()
            .enumerate()
            .map(|(idx, reader)| {
                thread::spawn(move || {
                    for i in 0..5u64 {
                        let edge =
                            create_test_edge((idx * 10 + i as usize) as u128, i * 100, false);
                        reader.send(edge).unwrap();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();

        merger.merge_with_callback(move |_edge| {
            count_clone.fetch_add(1, AtomicOrdering::Relaxed);
        });

        assert_eq!(count.load(AtomicOrdering::Relaxed), 10);
    }

    #[test]
    fn test_parallel_merge_config() {
        let config = ParallelMergeConfig::for_inputs(8);
        assert!(config.reader_threads >= 1);
        assert!(config.merger_threads >= 1);

        let default = ParallelMergeConfig::default();
        assert!(default.channel_buffer_size > 0);
        assert!(default.read_batch_size > 0);
    }

    #[test]
    fn test_throttler_basic() {
        // A low refill rate keeps the bucket from recovering between calls
        // even if the test thread is descheduled for a while.
        let throttler = IoThrottler::new(1_000, 100_000);

        assert!(throttler.try_acquire(50_000));
        assert!(throttler.try_acquire(50_000));

        // The burst is spent; the third request must be refused.
        assert!(!throttler.try_acquire(50_000));
    }

    #[test]
    fn test_throttler_unlimited() {
        let throttler = IoThrottler::unlimited();

        assert!(throttler.try_acquire(1_000_000_000));
        assert!(!throttler.is_enabled());
    }

    #[test]
    fn test_throttler_blocking_acquire() {
        let throttler = IoThrottler::new(1_000, 100);

        // Drain the bucket so the next acquire has to wait.
        assert!(throttler.try_acquire(100));

        let start = std::time::Instant::now();
        let _wait = throttler.acquire(100);
        let elapsed = start.elapsed();

        assert!(elapsed.as_millis() >= 10 || throttler.stats().total_wait_us > 0);
    }

    #[test]
    fn test_throttler_stats() {
        // At 1000 B/s the 8000 consumed tokens take eight seconds to come
        // back, so the refill cannot mask the consumption before the final
        // read.
        let throttler = IoThrottler::new(1_000, 10_000);

        let initial = throttler.stats().available_tokens;

        throttler.try_acquire(5_000);
        throttler.try_acquire(3_000);

        let stats = throttler.stats();
        assert!(stats.available_tokens < initial);
        assert_eq!(stats.rate_bytes_per_sec, 1_000);
        assert!(stats.enabled);
    }

    #[test]
    fn test_adaptive_controller() {
        let controller = AdaptiveIoController::new(100_000_000, 10_000);

        // Latency far above the target must not raise the rate.
        controller.report_latency(50_000);
        let rate1 = controller.effective_rate();

        controller.report_latency(50_000);
        let rate2 = controller.effective_rate();

        assert!(rate2 <= rate1);

        // Latency well below the target lets the rate recover.
        for _ in 0..20 {
            controller.report_latency(1_000);
        }
        let rate3 = controller.effective_rate();

        assert!(rate3 >= rate2);
    }
}