Skip to main content

ectoplasm/
ectoplasm.rs

1use futures::stream::{SplitSink, SplitStream, StreamExt};
2use futures::SinkExt;
3use std::collections::VecDeque;
4use std::sync::Arc;
5pub use tokio;
6use tokio::sync::RwLock;
7use tokio::task;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10pub use url;
11
/// Concrete WebSocket stream type whose split halves are stored in
/// `RemoteCanvas` (see the `SplitStream` / `SplitSink` fields there).
///
/// The transport underneath is `tokio_tungstenite::stream::Stream`
/// parameterised with a plain `TcpStream` and a native-TLS stream wrapping a
/// `TcpStream`.
// NOTE(review): presumably the plain/TLS choice follows the URL scheme
// (`ws` vs `wss`) — confirm against the tokio-tungstenite version in use.
type WebSocketStream = tokio_tungstenite::WebSocketStream<
    tokio_tungstenite::stream::Stream<
        tokio::net::TcpStream,
        tokio_native_tls::TlsStream<tokio::net::TcpStream>,
    >,
>;
18
/// A 256×256 grid of one-byte pixel values.
///
/// Pixels are stored row-major (`buffer[y][x]`). Both coordinates are `u8`,
/// so every `(x, y)` pair addresses a valid cell and the accessors cannot
/// index out of bounds.
#[derive(Clone)]
pub struct Canvas {
    buffer: [[u8; 256]; 256],
}

impl Canvas {
    /// Creates a canvas with every pixel set to `0`.
    pub fn new() -> Self {
        Self {
            buffer: [[0; 256]; 256],
        }
    }

    /// Returns the value stored at column `x`, row `y`.
    pub fn get_pixel(&self, x: u8, y: u8) -> u8 {
        // `usize::from` is a lossless widening; no truncating cast involved.
        self.buffer[usize::from(y)][usize::from(x)]
    }

    /// Stores `c` at column `x`, row `y`.
    pub fn set_pixel(&mut self, x: u8, y: u8, c: u8) {
        self.buffer[usize::from(y)][usize::from(x)] = c;
    }
}

impl Default for Canvas {
    /// Equivalent to [`Canvas::new`]: an all-zero canvas.
    ///
    /// Implemented by hand because `Default` is not provided for arrays of
    /// this size, so it cannot be derived.
    fn default() -> Self {
        Self::new()
    }
}
38
39pub struct RemoteCanvas {
40    local_copy: Arc<RwLock<Canvas>>,
41    _socket_read: Arc<RwLock<SplitStream<WebSocketStream>>>,
42    socket_write: Arc<RwLock<SplitSink<WebSocketStream, Message>>>,
43    _updater_task: task::JoinHandle<()>,
44    fetch_triggers: Arc<RwLock<VecDeque<triggered::Trigger>>>,
45}
46
47impl RemoteCanvas {
48    pub async fn new(server_url: url::Url) -> Result<Self, String> {
49        let (socket, _) = connect_async(server_url)
50            .await
51            .map_err(|err| err.to_string())?;
52        let (socket_write, socket_read) = {
53            let (write, read) = socket.split();
54            (Arc::new(RwLock::new(write)), Arc::new(RwLock::new(read)))
55        };
56        let local_copy = Arc::new(RwLock::new(Canvas::new()));
57        let fetch_triggers = Arc::new(RwLock::new(VecDeque::<triggered::Trigger>::new()));
58        let updater_task = {
59            let local_copy_clone = local_copy.clone();
60            let socket_read_clone = socket_read.clone();
61            let fetch_triggers_clone = fetch_triggers.clone();
62            task::spawn(async move {
63                println!("started update listener");
64                loop {
65                    println!("waiting for message in update listener loop");
66                    let message = socket_read_clone
67                        .write()
68                        .await
69                        .next()
70                        .await
71                        .unwrap()
72                        .unwrap();
73                    println!("received message");
74                    let mut canvas = local_copy_clone.write().await;
75                    let is_key_frame = process_update(&mut canvas, &message);
76                    if is_key_frame {
77                        if let Some(trigger) = fetch_triggers_clone.write().await.pop_front() {
78                            trigger.trigger();
79                        }
80                    }
81                }
82            })
83        };
84        let remote_canvas = Self {
85            local_copy,
86            _socket_read: socket_read,
87            socket_write,
88            _updater_task: updater_task,
89            fetch_triggers,
90        };
91        Ok(remote_canvas)
92    }
93
94    pub async fn set_pixel(&mut self, x: u8, y: u8, c: u8) {
95        self.local_copy.write().await.set_pixel(x, y, c);
96        let message = Message::binary(vec![x, y, c]);
97        self.socket_write.write().await.send(message).await.unwrap()
98    }
99
100    pub async fn get_pixel(&self, x: u8, y: u8) -> u8 {
101        self.local_copy.read().await.get_pixel(x, y)
102    }
103
104    pub async fn fetch(&mut self) {
105        let (trigger, listener) = triggered::trigger();
106        self.fetch_triggers.write().await.push_back(trigger);
107        let message = Message::text("fetch");
108        self.socket_write.write().await.send(message).await.unwrap();
109        println!("sent fetch command");
110        listener.await;
111        println!("done awaiting the matching key-frame");
112    }
113}
114
115// NOTE(mkovacs): Return value indicates whether the update was a key-frame
116fn process_update(canvas: &mut Canvas, message: &Message) -> bool {
117    println!("received message: {}", message);
118    if let Message::Binary(data) = message {
119        if !data.is_empty() {
120            match data[0] {
121                0 => {
122                    if (data.len() - 1) % 3 == 0 {
123                        let n = (data.len() - 1) / 3;
124                        for i in 0..n {
125                            let x = data[3 * i + 1];
126                            let y = data[3 * i + 2];
127                            let c = data[3 * i + 3];
128                            canvas.set_pixel(x, y, c);
129                        }
130                    } else {
131                        println!(
132                            "[WARN] expected message length of 3n+1, got {}",
133                            message.len()
134                        );
135                    }
136                    false
137                }
138                1 => {
139                    if data.len() == 65537 {
140                        for y in 0..=255 {
141                            for x in 0..=255 {
142                                canvas.set_pixel(x as u8, y as u8, data[1 + x + 256 * y]);
143                            }
144                        }
145                    } else {
146                        println!(
147                            "[WARN] expected message length of 65537, got {}",
148                            message.len()
149                        );
150                    }
151                    true
152                }
153                2 => {
154                    // TODO(mkovacs): Handle palette updates
155                    false
156                }
157                _ => false,
158            }
159        } else {
160            println!("[WARN] unexpected empty message: {}", message);
161            false
162        }
163    } else {
164        println!("[WARN] unexpected message: {}", message);
165        false
166    }
167}