1use crossbeam_channel::{Receiver, Sender, bounded};
47use std::cmp::Ordering;
48use std::collections::BinaryHeap;
49use std::sync::Arc;
50use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
51#[cfg(test)]
52use std::thread;
53
/// A single edge record flowing through the merge pipeline.
///
/// Equality and ordering look only at `(timestamp_us, edge_id)`; the payload,
/// the tombstone flag and the source index never take part in comparisons.
#[derive(Debug, Clone)]
pub struct MergeEdge {
    /// Edge identifier; secondary sort key.
    pub edge_id: u128,
    /// Timestamp (microseconds, going by the field name); primary sort key.
    pub timestamp_us: u64,
    /// Tombstone flag carried with the record.
    pub is_tombstone: bool,
    /// Fixed-size payload.
    pub data: [u8; 128],
    /// Index of the input this edge is attributed to.
    pub source_idx: usize,
}

impl MergeEdge {
    /// The pair of fields that defines identity and ordering.
    fn sort_key(&self) -> (u64, u128) {
        (self.timestamp_us, self.edge_id)
    }
}

impl PartialEq for MergeEdge {
    fn eq(&self, other: &Self) -> bool {
        self.sort_key() == other.sort_key()
    }
}

impl Eq for MergeEdge {}

impl PartialOrd for MergeEdge {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for MergeEdge {
    /// Reversed comparison: the edge with the smallest
    /// `(timestamp_us, edge_id)` is the greatest, so a max-heap such as
    /// `BinaryHeap` pops edges in ascending key order.
    fn cmp(&self, other: &Self) -> Ordering {
        other.sort_key().cmp(&self.sort_key())
    }
}
92
93struct HeapEntry {
95 edge: MergeEdge,
96 source_idx: usize,
97}
98
99impl PartialEq for HeapEntry {
100 fn eq(&self, other: &Self) -> bool {
101 self.edge == other.edge
102 }
103}
104
105impl Eq for HeapEntry {}
106
107impl PartialOrd for HeapEntry {
108 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
109 Some(self.cmp(other))
110 }
111}
112
113impl Ord for HeapEntry {
114 fn cmp(&self, other: &Self) -> Ordering {
115 self.edge.cmp(&other.edge)
116 }
117}
118
119#[derive(Debug, Clone)]
121pub struct ParallelMergeConfig {
122 pub reader_threads: usize,
124 pub merger_threads: usize,
126 pub channel_buffer_size: usize,
128 pub read_batch_size: usize,
130}
131
132impl Default for ParallelMergeConfig {
133 fn default() -> Self {
134 let num_cpus = num_cpus::get();
135 Self {
136 reader_threads: (num_cpus / 2).max(1),
137 merger_threads: (num_cpus / 2).max(1),
138 channel_buffer_size: 1024,
139 read_batch_size: 256,
140 }
141 }
142}
143
144impl ParallelMergeConfig {
145 pub fn for_inputs(num_inputs: usize) -> Self {
147 let num_cpus = num_cpus::get();
148 Self {
149 reader_threads: num_inputs.min(num_cpus),
150 merger_threads: (num_cpus / 2).max(1),
151 channel_buffer_size: 1024,
152 read_batch_size: 256,
153 }
154 }
155}
156
/// Shared counters updated while a merge runs.
///
/// Every counter is an atomic accessed with relaxed ordering, so the struct
/// can be updated through a shared reference.
#[derive(Default, Debug)]
pub struct ParallelMergeStats {
    /// Edges handed to the reader side.
    pub edges_read: AtomicU64,
    /// Edges emitted by the merger.
    pub edges_written: AtomicU64,
    /// Tombstone edges recorded via `record_tombstone`.
    pub tombstones_filtered: AtomicU64,
    /// Edges recorded via `record_duplicate`.
    pub duplicates_merged: AtomicU64,
}

