1use futures::stream::{SplitSink, SplitStream, StreamExt};
2use futures::SinkExt;
3use std::collections::VecDeque;
4use std::sync::Arc;
5pub use tokio;
6use tokio::sync::RwLock;
7use tokio::task;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10pub use url;
11
/// Concrete WebSocket stream type used throughout this file.
///
/// This is the type of the socket obtained from `connect_async` in
/// `RemoteCanvas::new`, before it is split into its read and write halves.
/// The inner `tokio_tungstenite::stream::Stream` is parameterised over a bare
/// `TcpStream` and a `tokio_native_tls::TlsStream` wrapping a `TcpStream`.
type WebSocketStream = tokio_tungstenite::WebSocketStream<
    tokio_tungstenite::stream::Stream<
        tokio::net::TcpStream,
        tokio_native_tls::TlsStream<tokio::net::TcpStream>,
    >,
>;
18
/// A 256x256 grid of one-byte pixel values addressed by `(x, y)`.
///
/// Both coordinates are `u8`, so every possible coordinate pair lies inside
/// the grid and the accessors can never index out of bounds.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Canvas {
    /// Pixel values indexed as `buffer[y][x]`.
    buffer: [[u8; 256]; 256],
}

impl Canvas {
    /// Creates a canvas with every pixel set to `0`.
    pub fn new() -> Self {
        Self {
            buffer: [[0; 256]; 256],
        }
    }

    /// Returns the value stored at column `x`, row `y`.
    pub fn get_pixel(&self, x: u8, y: u8) -> u8 {
        // `usize::from` is a lossless widening; unlike `as` it cannot truncate.
        self.buffer[usize::from(y)][usize::from(x)]
    }

    /// Stores `c` at column `x`, row `y`.
    pub fn set_pixel(&mut self, x: u8, y: u8, c: u8) {
        self.buffer[usize::from(y)][usize::from(x)] = c;
    }
}

impl Default for Canvas {
    /// Equivalent to [`Canvas::new`]: an all-zero canvas.
    ///
    /// Written by hand because `Default` cannot be derived for arrays of
    /// this size.
    fn default() -> Self {
        Self::new()
    }
}
38
39pub struct RemoteCanvas {
40 local_copy: Arc<RwLock<Canvas>>,
41 _socket_read: Arc<RwLock<SplitStream<WebSocketStream>>>,
42 socket_write: Arc<RwLock<SplitSink<WebSocketStream, Message>>>,
43 _updater_task: task::JoinHandle<()>,
44 fetch_triggers: Arc<RwLock<VecDeque<triggered::Trigger>>>,
45}
46
47impl RemoteCanvas {
48 pub async fn new(server_url: url::Url) -> Result<Self, String> {
49 let (socket, _) = connect_async(server_url)
50 .await
51 .map_err(|err| err.to_string())?;
52 let (socket_write, socket_read) = {
53 let (write, read) = socket.split();
54 (Arc::new(RwLock::new(write)), Arc::new(RwLock::new(read)))
55 };
56 let local_copy = Arc::new(RwLock::new(Canvas::new()));
57 let fetch_triggers = Arc::new(RwLock::new(VecDeque::<triggered::Trigger>::new()));
58 let updater_task = {
59 let local_copy_clone = local_copy.clone();
60 let socket_read_clone = socket_read.clone();
61 let fetch_triggers_clone = fetch_triggers.clone();
62 task::spawn(async move {
63 println!("started update listener");
64 loop {
65 println!("waiting for message in update listener loop");
66 let message = socket_read_clone
67 .write()
68 .await
69 .next()
70 .await
71 .unwrap()
72 .unwrap();
73 println!("received message");
74 let mut canvas = local_copy_clone.write().await;
75 let is_key_frame = process_update(&mut canvas, &message);
76 if is_key_frame {
77 if let Some(trigger) = fetch_triggers_clone.write().await.pop_front() {
78 trigger.trigger();
79 }
80 }
81 }
82 })
83 };
84 let remote_canvas = Self {
85 local_copy,
86 _socket_read: socket_read,
87 socket_write,
88 _updater_task: updater_task,
89 fetch_triggers,
90 };
91 Ok(remote_canvas)
92 }
93
94 pub async fn set_pixel(&mut self, x: u8, y: u8, c: u8) {
95 self.local_copy.write().await.set_pixel(x, y, c);
96 let message = Message::binary(vec![x, y, c]);
97 self.socket_write.write().await.send(message).await.unwrap()
98 }
99
100 pub async fn get_pixel(&self, x: u8, y: u8) -> u8 {
101 self.local_copy.read().await.get_pixel(x, y)
102 }
103
104 pub async fn fetch(&mut self) {
105 let (trigger, listener) = triggered::trigger();
106 self.fetch_triggers.write().await.push_back(trigger);
107 let message = Message::text("fetch");
108 self.socket_write.write().await.send(message).await.unwrap();
109 println!("sent fetch command");
110 listener.await;
111 println!("done awaiting the matching key-frame");
112 }
113}
114
115fn process_update(canvas: &mut Canvas, message: &Message) -> bool {
117 println!("received message: {}", message);
118 if let Message::Binary(data) = message {
119 if !data.is_empty() {
120 match data[0] {
121 0 => {
122 if (data.len() - 1) % 3 == 0 {
123 let n = (data.len() - 1) / 3;
124 for i in 0..n {
125 let x = data[3 * i + 1];
126 let y = data[3 * i + 2];
127 let c = data[3 * i + 3];
128 canvas.set_pixel(x, y, c);
129 }
130 } else {
131 println!(
132 "[WARN] expected message length of 3n+1, got {}",
133 message.len()
134 );
135 }
136 false
137 }
138 1 => {
139 if data.len() == 65537 {
140 for y in 0..=255 {
141 for x in 0..=255 {
142 canvas.set_pixel(x as u8, y as u8, data[1 + x + 256 * y]);
143 }
144 }
145 } else {
146 println!(
147 "[WARN] expected message length of 65537, got {}",
148 message.len()
149 );
150 }
151 true
152 }
153 2 => {
154 false
156 }
157 _ => false,
158 }
159 } else {
160 println!("[WARN] unexpected empty message: {}", message);
161 false
162 }
163 } else {
164 println!("[WARN] unexpected message: {}", message);
165 false
166 }
167}