hotham_debug_server/
lib.rs

1pub mod debug_frame;
2use debug_frame::DebugFrame;
3use uuid::Uuid;
4
5use std::io::{Error as IOError, ErrorKind};
6use std::thread::{self, JoinHandle};
7use std::time::Instant;
8use tokio::net::{TcpListener, TcpStream};
9
10use futures::join;
11use futures_util::{StreamExt, TryStreamExt};
12use tokio::runtime::Builder;
13use tokio::sync::broadcast::{channel as broadcast_channel, Receiver, Sender};
14
15use tokio_stream::wrappers::BroadcastStream;
16use tokio_tungstenite::tungstenite;
17
18use serde::{Deserialize, Serialize};
19use serde_repr::*;
20use std::fmt::Debug;
21
22#[derive(Serialize_repr, Deserialize_repr, Clone, Debug)]
23#[serde(rename_all = "camelCase")]
24#[repr(u8)]
25pub enum Command {
26    Reset,
27    Init,
28}
29
30#[derive(Deserialize, Serialize, Clone, Debug)]
31#[serde(rename_all = "camelCase")]
32pub struct InitData {
33    first_frame: DebugFrame,
34    session_id: Uuid,
35}
36
37#[derive(Deserialize, Serialize, Clone, Debug)]
38#[serde(rename_all = "camelCase")]
39pub enum Message {
40    Frames(Vec<DebugFrame>),
41    Command(Command),
42    Init(InitData),
43    Error(String),
44}
45
46pub struct DebugServer {
47    pub to_client: Sender<Message>,
48    pub from_client: Receiver<Message>,
49    pub session_id: Uuid,
50    pub current_frame: usize,
51    pub frame_queue: Vec<DebugFrame>,
52    last_sync: Instant,
53    _handle: JoinHandle<()>,
54}
55
56async fn accept_connection(
57    stream: TcpStream,
58    to_hotham: Sender<Message>,
59    from_hotham: Receiver<Message>,
60) {
61    let ws_stream = tokio_tungstenite::accept_async(stream)
62        .await
63        .expect("Error during the websocket handshake occurred");
64
65    let (to_client, from_client) = ws_stream.split();
66
67    let client_to_hotham = from_client
68        .filter_map(|msg| async move {
69            match msg {
70                Ok(tungstenite::Message::Text(m)) => match serde_json::from_str::<Message>(&m) {
71                    Ok(message) => Some(Ok(message)),
72                    Err(e) => {
73                        let error_message = format!("Error deserialising: {:?}", e);
74                        eprintln!("{:?}", error_message);
75                        Some(Ok(Message::Error(error_message)))
76                    }
77                },
78                _ => None,
79            }
80        })
81        .try_for_each(|v| futures::future::ready(to_hotham.send(v).map(|_| ()).map_err(|_| ())));
82
83    let from_hotham = BroadcastStream::new(from_hotham).map(|message| match message {
84        Ok(message) => {
85            let json = serde_json::to_string(&message)
86                .unwrap_or_else(|_| panic!("Unable to deserialize {:?}", message));
87            Ok(tungstenite::Message::Text(json))
88        }
89        Err(e) => Err(tokio_tungstenite::tungstenite::Error::Io(IOError::new(
90            ErrorKind::Other,
91            e.to_string(),
92        ))),
93    });
94    let hotham_to_client = from_hotham.forward(to_client);
95
96    let (r1, r2) = join!(client_to_hotham, hotham_to_client);
97    r1.expect("Problem!");
98    r2.expect("Problem 2!");
99}
100
101impl DebugServer {
102    pub fn new() -> DebugServer {
103        // These names are kind of confusing, so here's a little explanation:
104        //
105        // - to_client - use this to send a message from hotham to the websocket client
106        // - from_client - use this to receive a message from the websocket client to hotham
107        let (to_client, from_client) = broadcast_channel(16);
108        let to_client_clone = to_client.clone();
109
110        let handle = thread::spawn(move || {
111            let rt = Builder::new_current_thread().enable_all().build().unwrap();
112            rt.block_on(async {
113                let addr = "127.0.0.1:8000".to_string();
114                // Create the event loop and TCP listener we'll accept connections on.
115                let try_socket = TcpListener::bind(&addr).await;
116                let listener = try_socket.expect("Failed to bind");
117                while let Ok((stream, _)) = listener.accept().await {
118                    // - to_hotham - use this to send a message from the websocket client back to hotham
119                    // - from_hotham - use this to receive a message from hotham back to the websocket client
120                    let to_hotham = to_client_clone.clone();
121                    let from_hotham = to_hotham.subscribe();
122
123                    tokio::spawn(accept_connection(stream, to_hotham, from_hotham));
124                }
125            })
126        });
127
128        DebugServer {
129            to_client,
130            from_client,
131            _handle: handle,
132            session_id: Uuid::new_v4(),
133            current_frame: 0,
134            frame_queue: Vec::new(),
135            last_sync: Instant::now(),
136        }
137    }
138
139    pub fn sync(&mut self) -> Option<Vec<DebugFrame>> {
140        let mut editable_data = None;
141        let frames = self.frame_queue.drain(..).collect::<Vec<_>>();
142        let response: Option<Message> = match self.from_client.try_recv() {
143            Ok(Message::Frames(debug_data_from_client)) => {
144                editable_data = Some(debug_data_from_client);
145                Some(Message::Frames(frames))
146            }
147            Ok(Message::Command(Command::Reset)) => Some(Message::Frames(frames)),
148            Ok(Message::Command(Command::Init)) => Some(Message::Init(InitData {
149                session_id: self.session_id,
150                first_frame: frames[0].clone(),
151            })),
152            Ok(error_message @ Message::Error(_)) => Some(error_message),
153            Ok(_) => None,
154            Err(_) => Some(Message::Frames(frames)),
155        };
156
157        if let Some(response) = response {
158            self.to_client
159                .send(response)
160                .expect("Unable to update value");
161            let _ = self.from_client.try_recv();
162        }
163
164        self.last_sync = Instant::now();
165
166        editable_data
167    }
168
169    pub fn time_since_last_sync(&self) -> u64 {
170        self.last_sync.elapsed().as_secs()
171    }
172}
173
174impl Default for DebugServer {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
180#[cfg(test)]
181#[allow(unused_assignments)]
182mod tests {
183    #[derive(Deserialize, Serialize, Clone, Debug, Default)]
184    struct Test {
185        name: String,
186    }
187
188    #[derive(Deserialize, Serialize, Clone, Debug)]
189    struct Info {
190        count: usize,
191    }
192
193    use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as TungsteniteMessage};
194
195    use crate::debug_frame::{DebugEntity, DebugTransform};
196    use futures_util::sink::SinkExt;
197
198    use super::*;
199    #[test]
200    fn test_debug_server_smoke() {
201        // This is simulating the inside of Hotham.
202        let mut server: DebugServer = DebugServer::new();
203        let test_entity = DebugEntity {
204            name: "Environment".to_string(),
205            entity_id: 0,
206            id: "test".to_string(),
207            transform: Some(DebugTransform {
208                translation: [0., 0., 0.],
209                rotation: [0., 0., 0., 1.],
210                scale: [1., 1., 1.],
211            }),
212            collider: None,
213        };
214
215        let debug_frame = DebugFrame {
216            id: Uuid::new_v4(),
217            frame_number: 0,
218            entities: vec![test_entity.clone()],
219            session_id: Uuid::new_v4(),
220        };
221        server.frame_queue.push(debug_frame);
222
223        let tokio_rt = Builder::new_current_thread().enable_all().build().unwrap();
224        // Send an init message to the server..
225        let mut stream = tokio_rt.block_on(async {
226            let (socket, _) = connect_async("ws://127.0.0.1:8000").await.unwrap();
227            let (mut write, read) = socket.split();
228            let _ = write
229                .send(TungsteniteMessage::Text(r#"{ "command": 1 }"#.to_string()))
230                .await;
231
232            read
233        });
234
235        server.sync();
236
237        let data = tokio_rt.block_on(async {
238            let message = stream.next().await.unwrap().unwrap();
239
240            // Note that we may not get an "init" here as the server might not have processed our message yet.
241            // We cover both bases just to be sure.
242            match message {
243                TungsteniteMessage::Text(s) => match serde_json::from_str::<Message>(&s) {
244                    Ok(Message::Frames(mut d)) => d.pop().unwrap(),
245                    Ok(Message::Init(i)) => i.first_frame,
246                    _ => panic!("Unexpected message: {}", s),
247                },
248                _ => panic!("Unexpected message {:?}", message),
249            }
250        });
251
252        assert_eq!(data.entities.get(0).unwrap(), &test_entity);
253    }
254}