pros_simulator/
stream.rs

1use std::{
2    path::PathBuf,
3    pin::Pin,
4    sync::{mpsc::Receiver, Arc, Mutex},
5    task::{Context, Poll},
6};
7
8use anyhow::Result;
9use futures::{executor::block_on, FutureExt, Stream};
10use pros_simulator_interface::{SimulatorEvent, SimulatorMessage};
11use tokio::{
12    sync::{
13        mpsc::{self, UnboundedReceiver},
14        oneshot,
15    },
16    task::JoinHandle,
17};
18
19use crate::simulate;
20
21pub struct StreamedSimulatorEvent {
22    pub inner: SimulatorEvent,
23    pub unpause: Option<oneshot::Sender<()>>,
24}
25
26/// Start a simulator in a new tokio task and stream the events from it.
27pub fn start_simulator(
28    robot_code: PathBuf,
29    require_unpause: bool,
30    messages: Receiver<SimulatorMessage>,
31) -> impl Stream<Item = Result<StreamedSimulatorEvent>> {
32    let (tx, rx) = mpsc::unbounded_channel();
33
34    SimulatorStream {
35        finished: false,
36        rx,
37        future: tokio::task::spawn_blocking(move || {
38            let tx = Arc::new(Mutex::new(tx));
39            let res = block_on(simulate(
40                &robot_code,
41                {
42                    let tx = tx.clone();
43                    move |inner| {
44                        if require_unpause {
45                            let (tx_unpause, rx_unpause) = oneshot::channel();
46                            let event = StreamedSimulatorEvent {
47                                inner,
48                                unpause: Some(tx_unpause),
49                            };
50                            tx.lock().unwrap().send(Ok(event)).unwrap();
51                            _ = rx_unpause.blocking_recv();
52                        } else {
53                            let event = StreamedSimulatorEvent {
54                                inner,
55                                unpause: None,
56                            };
57                            tx.lock().unwrap().send(Ok(event)).unwrap();
58                        }
59                    }
60                },
61                messages,
62            ));
63            if let Err(e) = res {
64                tx.lock().unwrap().send(Err(e)).unwrap();
65            }
66        }),
67    }
68}
69
70struct SimulatorStream {
71    rx: UnboundedReceiver<Result<StreamedSimulatorEvent>>,
72    finished: bool,
73    future: JoinHandle<()>,
74}
75
76impl Stream for SimulatorStream {
77    type Item = Result<StreamedSimulatorEvent>;
78    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        let sim = self.get_mut();
80
81        if !sim.finished {
82            if let Poll::Ready(res) = sim.future.poll_unpin(cx) {
83                if let Err(err) = res {
84                    return Poll::Ready(Some(Err(err.into())));
85                }
86                sim.finished = true;
87            }
88        }
89
90        sim.rx.poll_recv(cx)
91    }
92}