1pub mod delay;
106pub mod integration;
107pub mod network;
108pub mod queue;
109pub mod queue_event;
110pub mod queue_peek;
111
112use std::{
113 cmp::Ordering,
114 slice,
115 time::{Duration, Instant},
116};
117
118use delay::agg_delay_on_blocking_expire;
119use integration::Integration;
120use log::debug;
121use network::{Network, NetworkBottleneck, WindowCount};
122use queue::SimQueue;
123
124use maybenot::{Framework, Machine, MachineId, Timer, TriggerAction, TriggerEvent};
125use rand::{RngCore, rngs::ThreadRng};
126use rand_xoshiro::Xoshiro256StarStar;
127use rand_xoshiro::rand_core::SeedableRng;
128
129use crate::{
130 network::sim_network_stack,
131 queue_peek::{
132 peek_blocked_exp, peek_queue, peek_scheduled_action, peek_scheduled_internal_timer,
133 },
134};
135
136#[derive(Debug)]
141enum RngSource {
142 Thread(ThreadRng),
143 Xoshiro(Xoshiro256StarStar),
144}
145
146impl RngCore for RngSource {
147 fn next_u32(&mut self) -> u32 {
148 match self {
149 RngSource::Thread(rng) => rng.next_u32(),
150 RngSource::Xoshiro(rng) => rng.next_u32(),
151 }
152 }
153
154 fn next_u64(&mut self) -> u64 {
155 match self {
156 RngSource::Thread(rng) => rng.next_u64(),
157 RngSource::Xoshiro(rng) => rng.next_u64(),
158 }
159 }
160
161 fn fill_bytes(&mut self, dest: &mut [u8]) {
162 match self {
163 RngSource::Thread(rng) => rng.fill_bytes(dest),
164 RngSource::Xoshiro(rng) => rng.fill_bytes(dest),
165 }
166 }
167}
168
169#[derive(PartialEq, Hash, Eq, Clone, Debug)]
173pub struct SimEvent {
174 pub event: TriggerEvent,
176 pub time: Instant,
178 pub integration_delay: Duration,
180 pub client: bool,
182 pub contains_padding: bool,
184 bypass: bool,
186 replace: bool,
188 pub debug_note: Option<String>,
190}
191
192fn event_to_usize(e: &TriggerEvent) -> usize {
194 match e {
195 TriggerEvent::TunnelSent => 0,
197 TriggerEvent::NormalSent => 1,
198 TriggerEvent::PaddingSent { .. } => 2,
199 TriggerEvent::TunnelRecv => 3,
200 TriggerEvent::NormalRecv => 4,
201 TriggerEvent::PaddingRecv => 5,
202 TriggerEvent::BlockingBegin { .. } => 6,
204 TriggerEvent::BlockingEnd => 7,
205 TriggerEvent::TimerBegin { .. } => 8,
206 TriggerEvent::TimerEnd { .. } => 9,
207 }
208}
209
210impl Ord for SimEvent {
212 fn cmp(&self, other: &Self) -> Ordering {
213 self.time
215 .cmp(&other.time)
216 .then_with(|| event_to_usize(&self.event).cmp(&event_to_usize(&other.event)))
217 .reverse()
218 }
219}
220
221impl PartialOrd for SimEvent {
222 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
223 Some(self.cmp(other))
224 }
225}
226
227#[derive(PartialEq, Clone, Debug)]
230pub struct ScheduledAction {
231 action: TriggerAction,
232 time: Instant,
233}
234
235#[derive(Debug)]
237pub struct SimState<M, R> {
238 framework: Framework<M, R>,
240 scheduled_action: Vec<Option<ScheduledAction>>,
242 scheduled_internal_timer: Vec<Option<Instant>>,
244 blocking_until: Option<Instant>,
246 blocking_bypassable: bool,
248 integration: Option<Integration>,
250}
251
252impl<M> SimState<M, RngSource>
253where
254 M: AsRef<[Machine]>,
255{
256 pub fn new(
257 machines: M,
258 current_time: Instant,
259 max_padding_frac: f64,
260 max_blocking_frac: f64,
261 integration: Option<Integration>,
262 insecure_rng_seed: Option<u64>,
263 ) -> Self {
264 let rng = match insecure_rng_seed {
265 Some(seed) => RngSource::Xoshiro(Xoshiro256StarStar::seed_from_u64(seed)),
267 None => RngSource::Thread(rand::rng()),
269 };
270
271 let num_machines = machines.as_ref().len();
272
273 Self {
274 framework: Framework::new(
275 machines,
276 max_padding_frac,
277 max_blocking_frac,
278 current_time,
279 rng,
280 )
281 .unwrap(),
282 scheduled_action: vec![None; num_machines],
283 scheduled_internal_timer: vec![None; num_machines],
284 blocking_until: None,
285 blocking_bypassable: false,
286 integration,
287 }
288 }
289
290 pub fn reporting_delay(&self) -> Duration {
291 self.integration
292 .as_ref()
293 .map(Integration::reporting_delay)
294 .unwrap_or(Duration::from_micros(0))
295 }
296
297 pub fn action_delay(&self) -> Duration {
298 self.integration
299 .as_ref()
300 .map(Integration::action_delay)
301 .unwrap_or(Duration::from_micros(0))
302 }
303
304 pub fn trigger_delay(&self) -> Duration {
305 self.integration
306 .as_ref()
307 .map(Integration::trigger_delay)
308 .unwrap_or(Duration::from_micros(0))
309 }
310}
311
312pub fn sim(
334 machines_client: &[Machine],
335 machines_server: &[Machine],
336 sq: &mut SimQueue,
337 delay: Duration,
338 max_trace_length: usize,
339 only_network_activity: bool,
340) -> Vec<SimEvent> {
341 let network = Network::new(delay, None);
342 let args = SimulatorArgs::new(network, max_trace_length, only_network_activity);
343 sim_advanced(machines_client, machines_server, sq, &args)
344}
345
346#[derive(Clone, Debug)]
348pub struct SimulatorArgs {
349 pub network: Network,
352 pub max_trace_length: usize,
354 pub max_sim_iterations: usize,
357 pub continue_after_all_normal_packets_processed: bool,
360 pub only_client_events: bool,
362 pub only_network_activity: bool,
365 pub max_padding_frac_client: f64,
368 pub max_blocking_frac_client: f64,
371 pub max_padding_frac_server: f64,
374 pub max_blocking_frac_server: f64,
377 pub insecure_rng_seed: Option<u64>,
380 pub client_integration: Option<Integration>,
382 pub server_integration: Option<Integration>,
384}
385
386impl SimulatorArgs {
387 pub fn new(network: Network, max_trace_length: usize, only_network_activity: bool) -> Self {
388 Self {
389 network,
390 max_trace_length,
391 max_sim_iterations: 0,
392 continue_after_all_normal_packets_processed: false,
393 only_client_events: false,
394 only_network_activity,
395 max_padding_frac_client: 0.0,
396 max_blocking_frac_client: 0.0,
397 max_padding_frac_server: 0.0,
398 max_blocking_frac_server: 0.0,
399 insecure_rng_seed: None,
400 client_integration: None,
401 server_integration: None,
402 }
403 }
404}
405
406pub fn sim_advanced(
410 machines_client: &[Machine],
411 machines_server: &[Machine],
412 sq: &mut SimQueue,
413 args: &SimulatorArgs,
414) -> Vec<SimEvent> {
415 let expected_trace_len = if args.max_trace_length > 0 {
417 args.max_trace_length
418 } else {
419 sq.len() * 2
421 };
422 let mut trace: Vec<SimEvent> = Vec::with_capacity(expected_trace_len);
423
424 let mut current_time = sq.get_first_time().unwrap();
426
427 let mut client = SimState::new(
428 machines_client,
429 current_time,
430 args.max_padding_frac_client,
431 args.max_blocking_frac_client,
432 args.clone().client_integration,
433 args.insecure_rng_seed,
434 );
435 let mut server = SimState::new(
436 machines_server,
437 current_time,
438 args.max_padding_frac_server,
439 args.max_blocking_frac_server,
440 args.clone().server_integration,
441 args.insecure_rng_seed.map(|seed| seed.wrapping_add(1)),
444 );
445 debug!("sim(): client machines {}", machines_client.len());
446 debug!("sim(): server machines {}", machines_server.len());
447
448 let mut network = NetworkBottleneck::new(args.network, Duration::from_secs(1), sq.max_pps);
449
450 let mut sim_iterations = 0;
451 let start_time = current_time;
452 while let Some(next) = pick_next(sq, &mut client, &mut server, &mut network, current_time) {
453 debug!("#########################################################");
454 debug!("sim(): main loop start");
455
456 match next.time.cmp(¤t_time) {
458 Ordering::Less => {
459 debug!("sim(): {current_time:#?}");
460 debug!("sim(): {:#?}", next.time);
461 panic!("bug: next event moves time backwards");
462 }
463 Ordering::Greater => {
464 debug!("sim(): time moved forward {:#?}", next.time - current_time);
465 current_time = next.time;
466 }
467 _ => {}
468 }
469
470 debug!(
472 "sim(): at time {:#?}, aggregate network base delay {:#?} @client and {:#?} @server",
473 current_time.duration_since(start_time),
474 network.client_aggregate_base_delay,
475 network.server_aggregate_base_delay,
476 );
477 if next.client {
478 debug!("sim(): @client next\n{next:#?}");
479 } else {
480 debug!("sim(): @server next\n{next:#?}");
481 }
482 if let Some(blocking_until) = client.blocking_until {
483 debug!(
484 "sim(): client is blocked until time {:#?}",
485 blocking_until.duration_since(start_time)
486 );
487 }
488 if let Some(blocking_until) = server.blocking_until {
489 debug!(
490 "sim(): server is blocked until time {:#?}",
491 blocking_until.duration_since(start_time)
492 );
493 }
494
495 let network_activity = if next.client {
499 sim_network_stack(&next, sq, &client, &mut server, &mut network, ¤t_time)
500 } else {
501 sim_network_stack(&next, sq, &server, &mut client, &mut network, ¤t_time)
502 };
503
504 if next.client {
506 debug!("sim(): trigger @client framework {:?}", next.event);
507 trigger_update(&mut client, &next, ¤t_time, sq, true);
508 } else {
509 debug!("sim(): trigger @server framework {:?}", next.event);
510 trigger_update(&mut server, &next, ¤t_time, sq, false);
511 }
512
513 if (!args.only_network_activity || network_activity)
516 && (!args.only_client_events || next.client)
517 {
518 let mut n = next.clone();
521 match next.event {
522 TriggerEvent::NormalSent => {
523 n.time -= n.integration_delay;
525 }
526 TriggerEvent::PaddingSent { .. } => {
527 n.time += n.integration_delay;
529 }
530 TriggerEvent::TunnelSent => {
531 if n.contains_padding {
532 n.time += n.integration_delay;
534 } else {
535 n.time -= n.integration_delay;
537 }
538 }
539 TriggerEvent::TunnelRecv | TriggerEvent::PaddingRecv | TriggerEvent::NormalRecv => {
540 n.time -= n.integration_delay;
542 }
543
544 _ => {}
545 }
546
547 n.debug_note = Some(format!(
548 "agg. delay {:?} @c, {:?} @s",
549 network.client_aggregate_base_delay, network.server_aggregate_base_delay
550 ));
551
552 trace.push(n);
553 }
554
555 if args.max_trace_length > 0 && trace.len() >= args.max_trace_length {
556 debug!(
557 "sim(): we done, reached max trace length {}",
558 args.max_trace_length
559 );
560 break;
561 }
562
563 sim_iterations += 1;
565 if args.max_sim_iterations > 0 && sim_iterations >= args.max_sim_iterations {
566 debug!(
567 "sim(): we done, reached max sim iterations {}",
568 args.max_sim_iterations
569 );
570 break;
571 }
572
573 if !args.continue_after_all_normal_packets_processed && sq.no_normal_packets() {
575 debug!("sim(): we done, all normal packets processed");
576 break;
577 }
578
579 debug!("sim(): main loop end, more work?");
580 debug!("#########################################################");
581 }
582
583 trace.sort_by(|a, b| a.time.cmp(&b.time));
585
586 trace
587}
588
589fn pick_next<M: AsRef<[Machine]>>(
590 sq: &mut SimQueue,
591 client: &mut SimState<M, RngSource>,
592 server: &mut SimState<M, RngSource>,
593 network: &mut NetworkBottleneck,
594 current_time: Instant,
595) -> Option<SimEvent> {
596 let s = peek_scheduled_action(
599 &client.scheduled_action,
600 &server.scheduled_action,
601 current_time,
602 );
603 debug!("\tpick_next(): peek_scheduled_action = {s:?}");
604
605 let i = peek_scheduled_internal_timer(
606 &client.scheduled_internal_timer,
607 &server.scheduled_internal_timer,
608 current_time,
609 );
610 debug!("\tpick_next(): peek_scheduled_internal_timer = {i:?}");
611
612 let (b, b_is_client) =
613 peek_blocked_exp(client.blocking_until, server.blocking_until, current_time);
614 debug!("\tpick_next(): peek_blocked_exp = {b:?}");
615
616 let n = network.peek_aggregate_delay(current_time);
617 debug!("\tpick_next(): peek_aggregate_delay = {n:?}");
618
619 let (q, qid, q_is_client) = peek_queue(
620 sq,
621 client,
622 server,
623 network.client_aggregate_base_delay,
624 network.server_aggregate_base_delay,
625 s.min(i).min(b).min(n),
626 current_time,
627 );
628 debug!("\tpick_next(): peek_queue = {q:?}");
629
630 if s == Duration::MAX
632 && i == Duration::MAX
633 && b == Duration::MAX
634 && n == Duration::MAX
635 && q == Duration::MAX
636 {
637 return None;
638 }
639
640 if n <= s && n <= i && n <= b && n <= q {
643 debug!("\tpick_next(): picked aggregate delay");
644 network.pop_aggregate_delay();
645 return pick_next(sq, client, server, network, current_time);
646 }
647
648 if b <= s && b <= i && b <= q {
650 debug!("\tpick_next(): picked blocking");
651 let delay: Duration;
654 if b_is_client {
655 delay = client.reporting_delay();
656 client.blocking_until = None;
657 } else {
658 delay = server.reporting_delay();
659 server.blocking_until = None;
660 }
661
662 let (blocking, _) = sq.peek_blocking(false, b_is_client);
665 if let Some(event) = blocking {
666 if event.time < current_time + b {
669 let time_of_expiry = current_time + b;
671 if let Some(blocked_duration) = agg_delay_on_blocking_expire(
672 sq,
673 b_is_client,
674 time_of_expiry,
675 event,
676 match b_is_client {
677 true => network.client_aggregate_base_delay,
678 false => network.server_aggregate_base_delay,
679 },
680 ) {
681 network.push_aggregate_delay(blocked_duration, &time_of_expiry, b_is_client);
682 }
683 }
684 }
685
686 let e = SimEvent {
687 client: b_is_client,
688 event: TriggerEvent::BlockingEnd,
689 time: current_time + b + delay,
690 integration_delay: delay,
691 bypass: false,
692 replace: false,
693 contains_padding: false,
694 debug_note: None,
695 };
696 if delay > Duration::default() {
697 sq.push_sim(e);
700 return pick_next(sq, client, server, network, current_time);
701 }
702 return Some(e);
703 }
704
705 if q <= s && q <= i {
709 debug!("\tpick_next(): picked queue, is_client {q_is_client}, queue {qid:?}");
710 let mut tmp = sq
711 .pop(
712 qid,
713 q_is_client,
714 if q_is_client {
715 network.client_aggregate_base_delay
716 } else {
717 network.server_aggregate_base_delay
718 },
719 )
720 .unwrap();
721 debug!("\tpick_next(): popped from queue {tmp:?}");
722 if current_time + q > tmp.time {
724 tmp.time = current_time + q;
726 }
727
728 return Some(tmp);
729 }
730
731 if i <= s {
734 debug!("\tpick_next(): picked internal timer");
735 let target = current_time + i;
736 let act = do_internal_timer(client, server, target);
737 if let Some(a) = act {
738 sq.push_sim(a.clone());
739 }
740 return pick_next(sq, client, server, network, current_time);
741 }
742
743 debug!("\tpick_next(): picked scheduled action");
746 let target = current_time + s;
747 let act = do_scheduled_action(client, server, target);
748 if let Some(a) = act {
749 sq.push_sim(a.clone());
750 }
751 pick_next(sq, client, server, network, current_time)
752}
753
754fn do_internal_timer<M: AsRef<[Machine]>>(
755 client: &mut SimState<M, RngSource>,
756 server: &mut SimState<M, RngSource>,
757 target: Instant,
758) -> Option<SimEvent> {
759 let mut machine: Option<MachineId> = None;
760 let mut is_client = false;
761
762 for (id, opt) in client.scheduled_internal_timer.iter_mut().enumerate() {
763 if let Some(a) = opt {
764 if *a == target {
765 machine = Some(MachineId::from_raw(id));
766 is_client = true;
767 *opt = None;
768 break;
769 }
770 }
771 }
772
773 if machine.is_none() {
774 for (id, opt) in server.scheduled_internal_timer.iter_mut().enumerate() {
775 if let Some(a) = opt {
776 if *a == target {
777 machine = Some(MachineId::from_raw(id));
778 is_client = false;
779 *opt = None;
780 break;
781 }
782 }
783 }
784 }
785
786 assert!(machine.is_some(), "bug: no internal action found");
787
788 Some(SimEvent {
790 client: is_client,
791 event: TriggerEvent::TimerEnd {
792 machine: machine.unwrap(),
793 },
794 time: target,
795 integration_delay: Duration::from_micros(0), bypass: false,
797 replace: false,
798 contains_padding: false,
799 debug_note: None,
800 })
801}
802
803fn do_scheduled_action<M: AsRef<[Machine]>>(
804 client: &mut SimState<M, RngSource>,
805 server: &mut SimState<M, RngSource>,
806 target: Instant,
807) -> Option<SimEvent> {
808 let mut a: Option<ScheduledAction> = None;
810 let mut is_client = false;
811
812 for opt in client.scheduled_action.iter_mut() {
813 if let Some(sa) = opt {
814 if sa.time == target {
815 a = Some(sa.clone());
816 is_client = true;
817 *opt = None;
818 break;
819 }
820 }
821 }
822
823 if a.is_none() {
825 for opt in server.scheduled_action.iter_mut() {
826 if let Some(sa) = opt {
827 if sa.time == target {
828 a = Some(sa.clone());
829 is_client = false;
830 *opt = None;
831 break;
832 }
833 }
834 }
835 }
836
837 assert!(a.is_some(), "bug: no action found");
839 let a = a.unwrap();
840
841 match a.action {
843 TriggerAction::Cancel { .. } => {
844 panic!("bug: cancel action in scheduled action");
846 }
847 TriggerAction::UpdateTimer { .. } => {
848 panic!("bug: update timer action in scheduled action");
850 }
851 TriggerAction::SendPadding {
852 timeout: _,
853 bypass,
854 replace,
855 machine,
856 } => {
857 let action_delay = if is_client {
858 client.action_delay()
859 } else {
860 server.action_delay()
861 };
862
863 Some(SimEvent {
864 event: TriggerEvent::PaddingSent { machine },
865 time: a.time,
866 integration_delay: action_delay,
867 client: is_client,
868 bypass,
869 replace,
870 contains_padding: true,
871 debug_note: None,
872 })
873 }
874 TriggerAction::BlockOutgoing {
875 timeout: _,
876 duration,
877 bypass,
878 replace,
879 machine,
880 } => {
881 let block = a.time + duration;
882 let event_bypass;
883 let total_delay = if is_client {
885 client.action_delay() + client.reporting_delay()
886 } else {
887 server.action_delay() + server.reporting_delay()
888 };
889 let reported = a.time + total_delay;
890
891 if is_client {
893 if replace || block > client.blocking_until.unwrap_or(a.time) {
894 client.blocking_until = Some(block);
895 client.blocking_bypassable = bypass;
896 }
897 event_bypass = client.blocking_bypassable;
898 } else {
899 if replace || block > server.blocking_until.unwrap_or(a.time) {
900 server.blocking_until = Some(block);
901 server.blocking_bypassable = bypass;
902 }
903 event_bypass = server.blocking_bypassable;
904 }
905
906 Some(SimEvent {
908 event: TriggerEvent::BlockingBegin { machine },
909 time: reported,
910 integration_delay: total_delay,
911 client: is_client,
912 bypass: event_bypass,
913 replace: false,
914 contains_padding: false,
915 debug_note: None,
916 })
917 }
918 }
919}
920
921fn trigger_update<M: AsRef<[Machine]>>(
922 state: &mut SimState<M, RngSource>,
923 next: &SimEvent,
924 current_time: &Instant,
925 sq: &mut SimQueue,
926 is_client: bool,
927) {
928 let trigger_delay = state.trigger_delay();
929
930 for action in state
932 .framework
933 .trigger_events(slice::from_ref(&next.event), *current_time)
934 {
935 match action {
936 TriggerAction::Cancel { machine, timer } => {
937 debug!("\ttrigger_update(): cancel action {machine:?} {timer:?}");
938 match timer {
941 Timer::Action => {
942 state.scheduled_action[machine.into_raw()] = None;
943 }
944 Timer::Internal => {
945 state.scheduled_internal_timer[machine.into_raw()] = None;
946 }
947 Timer::All => {
948 state.scheduled_action[machine.into_raw()] = None;
949 state.scheduled_internal_timer[machine.into_raw()] = None;
950 }
951 }
952 }
953 TriggerAction::SendPadding {
954 timeout,
955 bypass: _,
956 replace: _,
957 machine,
958 } => {
959 debug!("\ttrigger_update(): send padding action {timeout:?} {machine:?}");
960 state.scheduled_action[machine.into_raw()] = Some(ScheduledAction {
961 action: action.clone(),
962 time: *current_time + *timeout + trigger_delay,
963 });
964 }
965 TriggerAction::BlockOutgoing {
966 timeout,
967 duration: _,
968 bypass: _,
969 replace: _,
970 machine,
971 } => {
972 debug!("\ttrigger_update(): block outgoing action {timeout:?} {machine:?}");
973 state.scheduled_action[machine.into_raw()] = Some(ScheduledAction {
974 action: action.clone(),
975 time: *current_time + *timeout + trigger_delay,
976 });
977 }
978 TriggerAction::UpdateTimer {
979 duration,
980 replace,
981 machine,
982 } => {
983 debug!("\ttrigger_update(): update timer action {duration:?} {machine:?}");
984 let current =
986 state.scheduled_internal_timer[machine.into_raw()].unwrap_or(*current_time);
987
988 if *replace || current < *current_time + *duration {
990 state.scheduled_internal_timer[machine.into_raw()] =
991 Some(*current_time + *duration);
992 sq.push_sim(SimEvent {
994 client: is_client,
995 event: TriggerEvent::TimerBegin { machine: *machine },
996 time: *current_time,
997 integration_delay: Duration::from_micros(0), bypass: false,
999 replace: false,
1000 contains_padding: false,
1001 debug_note: None,
1002 });
1003 }
1004 }
1005 }
1006 }
1007}
1008
1009pub fn parse_trace(trace: &str, network: Network) -> SimQueue {
1018 parse_trace_advanced(trace, network, None, None)
1019}
1020
1021pub fn parse_trace_advanced(
1022 trace: &str,
1023 network: Network,
1024 client: Option<&Integration>,
1025 server: Option<&Integration>,
1026) -> SimQueue {
1027 let mut sq = SimQueue::new();
1028 let mut sent_window = WindowCount::new(Duration::from_millis(100));
1031 let mut recv_window = WindowCount::new(Duration::from_millis(100));
1032 let mut sent_max_pps = 0;
1033 let mut recv_max_pps = 0;
1034
1035 let starting_time = Instant::now();
1038
1039 for l in trace.lines() {
1040 let parts: Vec<&str> = l.split(',').collect();
1041 if parts.len() >= 2 {
1042 let timestamp =
1043 starting_time + Duration::from_nanos(parts[0].trim().parse::<u64>().unwrap());
1044 match parts[1] {
1051 "s" | "sn" => {
1052 let reporting_delay = client
1054 .map(Integration::reporting_delay)
1055 .unwrap_or(Duration::from_micros(0));
1056 let reported = timestamp + reporting_delay;
1057 sq.push(
1058 TriggerEvent::NormalSent,
1059 true,
1060 false,
1061 reported,
1062 reporting_delay,
1063 );
1064
1065 let m = sent_window.add(×tamp);
1066 if m > sent_max_pps {
1067 sent_max_pps = m;
1068 }
1069 }
1070 "r" | "rn" => {
1071 let sent = timestamp - network.delay;
1073 let reporting_delay = server
1075 .map(Integration::reporting_delay)
1076 .unwrap_or(Duration::from_micros(0));
1077 let reported = sent + reporting_delay;
1078 sq.push(
1079 TriggerEvent::NormalSent,
1080 false,
1081 false,
1082 reported,
1083 reporting_delay,
1084 );
1085
1086 let m = recv_window.add(×tamp);
1087 if m > recv_max_pps {
1088 recv_max_pps = m;
1089 }
1090 }
1091 "sp" | "rp" => {
1092 }
1094 _ => {
1095 panic!("invalid direction")
1096 }
1097 }
1098 }
1099 }
1100
1101 sq.max_pps = Some(sent_max_pps.max(recv_max_pps) * 10);
1102
1103 sq
1104}