1pub mod debug_frame;
2use debug_frame::DebugFrame;
3use uuid::Uuid;
4
5use std::io::{Error as IOError, ErrorKind};
6use std::thread::{self, JoinHandle};
7use std::time::Instant;
8use tokio::net::{TcpListener, TcpStream};
9
10use futures::join;
11use futures_util::{StreamExt, TryStreamExt};
12use tokio::runtime::Builder;
13use tokio::sync::broadcast::{channel as broadcast_channel, Receiver, Sender};
14
15use tokio_stream::wrappers::BroadcastStream;
16use tokio_tungstenite::tungstenite;
17
18use serde::{Deserialize, Serialize};
19use serde_repr::*;
20use std::fmt::Debug;
21
22#[derive(Serialize_repr, Deserialize_repr, Clone, Debug)]
23#[serde(rename_all = "camelCase")]
24#[repr(u8)]
25pub enum Command {
26 Reset,
27 Init,
28}
29
30#[derive(Deserialize, Serialize, Clone, Debug)]
31#[serde(rename_all = "camelCase")]
32pub struct InitData {
33 first_frame: DebugFrame,
34 session_id: Uuid,
35}
36
37#[derive(Deserialize, Serialize, Clone, Debug)]
38#[serde(rename_all = "camelCase")]
39pub enum Message {
40 Frames(Vec<DebugFrame>),
41 Command(Command),
42 Init(InitData),
43 Error(String),
44}
45
46pub struct DebugServer {
47 pub to_client: Sender<Message>,
48 pub from_client: Receiver<Message>,
49 pub session_id: Uuid,
50 pub current_frame: usize,
51 pub frame_queue: Vec<DebugFrame>,
52 last_sync: Instant,
53 _handle: JoinHandle<()>,
54}
55
56async fn accept_connection(
57 stream: TcpStream,
58 to_hotham: Sender<Message>,
59 from_hotham: Receiver<Message>,
60) {
61 let ws_stream = tokio_tungstenite::accept_async(stream)
62 .await
63 .expect("Error during the websocket handshake occurred");
64
65 let (to_client, from_client) = ws_stream.split();
66
67 let client_to_hotham = from_client
68 .filter_map(|msg| async move {
69 match msg {
70 Ok(tungstenite::Message::Text(m)) => match serde_json::from_str::<Message>(&m) {
71 Ok(message) => Some(Ok(message)),
72 Err(e) => {
73 let error_message = format!("Error deserialising: {:?}", e);
74 eprintln!("{:?}", error_message);
75 Some(Ok(Message::Error(error_message)))
76 }
77 },
78 _ => None,
79 }
80 })
81 .try_for_each(|v| futures::future::ready(to_hotham.send(v).map(|_| ()).map_err(|_| ())));
82
83 let from_hotham = BroadcastStream::new(from_hotham).map(|message| match message {
84 Ok(message) => {
85 let json = serde_json::to_string(&message)
86 .unwrap_or_else(|_| panic!("Unable to deserialize {:?}", message));
87 Ok(tungstenite::Message::Text(json))
88 }
89 Err(e) => Err(tokio_tungstenite::tungstenite::Error::Io(IOError::new(
90 ErrorKind::Other,
91 e.to_string(),
92 ))),
93 });
94 let hotham_to_client = from_hotham.forward(to_client);
95
96 let (r1, r2) = join!(client_to_hotham, hotham_to_client);
97 r1.expect("Problem!");
98 r2.expect("Problem 2!");
99}
100
101impl DebugServer {
102 pub fn new() -> DebugServer {
103 let (to_client, from_client) = broadcast_channel(16);
108 let to_client_clone = to_client.clone();
109
110 let handle = thread::spawn(move || {
111 let rt = Builder::new_current_thread().enable_all().build().unwrap();
112 rt.block_on(async {
113 let addr = "127.0.0.1:8000".to_string();
114 let try_socket = TcpListener::bind(&addr).await;
116 let listener = try_socket.expect("Failed to bind");
117 while let Ok((stream, _)) = listener.accept().await {
118 let to_hotham = to_client_clone.clone();
121 let from_hotham = to_hotham.subscribe();
122
123 tokio::spawn(accept_connection(stream, to_hotham, from_hotham));
124 }
125 })
126 });
127
128 DebugServer {
129 to_client,
130 from_client,
131 _handle: handle,
132 session_id: Uuid::new_v4(),
133 current_frame: 0,
134 frame_queue: Vec::new(),
135 last_sync: Instant::now(),
136 }
137 }
138
139 pub fn sync(&mut self) -> Option<Vec<DebugFrame>> {
140 let mut editable_data = None;
141 let frames = self.frame_queue.drain(..).collect::<Vec<_>>();
142 let response: Option<Message> = match self.from_client.try_recv() {
143 Ok(Message::Frames(debug_data_from_client)) => {
144 editable_data = Some(debug_data_from_client);
145 Some(Message::Frames(frames))
146 }
147 Ok(Message::Command(Command::Reset)) => Some(Message::Frames(frames)),
148 Ok(Message::Command(Command::Init)) => Some(Message::Init(InitData {
149 session_id: self.session_id,
150 first_frame: frames[0].clone(),
151 })),
152 Ok(error_message @ Message::Error(_)) => Some(error_message),
153 Ok(_) => None,
154 Err(_) => Some(Message::Frames(frames)),
155 };
156
157 if let Some(response) = response {
158 self.to_client
159 .send(response)
160 .expect("Unable to update value");
161 let _ = self.from_client.try_recv();
162 }
163
164 self.last_sync = Instant::now();
165
166 editable_data
167 }
168
169 pub fn time_since_last_sync(&self) -> u64 {
170 self.last_sync.elapsed().as_secs()
171 }
172}
173
174impl Default for DebugServer {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::sink::SinkExt;
    use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as TungsteniteMessage};

    use super::*;
    use crate::debug_frame::{DebugEntity, DebugTransform};

    /// End-to-end smoke test: a client connects, sends a command, and the frame queued on
    /// the server comes back over the socket after `sync`.
    #[test]
    fn test_debug_server_smoke() {
        let mut server: DebugServer = DebugServer::new();
        let test_entity = DebugEntity {
            name: "Environment".to_string(),
            entity_id: 0,
            id: "test".to_string(),
            transform: Some(DebugTransform {
                translation: [0., 0., 0.],
                rotation: [0., 0., 0., 1.],
                scale: [1., 1., 1.],
            }),
            collider: None,
        };

        let debug_frame = DebugFrame {
            id: Uuid::new_v4(),
            frame_number: 0,
            entities: vec![test_entity.clone()],
            session_id: Uuid::new_v4(),
        };
        server.frame_queue.push(debug_frame);

        let tokio_rt = Builder::new_current_thread().enable_all().build().unwrap();

        // The listener is bound on a background thread, so it may not be ready yet:
        // retry for up to about one second before giving up.
        let mut attempts_left = 50;
        let socket = loop {
            match tokio_rt.block_on(connect_async("ws://127.0.0.1:8000")) {
                Ok((socket, _)) => break socket,
                Err(_) if attempts_left > 0 => {
                    attempts_left -= 1;
                    std::thread::sleep(Duration::from_millis(20));
                }
                Err(e) => panic!("Unable to connect to the debug server: {:?}", e),
            }
        };

        let mut stream = tokio_rt.block_on(async move {
            let (mut write, read) = socket.split();
            let _ = write
                .send(TungsteniteMessage::Text(r#"{ "command": 1 }"#.to_string()))
                .await;

            read
        });

        server.sync();

        let data = tokio_rt.block_on(async {
            let message = stream.next().await.unwrap().unwrap();

            match message {
                TungsteniteMessage::Text(s) => match serde_json::from_str::<Message>(&s) {
                    Ok(Message::Frames(mut d)) => d.pop().unwrap(),
                    Ok(Message::Init(i)) => i.first_frame,
                    _ => panic!("Unexpected message: {}", s),
                },
                _ => panic!("Unexpected message {:?}", message),
            }
        });

        assert_eq!(data.entities.get(0).unwrap(), &test_entity);
    }
}