impl ParallelMergeStats {
    /// Creates a zeroed set of counters behind an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::default()
    }

    /// Adds `amount` to `counter`.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, AtomicOrdering::Relaxed);
    }

    /// Reads the current value of `counter`.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(AtomicOrdering::Relaxed)
    }

    /// Adds `count` to the number of edges read.
    pub fn record_read(&self, count: u64) {
        Self::bump(&self.edges_read, count);
    }

    /// Adds `count` to the number of edges written.
    pub fn record_written(&self, count: u64) {
        Self::bump(&self.edges_written, count);
    }

    /// Counts one tombstone.
    pub fn record_tombstone(&self) {
        Self::bump(&self.tombstones_filtered, 1);
    }

    /// Counts one duplicate.
    pub fn record_duplicate(&self) {
        Self::bump(&self.duplicates_merged, 1);
    }

    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is loaded independently, so the snapshot is not an atomic
    /// view across all four counters.
    pub fn snapshot(&self) -> ParallelMergeStatsSnapshot {
        let edges_read = Self::read(&self.edges_read);
        let edges_written = Self::read(&self.edges_written);
        let tombstones_filtered = Self::read(&self.tombstones_filtered);
        let duplicates_merged = Self::read(&self.duplicates_merged);
        ParallelMergeStatsSnapshot {
            edges_read,
            edges_written,
            tombstones_filtered,
            duplicates_merged,
        }
    }
}

/// Plain-value copy of [`ParallelMergeStats`] taken at one point in time.
#[derive(Clone, Debug)]
pub struct ParallelMergeStatsSnapshot {
    pub edges_read: u64,
    pub edges_written: u64,
    pub tombstones_filtered: u64,
    pub duplicates_merged: u64,
}
216
/// Token-bucket rate limiter for I/O, usable through a shared reference.
///
/// One token stands for one byte. The bucket starts full (`burst_bytes`
/// tokens) and is credited at `rate_bytes_per_sec`, never above its capacity.
#[derive(Debug)]
pub struct IoThrottler {
    /// Tokens currently available.
    tokens: AtomicU64,
    /// Maximum number of tokens the bucket can hold.
    capacity: u64,
    /// Tokens credited per second.
    refill_rate: u64,
    /// Time of the last credit, in microseconds since `origin`.
    last_refill: AtomicU64,
    /// Bytes whose acquisition had to wait.
    total_throttled: AtomicU64,
    /// Total requested sleep time, in microseconds.
    total_wait_us: AtomicU64,
    /// When false, every acquisition succeeds immediately.
    enabled: std::sync::atomic::AtomicBool,
    /// Monotonic reference point for `last_refill`; a monotonic clock keeps
    /// wall-clock adjustments from stalling or inflating refills.
    origin: std::time::Instant,
}

impl IoThrottler {
    /// Shortest sleep between attempts in [`IoThrottler::acquire`].
    const MIN_WAIT_US: u64 = 100;
    /// Longest sleep between attempts in [`IoThrottler::acquire`].
    const MAX_WAIT_US: u64 = 100_000;

    /// Creates an enabled throttler that starts with a full bucket of
    /// `burst_bytes` tokens and refills at `rate_bytes_per_sec`.
    pub fn new(rate_bytes_per_sec: u64, burst_bytes: u64) -> Self {
        Self {
            tokens: AtomicU64::new(burst_bytes),
            capacity: burst_bytes,
            refill_rate: rate_bytes_per_sec,
            last_refill: AtomicU64::new(0),
            total_throttled: AtomicU64::new(0),
            total_wait_us: AtomicU64::new(0),
            enabled: std::sync::atomic::AtomicBool::new(true),
            origin: std::time::Instant::now(),
        }
    }

    /// Preset: 100 MiB/s with a 10 MiB burst.
    pub fn for_compaction() -> Self {
        Self::new(100 * 1024 * 1024, 10 * 1024 * 1024)
    }

    /// Creates a throttler that is disabled, so it never limits anything.
    pub fn unlimited() -> Self {
        let throttler = Self::new(u64::MAX, u64::MAX);
        throttler.enabled.store(false, AtomicOrdering::Relaxed);
        throttler
    }

    /// Microseconds elapsed since this throttler was created.
    fn now_us(&self) -> u64 {
        self.origin.elapsed().as_micros().min(u128::from(u64::MAX)) as u64
    }

