1use std::{
2 path::PathBuf,
3 pin::Pin,
4 sync::{mpsc::Receiver, Arc, Mutex},
5 task::{Context, Poll},
6};
7
8use anyhow::Result;
9use futures::{executor::block_on, FutureExt, Stream};
10use pros_simulator_interface::{SimulatorEvent, SimulatorMessage};
11use tokio::{
12 sync::{
13 mpsc::{self, UnboundedReceiver},
14 oneshot,
15 },
16 task::JoinHandle,
17};
18
19use crate::simulate;
20
/// A simulator event as delivered through the stream returned by `start_simulator`.
pub struct StreamedSimulatorEvent {
    /// The event emitted by the simulator.
    pub inner: SimulatorEvent,
    /// Present when the simulator was started with `require_unpause = true`.
    ///
    /// In that mode the simulator thread blocks after emitting this event until a
    /// value is sent on this channel or the sender is dropped. It is `None` when
    /// unpausing is not required.
    pub unpause: Option<oneshot::Sender<()>>,
}
25
26pub fn start_simulator(
28 robot_code: PathBuf,
29 require_unpause: bool,
30 messages: Receiver<SimulatorMessage>,
31) -> impl Stream<Item = Result<StreamedSimulatorEvent>> {
32 let (tx, rx) = mpsc::unbounded_channel();
33
34 SimulatorStream {
35 finished: false,
36 rx,
37 future: tokio::task::spawn_blocking(move || {
38 let tx = Arc::new(Mutex::new(tx));
39 let res = block_on(simulate(
40 &robot_code,
41 {
42 let tx = tx.clone();
43 move |inner| {
44 if require_unpause {
45 let (tx_unpause, rx_unpause) = oneshot::channel();
46 let event = StreamedSimulatorEvent {
47 inner,
48 unpause: Some(tx_unpause),
49 };
50 tx.lock().unwrap().send(Ok(event)).unwrap();
51 _ = rx_unpause.blocking_recv();
52 } else {
53 let event = StreamedSimulatorEvent {
54 inner,
55 unpause: None,
56 };
57 tx.lock().unwrap().send(Ok(event)).unwrap();
58 }
59 }
60 },
61 messages,
62 ));
63 if let Err(e) = res {
64 tx.lock().unwrap().send(Err(e)).unwrap();
65 }
66 }),
67 }
68}
69
/// Stream state backing the value returned by `start_simulator`.
struct SimulatorStream {
    /// Receives the events (and a possible final error) sent by the blocking
    /// simulator task.
    rx: UnboundedReceiver<Result<StreamedSimulatorEvent>>,
    /// Set once `future` has resolved, so that `poll_next` stops polling it.
    finished: bool,
    /// Join handle of the blocking task that runs the simulator.
    future: JoinHandle<()>,
}
75
76impl Stream for SimulatorStream {
77 type Item = Result<StreamedSimulatorEvent>;
78 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 let sim = self.get_mut();
80
81 if !sim.finished {
82 if let Poll::Ready(res) = sim.future.poll_unpin(cx) {
83 if let Err(err) = res {
84 return Poll::Ready(Some(Err(err.into())));
85 }
86 sim.finished = true;
87 }
88 }
89
90 sim.rx.poll_recv(cx)
91 }
92}