maybenot_simulator/
lib.rs

1//! A simulator for the Maybenot framework. The [`Maybenot`](maybenot) framework
2//! is intended for traffic analysis defenses that can be used to hide patterns
3//! in encrypted communication. The goal of the simulator is to assist in the
4//! development of such defenses.
5//!
6//! The simulator consists of two core functions: [`parse_trace`] and [`sim`].
7//! The intended use is to first parse a trace (e.g., from a pcap file or a
8//! Website Fingerprinting dataset) using [`parse_trace`], and then simulate the
9//! trace using [`sim`] together with one or more Maybenot
10//! [`Machines`](maybenot::Machine) running at the client and/or
11//! server. The output of the simulator can then be parsed to produce a
12//! simulated trace that then in turn can be used to, e.g., train a Website
13//! Fingerprinting attack.
14//!
15//! ## Example usage
16//! ```
17//! use maybenot::{event::TriggerEvent, Machine};
18//! use maybenot_simulator::{network::Network, parse_trace, sim};
19//! use std::{str::FromStr, time::Duration};
20//!
21//! // The first ten packets of a network trace from the client's perspective
22//! // when visiting google.com. The format is: "time,direction\n". The
23//! // direction is either "s" (sent) or "r" (received). The time is in
24//! // nanoseconds since the start of the trace.
25//! let raw_trace = "0,s
26//! 19714282,r
27//! 183976147,s
28//! 243699564,r
29//! 1696037773,s
30//! 2047985926,s
31//! 2055955094,r
32//! 9401039609,s
33//! 9401094589,s
34//! 9420892765,r";
35//!
36//! // The network model for simulating the network between the client and the
37//! // server. Currently just a delay.
38//! let network = Network::new(Duration::from_millis(10), None);
39//!
40//! // Parse the raw trace into a queue of events for the simulator. This uses
41//! // the delay to generate a queue of events at the client and server in such
42//! // a way that the client is ensured to get the packets in the same order and
43//! // at the same time as in the raw trace.
44//! let mut input_trace = parse_trace(raw_trace, network);
45//!
46//! // A simple machine that sends one padding packet 20 milliseconds after the
47//! // first normal packet is sent.
48//! let m = "02eNp1ibEJAEAIA5Nf7B3N0v1cSESwEL0m5A6YvBqSgP7WeXfM5UoBW7ICYg==";
49//! let m = Machine::from_str(m).unwrap();
50//!
51//! // Run the simulator with the machine at the client. Run the simulation up
52//! // until 100 packets have been recorded (total, client and server).
53//! let trace = sim(&[m], &[], &mut input_trace, network.delay, 100, true);
54//!
55//! // print packets from the client's perspective
56//! let starting_time = trace[0].time;
57//! trace
58//!     .into_iter()
59//!     .filter(|p| p.client)
60//!     .for_each(|p| match p.event {
61//!         TriggerEvent::TunnelSent => {
62//!             if p.contains_padding {
63//!                 println!(
64//!                     "sent a padding packet at {} ms",
65//!                     (p.time - starting_time).as_millis()
66//!                 );
67//!             } else {
68//!                 println!(
69//!                     "sent a normal packet at {} ms",
70//!                     (p.time - starting_time).as_millis()
71//!                 );
72//!             }
73//!         }
74//!         TriggerEvent::TunnelRecv => {
75//!             if p.contains_padding {
76//!                 println!(
77//!                     "received a padding packet at {} ms",
78//!                     (p.time - starting_time).as_millis()
79//!                 );
80//!             } else {
81//!                 println!(
82//!                     "received a normal packet at {} ms",
83//!                     (p.time - starting_time).as_millis()
84//!                 );
85//!             }
86//!         }
87//!         _ => {}
88//!     });
89
90//!
91//! // Output:
92//! // sent a normal packet at 0 ms
93//! // received a normal packet at 19 ms
94//! // sent a padding packet at 20 ms
95//! // sent a normal packet at 183 ms
96//! // received a normal packet at 243 ms
97//! // sent a normal packet at 1696 ms
98//! // sent a normal packet at 2047 ms
99//! // received a normal packet at 2055 ms
100//! // sent a normal packet at 9401 ms
101//! // sent a normal packet at 9401 ms
102//! // received a normal packet at 9420 ms
103//! ```
104
105pub mod delay;
106pub mod integration;
107pub mod network;
108pub mod queue;
109pub mod queue_event;
110pub mod queue_peek;
111
112use std::{
113    cmp::Ordering,
114    slice,
115    time::{Duration, Instant},
116};
117
118use delay::agg_delay_on_blocking_expire;
119use integration::Integration;
120use log::debug;
121use network::{Network, NetworkBottleneck, WindowCount};
122use queue::SimQueue;
123
124use maybenot::{Framework, Machine, MachineId, Timer, TriggerAction, TriggerEvent};
125use rand::{RngCore, rngs::ThreadRng};
126use rand_xoshiro::Xoshiro256StarStar;
127use rand_xoshiro::rand_core::SeedableRng;
128
129use crate::{
130    network::sim_network_stack,
131    queue_peek::{
132        peek_blocked_exp, peek_queue, peek_scheduled_action, peek_scheduled_internal_timer,
133    },
134};
135
136// Enum to encapsulate different RngCore sources: in the Maybenot Framework, the
137// RngCore trait is not ?Sized (unnecessary overhead for the framework), so we
138// have to work around this by using an enum to support selecting rng source as
139// a simulation option.
140#[derive(Debug)]
141enum RngSource {
142    Thread(ThreadRng),
143    Xoshiro(Xoshiro256StarStar),
144}
145
146impl RngCore for RngSource {
147    fn next_u32(&mut self) -> u32 {
148        match self {
149            RngSource::Thread(rng) => rng.next_u32(),
150            RngSource::Xoshiro(rng) => rng.next_u32(),
151        }
152    }
153
154    fn next_u64(&mut self) -> u64 {
155        match self {
156            RngSource::Thread(rng) => rng.next_u64(),
157            RngSource::Xoshiro(rng) => rng.next_u64(),
158        }
159    }
160
161    fn fill_bytes(&mut self, dest: &mut [u8]) {
162        match self {
163            RngSource::Thread(rng) => rng.fill_bytes(dest),
164            RngSource::Xoshiro(rng) => rng.fill_bytes(dest),
165        }
166    }
167}
168
169/// SimEvent represents an event in the simulator. It is used internally to
170/// represent events that are to be processed by the simulator (in SimQueue) and
171/// events that are produced by the simulator (the resulting trace).
172#[derive(PartialEq, Hash, Eq, Clone, Debug)]
173pub struct SimEvent {
174    /// the actual event
175    pub event: TriggerEvent,
176    /// the time of the event taking place
177    pub time: Instant,
178    /// the delay of the event due to integration
179    pub integration_delay: Duration,
180    /// flag to track if the event is from the client
181    pub client: bool,
182    /// flag to track padding or normal packet
183    pub contains_padding: bool,
184    /// internal flag to mark event as bypass
185    bypass: bool,
186    /// internal flag to mark event as replace
187    replace: bool,
188    // debug note
189    pub debug_note: Option<String>,
190}
191
192/// Helper function to convert a TriggerEvent to a usize for sorting purposes.
193fn event_to_usize(e: &TriggerEvent) -> usize {
194    match e {
195        // tunnel before normal before padding
196        TriggerEvent::TunnelSent => 0,
197        TriggerEvent::NormalSent => 1,
198        TriggerEvent::PaddingSent { .. } => 2,
199        TriggerEvent::TunnelRecv => 3,
200        TriggerEvent::NormalRecv => 4,
201        TriggerEvent::PaddingRecv => 5,
202        // begin before end
203        TriggerEvent::BlockingBegin { .. } => 6,
204        TriggerEvent::BlockingEnd => 7,
205        TriggerEvent::TimerBegin { .. } => 8,
206        TriggerEvent::TimerEnd { .. } => 9,
207    }
208}
209
210// for SimEvent, implement Ord and PartialOrd to allow for sorting by time
211impl Ord for SimEvent {
212    fn cmp(&self, other: &Self) -> Ordering {
213        // reverse order to get the smallest time first
214        self.time
215            .cmp(&other.time)
216            .then_with(|| event_to_usize(&self.event).cmp(&event_to_usize(&other.event)))
217            .reverse()
218    }
219}
220
221impl PartialOrd for SimEvent {
222    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
223        Some(self.cmp(other))
224    }
225}
226
227/// ScheduledAction represents an action that is scheduled to be executed at a
228/// certain time.
229#[derive(PartialEq, Clone, Debug)]
230pub struct ScheduledAction {
231    action: TriggerAction,
232    time: Instant,
233}
234
235/// The state of the client or the server in the simulator.
236#[derive(Debug)]
237pub struct SimState<M, R> {
238    /// an instance of the Maybenot framework
239    framework: Framework<M, R>,
240    /// scheduled action timers
241    scheduled_action: Vec<Option<ScheduledAction>>,
242    /// scheduled internal timers
243    scheduled_internal_timer: Vec<Option<Instant>>,
244    /// blocking until time, active is set
245    blocking_until: Option<Instant>,
246    /// whether the active blocking bypassable or not
247    blocking_bypassable: bool,
248    /// integration aspects for this state
249    integration: Option<Integration>,
250}
251
252impl<M> SimState<M, RngSource>
253where
254    M: AsRef<[Machine]>,
255{
256    pub fn new(
257        machines: M,
258        current_time: Instant,
259        max_padding_frac: f64,
260        max_blocking_frac: f64,
261        integration: Option<Integration>,
262        insecure_rng_seed: Option<u64>,
263    ) -> Self {
264        let rng = match insecure_rng_seed {
265            // deterministic, insecure RNG
266            Some(seed) => RngSource::Xoshiro(Xoshiro256StarStar::seed_from_u64(seed)),
267            // secure RNG, default
268            None => RngSource::Thread(rand::rng()),
269        };
270
271        let num_machines = machines.as_ref().len();
272
273        Self {
274            framework: Framework::new(
275                machines,
276                max_padding_frac,
277                max_blocking_frac,
278                current_time,
279                rng,
280            )
281            .unwrap(),
282            scheduled_action: vec![None; num_machines],
283            scheduled_internal_timer: vec![None; num_machines],
284            blocking_until: None,
285            blocking_bypassable: false,
286            integration,
287        }
288    }
289
290    pub fn reporting_delay(&self) -> Duration {
291        self.integration
292            .as_ref()
293            .map(Integration::reporting_delay)
294            .unwrap_or(Duration::from_micros(0))
295    }
296
297    pub fn action_delay(&self) -> Duration {
298        self.integration
299            .as_ref()
300            .map(Integration::action_delay)
301            .unwrap_or(Duration::from_micros(0))
302    }
303
304    pub fn trigger_delay(&self) -> Duration {
305        self.integration
306            .as_ref()
307            .map(Integration::trigger_delay)
308            .unwrap_or(Duration::from_micros(0))
309    }
310}
311
312/// The main simulator function.
313///
314/// Zero or more machines can concurrently be run on the client and server. The
315/// machines can be different. The framework is designed to support many
316/// machines.
317///
318/// The queue MUST have been created by [`parse_trace`] with the same delay. The
319/// queue is modified by the simulator and should be re-created for each run of
320/// the simulator or cloned.
321///
322/// If max_trace_length is > 0, the simulator will stop after max_trace_length
323/// events have been *simulated* by the simulator and added to the simulating
324/// output trace. Note that some machines may schedule infinite actions (e.g.,
325/// schedule new padding after sending padding), so the simulator may never
326/// stop. Use [`sim_advanced`] to set the maximum number of iterations to run
327/// the simulator for and other advanced settings.
328///
329/// If only_network_activity is true, the simulator will only append events that
330/// are related to network activity (i.e., packets sent and received) to the
331/// output trace. This is recommended if you want to use the output trace for
332/// traffic analysis without further (recursive) simulation.
333pub fn sim(
334    machines_client: &[Machine],
335    machines_server: &[Machine],
336    sq: &mut SimQueue,
337    delay: Duration,
338    max_trace_length: usize,
339    only_network_activity: bool,
340) -> Vec<SimEvent> {
341    let network = Network::new(delay, None);
342    let args = SimulatorArgs::new(network, max_trace_length, only_network_activity);
343    sim_advanced(machines_client, machines_server, sq, &args)
344}
345
346/// Arguments for [`sim_advanced`].
347#[derive(Clone, Debug)]
348pub struct SimulatorArgs {
349    /// The network model for simulating the network between the client and the
350    /// server.
351    pub network: Network,
352    /// The maximum number of events to simulate.
353    pub max_trace_length: usize,
354    /// The maximum number of iterations to run the simulator for. If 0, the
355    /// simulator will run until it stops.
356    pub max_sim_iterations: usize,
357    /// If true, the simulator will continue after all normal packets have been
358    /// processed.
359    pub continue_after_all_normal_packets_processed: bool,
360    /// If true, only client events are returned in the output trace.
361    pub only_client_events: bool,
362    /// If true, only events that represent network packets are returned in the
363    /// output trace.
364    pub only_network_activity: bool,
365    /// The maximum fraction of padding for the client's instance of the
366    /// Maybenot framework.
367    pub max_padding_frac_client: f64,
368    /// The maximum fraction of blocking for the client's instance of the
369    /// Maybenot framework.
370    pub max_blocking_frac_client: f64,
371    /// The maximum fraction of padding for the server's instance of the
372    /// Maybenot framework.
373    pub max_padding_frac_server: f64,
374    /// The maximum fraction of blocking for the server's instance of the
375    /// Maybenot framework.
376    pub max_blocking_frac_server: f64,
377    /// The seed for the deterministic (insecure) Xoshiro256StarStar RNG. If
378    /// None, the simulator will use the cryptographically secure thread_rng().
379    pub insecure_rng_seed: Option<u64>,
380    /// Optional client integration delays.
381    pub client_integration: Option<Integration>,
382    /// Optional server integration delays.
383    pub server_integration: Option<Integration>,
384}
385
386impl SimulatorArgs {
387    pub fn new(network: Network, max_trace_length: usize, only_network_activity: bool) -> Self {
388        Self {
389            network,
390            max_trace_length,
391            max_sim_iterations: 0,
392            continue_after_all_normal_packets_processed: false,
393            only_client_events: false,
394            only_network_activity,
395            max_padding_frac_client: 0.0,
396            max_blocking_frac_client: 0.0,
397            max_padding_frac_server: 0.0,
398            max_blocking_frac_server: 0.0,
399            insecure_rng_seed: None,
400            client_integration: None,
401            server_integration: None,
402        }
403    }
404}
405
406/// Like [`sim`], but allows to (i) set the maximum padding and blocking
407/// fractions for the client and server, (ii) specify the maximum number of
408/// iterations to run the simulator for, and (iii) only returning client events.
409pub fn sim_advanced(
410    machines_client: &[Machine],
411    machines_server: &[Machine],
412    sq: &mut SimQueue,
413    args: &SimulatorArgs,
414) -> Vec<SimEvent> {
415    // the resulting simulated trace
416    let expected_trace_len = if args.max_trace_length > 0 {
417        args.max_trace_length
418    } else {
419        // a rough estimate of the number of events in the trace
420        sq.len() * 2
421    };
422    let mut trace: Vec<SimEvent> = Vec::with_capacity(expected_trace_len);
423
424    // put the mocked current time at the first event
425    let mut current_time = sq.get_first_time().unwrap();
426
427    let mut client = SimState::new(
428        machines_client,
429        current_time,
430        args.max_padding_frac_client,
431        args.max_blocking_frac_client,
432        args.clone().client_integration,
433        args.insecure_rng_seed,
434    );
435    let mut server = SimState::new(
436        machines_server,
437        current_time,
438        args.max_padding_frac_server,
439        args.max_blocking_frac_server,
440        args.clone().server_integration,
441        // if we have an insecure seed, we use the next number in the sequence
442        // to avoid the same seed for both client and server
443        args.insecure_rng_seed.map(|seed| seed.wrapping_add(1)),
444    );
445    debug!("sim(): client machines {}", machines_client.len());
446    debug!("sim(): server machines {}", machines_server.len());
447
448    let mut network = NetworkBottleneck::new(args.network, Duration::from_secs(1), sq.max_pps);
449
450    let mut sim_iterations = 0;
451    let start_time = current_time;
452    while let Some(next) = pick_next(sq, &mut client, &mut server, &mut network, current_time) {
453        debug!("#########################################################");
454        debug!("sim(): main loop start");
455
456        // move time forward?
457        match next.time.cmp(&current_time) {
458            Ordering::Less => {
459                debug!("sim(): {current_time:#?}");
460                debug!("sim(): {:#?}", next.time);
461                panic!("bug: next event moves time backwards");
462            }
463            Ordering::Greater => {
464                debug!("sim(): time moved forward {:#?}", next.time - current_time);
465                current_time = next.time;
466            }
467            _ => {}
468        }
469
470        // status
471        debug!(
472            "sim(): at time {:#?}, aggregate network base delay {:#?} @client and {:#?} @server",
473            current_time.duration_since(start_time),
474            network.client_aggregate_base_delay,
475            network.server_aggregate_base_delay,
476        );
477        if next.client {
478            debug!("sim(): @client next\n{next:#?}");
479        } else {
480            debug!("sim(): @server next\n{next:#?}");
481        }
482        if let Some(blocking_until) = client.blocking_until {
483            debug!(
484                "sim(): client is blocked until time {:#?}",
485                blocking_until.duration_since(start_time)
486            );
487        }
488        if let Some(blocking_until) = server.blocking_until {
489            debug!(
490                "sim(): server is blocked until time {:#?}",
491                blocking_until.duration_since(start_time)
492            );
493        }
494
495        // Where the simulator simulates the entire network between the client
496        // and the server. Returns true if there was network activity (i.e., a
497        // packet was sent or received over the network), false otherwise.
498        let network_activity = if next.client {
499            sim_network_stack(&next, sq, &client, &mut server, &mut network, &current_time)
500        } else {
501            sim_network_stack(&next, sq, &server, &mut client, &mut network, &current_time)
502        };
503
504        // get actions, update scheduled actions
505        if next.client {
506            debug!("sim(): trigger @client framework {:?}", next.event);
507            trigger_update(&mut client, &next, &current_time, sq, true);
508        } else {
509            debug!("sim(): trigger @server framework {:?}", next.event);
510            trigger_update(&mut server, &next, &current_time, sq, false);
511        }
512
513        // conditional save to resulting trace: only on network activity if set
514        // in fn arg, and only on client activity if set in fn arg
515        if (!args.only_network_activity || network_activity)
516            && (!args.only_client_events || next.client)
517        {
518            // this should be a network trace: adjust timestamps based on any
519            // integration delays
520            let mut n = next.clone();
521            match next.event {
522                TriggerEvent::NormalSent => {
523                    // remove the reporting delay
524                    n.time -= n.integration_delay;
525                }
526                TriggerEvent::PaddingSent { .. } => {
527                    // padding packet adds the action delay
528                    n.time += n.integration_delay;
529                }
530                TriggerEvent::TunnelSent => {
531                    if n.contains_padding {
532                        // padding packet adds the action delay
533                        n.time += n.integration_delay;
534                    } else {
535                        // normal packet removes the reporting delay
536                        n.time -= n.integration_delay;
537                    }
538                }
539                TriggerEvent::TunnelRecv | TriggerEvent::PaddingRecv | TriggerEvent::NormalRecv => {
540                    // remove the reporting delay
541                    n.time -= n.integration_delay;
542                }
543
544                _ => {}
545            }
546
547            n.debug_note = Some(format!(
548                "agg. delay {:?} @c, {:?} @s",
549                network.client_aggregate_base_delay, network.server_aggregate_base_delay
550            ));
551
552            trace.push(n);
553        }
554
555        if args.max_trace_length > 0 && trace.len() >= args.max_trace_length {
556            debug!(
557                "sim(): we done, reached max trace length {}",
558                args.max_trace_length
559            );
560            break;
561        }
562
563        // check if we should stop
564        sim_iterations += 1;
565        if args.max_sim_iterations > 0 && sim_iterations >= args.max_sim_iterations {
566            debug!(
567                "sim(): we done, reached max sim iterations {}",
568                args.max_sim_iterations
569            );
570            break;
571        }
572
573        // check if we should stop after all normal packets have been processed
574        if !args.continue_after_all_normal_packets_processed && sq.no_normal_packets() {
575            debug!("sim(): we done, all normal packets processed");
576            break;
577        }
578
579        debug!("sim(): main loop end, more work?");
580        debug!("#########################################################");
581    }
582
583    // sort the trace by time
584    trace.sort_by(|a, b| a.time.cmp(&b.time));
585
586    trace
587}
588
589fn pick_next<M: AsRef<[Machine]>>(
590    sq: &mut SimQueue,
591    client: &mut SimState<M, RngSource>,
592    server: &mut SimState<M, RngSource>,
593    network: &mut NetworkBottleneck,
594    current_time: Instant,
595) -> Option<SimEvent> {
596    // find the earliest scheduled action, internal timer, block expiry,
597    // aggregate delay, and queued events to determine the next event
598    let s = peek_scheduled_action(
599        &client.scheduled_action,
600        &server.scheduled_action,
601        current_time,
602    );
603    debug!("\tpick_next(): peek_scheduled_action = {s:?}");
604
605    let i = peek_scheduled_internal_timer(
606        &client.scheduled_internal_timer,
607        &server.scheduled_internal_timer,
608        current_time,
609    );
610    debug!("\tpick_next(): peek_scheduled_internal_timer = {i:?}");
611
612    let (b, b_is_client) =
613        peek_blocked_exp(client.blocking_until, server.blocking_until, current_time);
614    debug!("\tpick_next(): peek_blocked_exp = {b:?}");
615
616    let n = network.peek_aggregate_delay(current_time);
617    debug!("\tpick_next(): peek_aggregate_delay = {n:?}");
618
619    let (q, qid, q_is_client) = peek_queue(
620        sq,
621        client,
622        server,
623        network.client_aggregate_base_delay,
624        network.server_aggregate_base_delay,
625        s.min(i).min(b).min(n),
626        current_time,
627    );
628    debug!("\tpick_next(): peek_queue = {q:?}");
629
630    // no next?
631    if s == Duration::MAX
632        && i == Duration::MAX
633        && b == Duration::MAX
634        && n == Duration::MAX
635        && q == Duration::MAX
636    {
637        return None;
638    }
639
640    // We prioritize the aggregate delay first: it is fundamental and may lead
641    // to further delays for picked_queue
642    if n <= s && n <= i && n <= b && n <= q {
643        debug!("\tpick_next(): picked aggregate delay");
644        network.pop_aggregate_delay();
645        return pick_next(sq, client, server, network, current_time);
646    }
647
648    // next is blocking expiry, fundamental due to how we aggregate delay
649    if b <= s && b <= i && b <= q {
650        debug!("\tpick_next(): picked blocking");
651        // create SimEvent and turn off blocking, ASSUMPTION: block outgoing is
652        // reported from integration
653        let delay: Duration;
654        if b_is_client {
655            delay = client.reporting_delay();
656            client.blocking_until = None;
657        } else {
658            delay = server.reporting_delay();
659            server.blocking_until = None;
660        }
661
662        // determine if we have any aggregate delay to schedule (are we blocking
663        // anything?)
664        let (blocking, _) = sq.peek_blocking(false, b_is_client);
665        if let Some(event) = blocking {
666            // if the first blocking event is in the past, it was delayed by the
667            // blocking: note that the blocking ends at current_time + b
668            if event.time < current_time + b {
669                //let blocked_duration = current_time + b - event.time;
670                let time_of_expiry = current_time + b;
671                if let Some(blocked_duration) = agg_delay_on_blocking_expire(
672                    sq,
673                    b_is_client,
674                    time_of_expiry,
675                    event,
676                    match b_is_client {
677                        true => network.client_aggregate_base_delay,
678                        false => network.server_aggregate_base_delay,
679                    },
680                ) {
681                    network.push_aggregate_delay(blocked_duration, &time_of_expiry, b_is_client);
682                }
683            }
684        }
685
686        let e = SimEvent {
687            client: b_is_client,
688            event: TriggerEvent::BlockingEnd,
689            time: current_time + b + delay,
690            integration_delay: delay,
691            bypass: false,
692            replace: false,
693            contains_padding: false,
694            debug_note: None,
695        };
696        if delay > Duration::default() {
697            // if any delay, there might be events before the BlockingEnd event,
698            // so queue up and pick again
699            sq.push_sim(e);
700            return pick_next(sq, client, server, network, current_time);
701        }
702        return Some(e);
703    }
704
705    // We prioritize the queue next: in general, stuff happens faster outside
706    // the framework than inside it. On overload, the user of the framework will
707    // bulk trigger events in the framework.
708    if q <= s && q <= i {
709        debug!("\tpick_next(): picked queue, is_client {q_is_client}, queue {qid:?}");
710        let mut tmp = sq
711            .pop(
712                qid,
713                q_is_client,
714                if q_is_client {
715                    network.client_aggregate_base_delay
716                } else {
717                    network.server_aggregate_base_delay
718                },
719            )
720            .unwrap();
721        debug!("\tpick_next(): popped from queue {tmp:?}");
722        // check if blocking moves the event forward in time
723        if current_time + q > tmp.time {
724            // move the event forward in time
725            tmp.time = current_time + q;
726        }
727
728        return Some(tmp);
729    }
730
731    // next we pick internal events, which should be faster than scheduled
732    // actions due to less work
733    if i <= s {
734        debug!("\tpick_next(): picked internal timer");
735        let target = current_time + i;
736        let act = do_internal_timer(client, server, target);
737        if let Some(a) = act {
738            sq.push_sim(a.clone());
739        }
740        return pick_next(sq, client, server, network, current_time);
741    }
742
743    // what's left is scheduled actions: find the action act on the action,
744    // putting the event into the sim queue, and then recurse
745    debug!("\tpick_next(): picked scheduled action");
746    let target = current_time + s;
747    let act = do_scheduled_action(client, server, target);
748    if let Some(a) = act {
749        sq.push_sim(a.clone());
750    }
751    pick_next(sq, client, server, network, current_time)
752}
753
754fn do_internal_timer<M: AsRef<[Machine]>>(
755    client: &mut SimState<M, RngSource>,
756    server: &mut SimState<M, RngSource>,
757    target: Instant,
758) -> Option<SimEvent> {
759    let mut machine: Option<MachineId> = None;
760    let mut is_client = false;
761
762    for (id, opt) in client.scheduled_internal_timer.iter_mut().enumerate() {
763        if let Some(a) = opt {
764            if *a == target {
765                machine = Some(MachineId::from_raw(id));
766                is_client = true;
767                *opt = None;
768                break;
769            }
770        }
771    }
772
773    if machine.is_none() {
774        for (id, opt) in server.scheduled_internal_timer.iter_mut().enumerate() {
775            if let Some(a) = opt {
776                if *a == target {
777                    machine = Some(MachineId::from_raw(id));
778                    is_client = false;
779                    *opt = None;
780                    break;
781                }
782            }
783        }
784    }
785
786    assert!(machine.is_some(), "bug: no internal action found");
787
788    // create SimEvent with TimerEnd
789    Some(SimEvent {
790        client: is_client,
791        event: TriggerEvent::TimerEnd {
792            machine: machine.unwrap(),
793        },
794        time: target,
795        integration_delay: Duration::from_micros(0), // TODO: is this correct?
796        bypass: false,
797        replace: false,
798        contains_padding: false,
799        debug_note: None,
800    })
801}
802
803fn do_scheduled_action<M: AsRef<[Machine]>>(
804    client: &mut SimState<M, RngSource>,
805    server: &mut SimState<M, RngSource>,
806    target: Instant,
807) -> Option<SimEvent> {
808    // find the action
809    let mut a: Option<ScheduledAction> = None;
810    let mut is_client = false;
811
812    for opt in client.scheduled_action.iter_mut() {
813        if let Some(sa) = opt {
814            if sa.time == target {
815                a = Some(sa.clone());
816                is_client = true;
817                *opt = None;
818                break;
819            }
820        }
821    }
822
823    // cannot schedule a None action, so if we found one, done
824    if a.is_none() {
825        for opt in server.scheduled_action.iter_mut() {
826            if let Some(sa) = opt {
827                if sa.time == target {
828                    a = Some(sa.clone());
829                    is_client = false;
830                    *opt = None;
831                    break;
832                }
833            }
834        }
835    }
836
837    // no action found
838    assert!(a.is_some(), "bug: no action found");
839    let a = a.unwrap();
840
841    // do the action
842    match a.action {
843        TriggerAction::Cancel { .. } => {
844            // this should never happen, bug
845            panic!("bug: cancel action in scheduled action");
846        }
847        TriggerAction::UpdateTimer { .. } => {
848            // this should never happen, bug
849            panic!("bug: update timer action in scheduled action");
850        }
851        TriggerAction::SendPadding {
852            timeout: _,
853            bypass,
854            replace,
855            machine,
856        } => {
857            let action_delay = if is_client {
858                client.action_delay()
859            } else {
860                server.action_delay()
861            };
862
863            Some(SimEvent {
864                event: TriggerEvent::PaddingSent { machine },
865                time: a.time,
866                integration_delay: action_delay,
867                client: is_client,
868                bypass,
869                replace,
870                contains_padding: true,
871                debug_note: None,
872            })
873        }
874        TriggerAction::BlockOutgoing {
875            timeout: _,
876            duration,
877            bypass,
878            replace,
879            machine,
880        } => {
881            let block = a.time + duration;
882            let event_bypass;
883            // ASSUMPTION: block outgoing reported from integration
884            let total_delay = if is_client {
885                client.action_delay() + client.reporting_delay()
886            } else {
887                server.action_delay() + server.reporting_delay()
888            };
889            let reported = a.time + total_delay;
890
891            // should we update client/server blocking?
892            if is_client {
893                if replace || block > client.blocking_until.unwrap_or(a.time) {
894                    client.blocking_until = Some(block);
895                    client.blocking_bypassable = bypass;
896                }
897                event_bypass = client.blocking_bypassable;
898            } else {
899                if replace || block > server.blocking_until.unwrap_or(a.time) {
900                    server.blocking_until = Some(block);
901                    server.blocking_bypassable = bypass;
902                }
903                event_bypass = server.blocking_bypassable;
904            }
905
906            // event triggered regardless
907            Some(SimEvent {
908                event: TriggerEvent::BlockingBegin { machine },
909                time: reported,
910                integration_delay: total_delay,
911                client: is_client,
912                bypass: event_bypass,
913                replace: false,
914                contains_padding: false,
915                debug_note: None,
916            })
917        }
918    }
919}
920
921fn trigger_update<M: AsRef<[Machine]>>(
922    state: &mut SimState<M, RngSource>,
923    next: &SimEvent,
924    current_time: &Instant,
925    sq: &mut SimQueue,
926    is_client: bool,
927) {
928    let trigger_delay = state.trigger_delay();
929
930    // parse actions and update
931    for action in state
932        .framework
933        .trigger_events(slice::from_ref(&next.event), *current_time)
934    {
935        match action {
936            TriggerAction::Cancel { machine, timer } => {
937                debug!("\ttrigger_update(): cancel action {machine:?} {timer:?}");
938                // here we make a simplifying assumption of no trigger delay for
939                // cancel actions
940                match timer {
941                    Timer::Action => {
942                        state.scheduled_action[machine.into_raw()] = None;
943                    }
944                    Timer::Internal => {
945                        state.scheduled_internal_timer[machine.into_raw()] = None;
946                    }
947                    Timer::All => {
948                        state.scheduled_action[machine.into_raw()] = None;
949                        state.scheduled_internal_timer[machine.into_raw()] = None;
950                    }
951                }
952            }
953            TriggerAction::SendPadding {
954                timeout,
955                bypass: _,
956                replace: _,
957                machine,
958            } => {
959                debug!("\ttrigger_update(): send padding action {timeout:?} {machine:?}");
960                state.scheduled_action[machine.into_raw()] = Some(ScheduledAction {
961                    action: action.clone(),
962                    time: *current_time + *timeout + trigger_delay,
963                });
964            }
965            TriggerAction::BlockOutgoing {
966                timeout,
967                duration: _,
968                bypass: _,
969                replace: _,
970                machine,
971            } => {
972                debug!("\ttrigger_update(): block outgoing action {timeout:?} {machine:?}");
973                state.scheduled_action[machine.into_raw()] = Some(ScheduledAction {
974                    action: action.clone(),
975                    time: *current_time + *timeout + trigger_delay,
976                });
977            }
978            TriggerAction::UpdateTimer {
979                duration,
980                replace,
981                machine,
982            } => {
983                debug!("\ttrigger_update(): update timer action {duration:?} {machine:?}");
984                // get current internal timer duration, if any
985                let current =
986                    state.scheduled_internal_timer[machine.into_raw()].unwrap_or(*current_time);
987
988                // update the timer
989                if *replace || current < *current_time + *duration {
990                    state.scheduled_internal_timer[machine.into_raw()] =
991                        Some(*current_time + *duration);
992                    // TimerBegin event
993                    sq.push_sim(SimEvent {
994                        client: is_client,
995                        event: TriggerEvent::TimerBegin { machine: *machine },
996                        time: *current_time,
997                        integration_delay: Duration::from_micros(0), // TODO: is this correct?
998                        bypass: false,
999                        replace: false,
1000                        contains_padding: false,
1001                        debug_note: None,
1002                    });
1003                }
1004            }
1005        }
1006    }
1007}
1008
1009/// Parse a trace into a [`SimQueue`] for use with [`sim`].
1010///
1011/// The trace should contain one or more lines of the form
1012/// "time,direction,size\n", where time is in nanoseconds relative to the first
1013/// line, direction is either "s" for sent or "r" for received, and size is the
1014/// number of bytes sent or received. The delay is used to model the network
1015/// delay between the client and server. Returns a SimQueue with the events in
1016/// the trace for use with [`sim`].
1017pub fn parse_trace(trace: &str, network: Network) -> SimQueue {
1018    parse_trace_advanced(trace, network, None, None)
1019}
1020
1021pub fn parse_trace_advanced(
1022    trace: &str,
1023    network: Network,
1024    client: Option<&Integration>,
1025    server: Option<&Integration>,
1026) -> SimQueue {
1027    let mut sq = SimQueue::new();
1028    // compute max observed packets per second by checking the max number of
1029    // packets in a 100ms window
1030    let mut sent_window = WindowCount::new(Duration::from_millis(100));
1031    let mut recv_window = WindowCount::new(Duration::from_millis(100));
1032    let mut sent_max_pps = 0;
1033    let mut recv_max_pps = 0;
1034
1035    // we just need a random starting time to make sure that we don't start from
1036    // absolute 0
1037    let starting_time = Instant::now();
1038
1039    for l in trace.lines() {
1040        let parts: Vec<&str> = l.split(',').collect();
1041        if parts.len() >= 2 {
1042            let timestamp =
1043                starting_time + Duration::from_nanos(parts[0].trim().parse::<u64>().unwrap());
1044            // let size = parts[2].trim().parse::<u64>().unwrap();
1045
1046            // NOTE: for supporting deterministic simulation with a seed, note
1047            // that once network is randomized and integration delays are used,
1048            // both need to be updated below. Unfortunately, users of the
1049            // simulator would have to take this parsing into account as well.
1050            match parts[1] {
1051                "s" | "sn" => {
1052                    // client sent at the given time
1053                    let reporting_delay = client
1054                        .map(Integration::reporting_delay)
1055                        .unwrap_or(Duration::from_micros(0));
1056                    let reported = timestamp + reporting_delay;
1057                    sq.push(
1058                        TriggerEvent::NormalSent,
1059                        true,
1060                        false,
1061                        reported,
1062                        reporting_delay,
1063                    );
1064
1065                    let m = sent_window.add(&timestamp);
1066                    if m > sent_max_pps {
1067                        sent_max_pps = m;
1068                    }
1069                }
1070                "r" | "rn" => {
1071                    // sent by server delay time ago
1072                    let sent = timestamp - network.delay;
1073                    // but reported to the Maybenot framework at the server with delay
1074                    let reporting_delay = server
1075                        .map(Integration::reporting_delay)
1076                        .unwrap_or(Duration::from_micros(0));
1077                    let reported = sent + reporting_delay;
1078                    sq.push(
1079                        TriggerEvent::NormalSent,
1080                        false,
1081                        false,
1082                        reported,
1083                        reporting_delay,
1084                    );
1085
1086                    let m = recv_window.add(&timestamp);
1087                    if m > recv_max_pps {
1088                        recv_max_pps = m;
1089                    }
1090                }
1091                "sp" | "rp" => {
1092                    // TODO: figure out of ignoring is the right thing to do
1093                }
1094                _ => {
1095                    panic!("invalid direction")
1096                }
1097            }
1098        }
1099    }
1100
1101    sq.max_pps = Some(sent_max_pps.max(recv_max_pps) * 10);
1102
1103    sq
1104}