    /// Credits the tokens earned since the last credit.
    fn refill(&self) {
        let now = self.now_us();
        let last = self.last_refill.load(AtomicOrdering::Acquire);
        let elapsed_us = now.saturating_sub(last);

        let earned = u128::from(elapsed_us) * u128::from(self.refill_rate) / 1_000_000;
        if earned == 0 {
            // Leave the timestamp untouched so that time worth less than one
            // token keeps accumulating instead of being thrown away.
            return;
        }

        // Only the thread that wins this exchange credits the interval
        // `last..now`; a loser's interval has already been accounted for.
        if self
            .last_refill
            .compare_exchange(last, now, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
            .is_err()
        {
            return;
        }

        let tokens_to_add = earned.min(u128::from(u64::MAX)) as u64;
        // A read-modify-write, so a concurrent acquisition is never overwritten.
        let _ = self.tokens.fetch_update(
            AtomicOrdering::AcqRel,
            AtomicOrdering::Acquire,
            |current| Some(current.saturating_add(tokens_to_add).min(self.capacity)),
        );
    }

    /// Atomically removes `bytes` tokens.
    ///
    /// Returns `Err` with the number of tokens that were available when there
    /// are not enough.
    fn take(&self, bytes: u64) -> Result<u64, u64> {
        self.tokens.fetch_update(
            AtomicOrdering::AcqRel,
            AtomicOrdering::Acquire,
            |available| available.checked_sub(bytes),
        )
    }

    /// Blocks until `bytes` tokens have been taken and returns the total time
    /// slept, in microseconds. Returns `0` immediately when disabled.
    ///
    /// A request larger than the bucket is taken in capacity-sized pieces, so
    /// it completes at the configured rate instead of waiting forever. The
    /// enabled flag is re-checked before every attempt; a throttler that can
    /// never grant tokens (zero rate with an empty bucket, or zero capacity)
    /// therefore blocks only until it is disabled.
    pub fn acquire(&self, bytes: u64) -> u64 {
        let mut remaining = bytes;
        let mut total_wait = 0u64;

        while remaining > 0 && self.is_enabled() {
            self.refill();

            let chunk = remaining.min(self.capacity.max(1));
            match self.take(chunk) {
                Ok(_) => remaining -= chunk,
                Err(available) => {
                    // `take` failed, so `available < chunk`.
                    let deficit = chunk - available;
                    // A zero rate has no finite wait; fall back to the maximum.
                    let wait_us = (u128::from(deficit) * 1_000_000)
                        .checked_div(u128::from(self.refill_rate))
                        .map_or(Self::MAX_WAIT_US, |us| {
                            us.min(u128::from(Self::MAX_WAIT_US)) as u64
                        })
                        .max(Self::MIN_WAIT_US);
                    std::thread::sleep(std::time::Duration::from_micros(wait_us));
                    total_wait += wait_us;
                }
            }
        }

        if total_wait > 0 {
            self.total_wait_us
                .fetch_add(total_wait, AtomicOrdering::Relaxed);
            self.total_throttled
                .fetch_add(bytes, AtomicOrdering::Relaxed);
        }
        total_wait
    }

    /// Takes `bytes` tokens without blocking.
    ///
    /// Returns `true` when the tokens were taken or the throttler is disabled,
    /// `false` when fewer than `bytes` tokens are available.
    pub fn try_acquire(&self, bytes: u64) -> bool {
        if !self.is_enabled() {
            return true;
        }
        self.refill();
        self.take(bytes).is_ok()
    }

    /// Turns throttling on or off.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, AtomicOrdering::Release);
    }

    /// Whether throttling is currently on.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(AtomicOrdering::Relaxed)
    }

    /// Changes the refill rate; needs exclusive access.
    pub fn set_rate(&mut self, rate_bytes_per_sec: u64) {
        self.refill_rate = rate_bytes_per_sec;
    }

    /// Tokens available right now, after crediting elapsed time.
    pub fn available_tokens(&self) -> u64 {
        self.refill();
        self.tokens.load(AtomicOrdering::Relaxed)
    }

    /// Returns the current counters and settings.
    pub fn stats(&self) -> IoThrottlerStats {
        IoThrottlerStats {
            total_throttled_bytes: self.total_throttled.load(AtomicOrdering::Relaxed),
            total_wait_us: self.total_wait_us.load(AtomicOrdering::Relaxed),
            available_tokens: self.available_tokens(),
            rate_bytes_per_sec: self.refill_rate,
            enabled: self.is_enabled(),
        }
    }
}

/// Point-in-time view of an [`IoThrottler`].
#[derive(Debug, Clone)]
pub struct IoThrottlerStats {
    /// Bytes whose acquisition had to wait.
    pub total_throttled_bytes: u64,
    /// Total requested sleep time, in microseconds.
    pub total_wait_us: u64,
    /// Tokens available when the stats were taken.
    pub available_tokens: u64,
    /// Configured refill rate.
    pub rate_bytes_per_sec: u64,
    /// Whether throttling was on.
    pub enabled: bool,
}
431
432#[derive(Debug)]
437pub struct AdaptiveIoController {
438 throttler: IoThrottler,
440 target_latency_us: u64,
442 min_rate: u64,
444 max_rate: u64,
446 rate_multiplier: std::sync::atomic::AtomicU64,
448}
449
450impl AdaptiveIoController {
451 pub fn new(base_rate: u64, target_latency_us: u64) -> Self {
453 Self {
454 throttler: IoThrottler::new(base_rate, base_rate / 10),
455 target_latency_us,
456 min_rate: base_rate / 10,
457 max_rate: base_rate,
458 rate_multiplier: std::sync::atomic::AtomicU64::new(1000), }
460 }
461
462 pub fn report_latency(&self, latency_us: u64) {
466 let current_mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
467
468 let new_mult = if latency_us > self.target_latency_us * 2 {
469 (current_mult * 8 / 10).max(100) } else if latency_us > self.target_latency_us {
472 (current_mult * 95 / 100).max(100) } else if latency_us < self.target_latency_us / 2 && current_mult < 1000 {
475 (current_mult * 105 / 100).min(1000) } else {
478 current_mult
479 };
480
481 self.rate_multiplier
482 .store(new_mult, AtomicOrdering::Relaxed);
483 }
484
485 pub fn effective_rate(&self) -> u64 {
487 let mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
488 let rate = (self.max_rate as u128 * mult as u128 / 1000) as u64;
489 rate.max(self.min_rate).min(self.max_rate)
490 }
491
492 pub fn acquire(&self, bytes: u64) -> u64 {
494 self.throttler.acquire(bytes)
495 }
496
497 pub fn throttler(&self) -> &IoThrottler {
499 &self.throttler
500 }
501}
502
503pub struct ParallelReader {
505 sender: Sender<MergeEdge>,
507 source_idx: usize,
509 stats: Arc<ParallelMergeStats>,
511}
512
513impl ParallelReader {
514 pub fn new(
516 sender: Sender<MergeEdge>,
517 source_idx: usize,
518 stats: Arc<ParallelMergeStats>,
519 ) -> Self {
520 Self {
521 sender,
522 source_idx,
523 stats,
524 }
525 }
526
527 #[allow(clippy::result_large_err)]
529 pub fn send(&self, edge: MergeEdge) -> Result<(), crossbeam_channel::SendError<MergeEdge>> {
530 self.stats.record_read(1);
531 self.sender.send(edge)
532 }
533
534 pub fn source_idx(&self) -> usize {
536 self.source_idx
537 }
538}
539
540pub struct ParallelMerger {
542 receivers: Vec<Receiver<MergeEdge>>,
544 #[allow(dead_code)]
546 config: ParallelMergeConfig,
547 stats: Arc<ParallelMergeStats>,
549}
550
551impl ParallelMerger {
552 pub fn create_channels(
554 num_inputs: usize,
555 config: &ParallelMergeConfig,
556 stats: Arc<ParallelMergeStats>,
557 ) -> (Vec<ParallelReader>, Self) {
558 let mut senders = Vec::with_capacity(num_inputs);
559 let mut receivers = Vec::with_capacity(num_inputs);
560
561 for i in 0..num_inputs {
562 let (tx, rx) = bounded(config.channel_buffer_size);
563 senders.push(ParallelReader::new(tx, i, stats.clone()));
564 receivers.push(rx);
565 }
566
567 let merger = Self {
568 receivers,
569 config: config.clone(),
570 stats,
571 };
572
573 (senders, merger)
574 }
575
576 pub fn merge(self) -> Vec<MergeEdge> {
580 let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
581 let mut result = Vec::new();
582 let mut last_key: Option<(u64, u128)> = None;
583
584 for (idx, rx) in self.receivers.iter().enumerate() {
586 if let Ok(edge) = rx.recv() {
587 heap.push(HeapEntry {
588 edge,
589 source_idx: idx,
590 });
591 }
592 }
593
594 while let Some(entry) = heap.pop() {
595 let edge = entry.edge;
596 let source_idx = entry.source_idx;
597
598 let key = (edge.timestamp_us, edge.edge_id);
600 let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
601
602 if is_duplicate {
603 self.stats.record_duplicate();
604 } else if edge.is_tombstone {
605 self.stats.record_tombstone();
607 result.push(edge.clone());
608 self.stats.record_written(1);
609 } else {
610 result.push(edge.clone());
611 self.stats.record_written(1);
612 }
613
614 last_key = Some(key);
615
616 if let Ok(next_edge) = self.receivers[source_idx].recv() {
618 heap.push(HeapEntry {
619 edge: next_edge,
620 source_idx,
621 });
622 }
623 }
624
625 result
626 }
627
628 pub fn merge_with_callback<F>(self, mut callback: F)
630 where
631 F: FnMut(MergeEdge),
632 {
633 let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
634 let mut last_key: Option<(u64, u128)> = None;
635
636 for (idx, rx) in self.receivers.iter().enumerate() {
638 if let Ok(edge) = rx.recv() {
639 heap.push(HeapEntry {
640 edge,
641 source_idx: idx,
642 });
643 }
644 }
645
646 while let Some(entry) = heap.pop() {
647 let edge = entry.edge;
648 let source_idx = entry.source_idx;
649
650 let key = (edge.timestamp_us, edge.edge_id);
651 let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
652
653 if is_duplicate {
654 self.stats.record_duplicate();
655 } else {
656 if edge.is_tombstone {
657 self.stats.record_tombstone();
658 }
659 callback(edge.clone());
660 self.stats.record_written(1);
661 }
662
663 last_key = Some(key);
664
665 if let Ok(next_edge) = self.receivers[source_idx].recv() {
667 heap.push(HeapEntry {
668 edge: next_edge,
669 source_idx,
670 });
671 }
672 }
673 }
674}
675
/// Half-open timestamp interval `[min_ts, max_ts)`.
#[derive(Debug, Clone)]
pub struct KeyRange {
    /// Inclusive lower bound.
    pub min_ts: u64,
    /// Exclusive upper bound.
    pub max_ts: u64,
}

impl KeyRange {
    /// Creates the range `[min_ts, max_ts)`.
    pub fn new(min_ts: u64, max_ts: u64) -> Self {
        Self { min_ts, max_ts }
    }

    /// Whether `ts` lies in `[min_ts, max_ts)`; `max_ts` itself is excluded.
    pub fn contains(&self, ts: u64) -> bool {
        (self.min_ts..self.max_ts).contains(&ts)
    }
}

/// Splits `[min_ts, max_ts)` into `num_partitions` contiguous ranges.
///
/// Boundaries are placed at `min_ts + range * i / num_partitions`, so
/// partition sizes differ by at most one and any remainder is spread over the
/// partitions instead of all landing in the last one. When `num_partitions`
/// exceeds the width of the range, some partitions are necessarily empty.
///
/// Returns the single range `[min_ts, max_ts)` when `num_partitions` is zero
/// or `max_ts <= min_ts`.
pub fn partition_key_space(min_ts: u64, max_ts: u64, num_partitions: usize) -> Vec<KeyRange> {
    if num_partitions == 0 || max_ts <= min_ts {
        return vec![KeyRange::new(min_ts, max_ts)];
    }

    // 128-bit arithmetic: `range * i` can exceed 64 bits.
    let range = u128::from(max_ts - min_ts);
    let parts = num_partitions as u128;
    let boundary = |i: usize| min_ts + (range * i as u128 / parts) as u64;

    (0..num_partitions)
        .map(|i| KeyRange::new(boundary(i), boundary(i + 1)))
        .collect()
}
718
719pub struct ParallelMergeBuilder {
721 config: ParallelMergeConfig,
722 stats: Arc<ParallelMergeStats>,
723}
724
725impl ParallelMergeBuilder {
726 pub fn new() -> Self {
728 Self {
729 config: ParallelMergeConfig::default(),
730 stats: ParallelMergeStats::new(),
731 }
732 }
733
734 pub fn config(mut self, config: ParallelMergeConfig) -> Self {
736 self.config = config;
737 self
738 }
739
740 pub fn stats(mut self, stats: Arc<ParallelMergeStats>) -> Self {
742 self.stats = stats;
743 self
744 }
745
746 pub fn build(self, num_inputs: usize) -> (Vec<ParallelReader>, ParallelMerger) {
748 ParallelMerger::create_channels(num_inputs, &self.config, self.stats)
749 }
750}
751
752impl Default for ParallelMergeBuilder {
753 fn default() -> Self {
754 Self::new()
755 }
756}
757
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    /// Builds an edge with a zeroed payload attributed to source 0.
    fn create_test_edge(edge_id: u128, timestamp_us: u64, is_tombstone: bool) -> MergeEdge {
        MergeEdge {
            edge_id,
            timestamp_us,
            is_tombstone,
            data: [0u8; 128],
            source_idx: 0,
        }
    }

    /// Waits for every producer thread, failing the test if one panicked.
    fn join_all(producers: Vec<thread::JoinHandle<()>>) {
        for producer in producers {
            producer.join().unwrap();
        }
    }

    #[test]
    fn test_merge_edge_ordering() {
        let base = create_test_edge(1, 1000, false);
        let same_ts_larger_id = create_test_edge(2, 1000, false);
        let later_ts = create_test_edge(1, 2000, false);

        // Ordering is reversed: the smaller key compares as greater.
        assert!(base > same_ts_larger_id);
        assert!(base > later_ts);
    }

    #[test]
    fn test_parallel_merge_basic() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig {
            reader_threads: 2,
            merger_threads: 1,
            channel_buffer_size: 100,
            read_batch_size: 10,
        };

        let (readers, merger) = ParallelMerger::create_channels(3, &config, stats.clone());

        // Each source produces ten edges with interleaved timestamps.
        let mut producers = Vec::new();
        for (idx, reader) in readers.into_iter().enumerate() {
            producers.push(thread::spawn(move || {
                for i in 0..10u64 {
                    let edge = MergeEdge {
                        edge_id: (idx * 100 + i as usize) as u128,
                        timestamp_us: i * 100 + idx as u64,
                        is_tombstone: false,
                        data: [0u8; 128],
                        source_idx: idx,
                    };
                    reader.send(edge).unwrap();
                }
            }));
        }
        join_all(producers);

        let result = merger.merge();

        assert_eq!(result.len(), 30);

        for pair in result.windows(2) {
            let (prev, curr) = (&pair[0], &pair[1]);
            assert!(
                (prev.timestamp_us, prev.edge_id) < (curr.timestamp_us, curr.edge_id),
                "Results should be sorted"
            );
        }

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 30);
        assert_eq!(snapshot.edges_written, 30);
    }

    #[test]
    fn test_parallel_merge_duplicates() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        // Both sources send the very same key.
        let mut producers = Vec::new();
        for reader in readers {
            producers.push(thread::spawn(move || {
                reader.send(create_test_edge(42, 1000, false)).unwrap();
            }));
        }
        join_all(producers);

        let result = merger.merge();

        assert_eq!(result.len(), 1);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 2);
        assert_eq!(snapshot.duplicates_merged, 1);
    }

    #[test]
    fn test_partition_key_space() {
        let partitions = partition_key_space(0, 1000, 4);

        assert_eq!(partitions.len(), 4);

        let expected: [(u64, u64); 4] = [(0, 250), (250, 500), (500, 750), (750, 1000)];
        for (partition, (min_ts, max_ts)) in partitions.iter().zip(expected.iter()) {
            assert_eq!(partition.min_ts, *min_ts);
            assert_eq!(partition.max_ts, *max_ts);
        }
    }

    #[test]
    fn test_key_range_contains() {
        let range = KeyRange::new(100, 200);

        assert!(!range.contains(99));
        assert!(range.contains(100));
        assert!(range.contains(150));
        assert!(!range.contains(200));
    }

    #[test]
    fn test_merge_with_callback() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        let mut producers = Vec::new();
        for (idx, reader) in readers.into_iter().enumerate() {
            producers.push(thread::spawn(move || {
                for i in 0..5u64 {
                    let edge_id = (idx * 10 + i as usize) as u128;
                    reader
                        .send(create_test_edge(edge_id, i * 100, false))
                        .unwrap();
                }
            }));
        }
        join_all(producers);

        let count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&count);

        merger.merge_with_callback(move |_edge| {
            counter.fetch_add(1, AtomicOrdering::Relaxed);
        });

        assert_eq!(count.load(AtomicOrdering::Relaxed), 10);
    }

    #[test]
    fn test_parallel_merge_config() {
        let sized = ParallelMergeConfig::for_inputs(8);
        assert!(sized.reader_threads >= 1);
        assert!(sized.merger_threads >= 1);

        let defaults = ParallelMergeConfig::default();
        assert!(defaults.channel_buffer_size > 0);
        assert!(defaults.read_batch_size > 0);
    }

    #[test]
    fn test_throttler_basic() {
        // 1 MB/s with a 100 KB burst.
        let throttler = IoThrottler::new(1_000_000, 100_000);

        // The burst covers exactly two 50 KB requests.
        assert!(throttler.try_acquire(50_000));
        assert!(throttler.try_acquire(50_000));

        // The bucket is now (nearly) empty.
        assert!(!throttler.try_acquire(50_000));
    }

    #[test]
    fn test_throttler_unlimited() {
        let throttler = IoThrottler::unlimited();

        assert!(throttler.try_acquire(1_000_000_000));
        assert!(!throttler.is_enabled());
    }

    #[test]
    fn test_throttler_blocking_acquire() {
        // 1 KB/s with a 100-byte burst, drained up front.
        let throttler = IoThrottler::new(1_000, 100);
        assert!(throttler.try_acquire(100));

        let started = std::time::Instant::now();
        let _wait = throttler.acquire(100);
        let elapsed = started.elapsed();

        assert!(elapsed.as_millis() >= 10 || throttler.stats().total_wait_us > 0);
    }

    #[test]
    fn test_throttler_stats() {
        let throttler = IoThrottler::new(10_000_000, 10_000);

        let initial = throttler.stats().available_tokens;

        throttler.try_acquire(5_000);
        throttler.try_acquire(3_000);

        let stats = throttler.stats();
        assert!(stats.available_tokens < initial);
        assert_eq!(stats.rate_bytes_per_sec, 10_000_000);
        assert!(stats.enabled);
    }

    #[test]
    fn test_adaptive_controller() {
        // 100 MB/s base rate, 10 ms target latency.
        let controller = AdaptiveIoController::new(100_000_000, 10_000);

        // Latency far above target: the rate must not go up.
        controller.report_latency(50_000);
        let rate1 = controller.effective_rate();

        controller.report_latency(50_000);
        let rate2 = controller.effective_rate();

        assert!(rate2 <= rate1);

        // Latency far below target: the rate recovers.
        for _ in 0..20 {
            controller.report_latency(1_000);
        }
        let rate3 = controller.effective_rate();

        assert!(rate3 >= rate2);
    }
}