pixelflut_rs/
server.rs

1use core::fmt;
2use std::fmt::Formatter;
3use std::net::IpAddr;
4use std::sync::Arc;
5use std::time::Instant;
6
7use custom_error::custom_error;
8use log::{error, info, warn};
9use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::{mpsc, RwLock};
12use tokio::sync::mpsc::{Receiver, Sender};
13use tokio::task;
14
15use crate::grid::{Grid, Size};
16use crate::pixel::Pixel;
17
/// Number of pixels (1024) used both as the capacity of the pixel channel and as the
/// batch-size threshold above which buffered pixels are flushed to the grid.
const PIXEL_BUFFER: usize = 1 << 10;
19
/// Reply sent for the `HELP` command; one `HELP`-prefixed line per supported command.
/// The text carries no trailing newline, the caller appends it when sending.
const HELP: &str = concat!(
    "HELP Pixelflut Commands:\n",
    "HELP - PX <x> <y> <RRGGBB[AA]>\n",
    "HELP - PX <x> <y>   >>  PX <x> <y> <RRGGBB>\n",
    "HELP - SIZE         >>  SIZE <width> <height>\n",
    "HELP - HELP         >>  HELP ...",
);
26
// Error type of the server, generated by the `custom_error!` macro.
//
// `UnknownCommand` is returned by `process` when a received line does not start with a
// known command or when a `PX` command has an unexpected number of arguments; the
// connection handler then logs the message below and the connection is closed.
custom_error! { ServerError
    UnknownCommand = "Unknown command send!"
}
30
31/// The Pixelflut Server.
32///
33/// The Server is defined by an interface and a port where it should listen on. It
34/// also requires a Grid on which the Pixels should be drawn on. To start everything you just
35/// need to do the following:
36///
37/// ```compile_fail
38/// let server = Server::new("0.0.0.0".parse()?, 2342, grid);
39/// server.start().await
40/// ```
41pub struct Server<G: Grid + std::marker::Send + std::marker::Sync> {
42    interface: IpAddr,
43    port: u16,
44    grid: Arc<RwLock<G>>,
45}
46
47impl<G> Server<G>
48    where
49        G: 'static + Grid + std::marker::Send + std::marker::Sync,
50{
51    /// Creates a new Server for the given interface, port and Grid.
52    pub fn new(interface: IpAddr, port: u16, grid: G) -> Server<G> {
53        Server {
54            interface,
55            port,
56            grid: Arc::new(RwLock::new(grid)),
57        }
58    }
59
60    /// This method will start your server and will never return without an error.
61    pub async fn start(self) -> Result<(), Box<dyn std::error::Error>> {
62        // Bind the listener to the address
63        let listener = TcpListener::bind((self.interface, self.port)).await?;
64        let (tx, rx) = mpsc::channel(PIXEL_BUFFER);
65
66        // Start a dedicated task to draw the pixels in bulks to the grid
67        let write_grid = Arc::clone(&self.grid);
68        task::spawn(async move {
69            draw_pixels(rx, write_grid).await;
70        });
71
72        info!("Server is ready and listening to {}:{}", self.interface, self.port);
73        loop {
74            match listener.accept().await {
75                // The second item contains the IP and port of the new connection.
76                Ok((mut socket, addr)) => {
77                    info!("New connection from {}", addr);
78                    let grid = Arc::clone(&self.grid);
79                    let tx = tx.clone();
80                    task::spawn(async move {
81                        match process(&mut socket, grid, tx).await {
82                            Ok(()) => info!("{} disconnects", addr),
83                            Err(e) => warn!("{} disconnects because of: {}", addr, e),
84                        }
85                    });
86                }
87                Err(e) => error!("Error opening socket connection: {}", e),
88            };
89        }
90    }
91}
92
93async fn draw_pixels<G: Grid>(mut rx: Receiver<Pixel>, grid: Arc<RwLock<G>>) {
94    let buf: &mut Vec<Pixel> = &mut vec!();
95    let mut time = Instant::now();
96
97    loop {
98        if let Some(px) = rx.recv().await {
99            buf.push(px);
100        }
101
102        if !buf.is_empty() && (buf.len() > PIXEL_BUFFER || time.elapsed().as_micros() > 900) {
103            let mut grid = grid.write().await;
104            buf.iter().for_each(|px| grid.draw(px));
105            buf.clear();
106            time = Instant::now();
107        }
108    }
109}
110
111async fn process<G: Grid>(
112    socket: &mut TcpStream,
113    grid: Arc<RwLock<G>>,
114    tx: Sender<Pixel>,
115) -> Result<(), Box<dyn std::error::Error>> {
116    let (rd, mut wr) = io::split(socket);
117    let reader = BufReader::new(rd);
118    let mut lines = reader.lines();
119
120    while let Some(line) = lines.next_line().await? {
121        let mut parts = line.split_whitespace();
122        match parts.next() {
123            Some("PX") => {
124                match parts.count() {
125                    // PX <x> <y>
126                    2 => {
127                        let pixel: Option<Pixel>;
128                        {
129                            let grid = grid.read().await;
130                            pixel = grid.fetch(line.parse()?);
131                        }
132                        if pixel.is_some() {
133                            let pixel = format!("{}\n", pixel.unwrap());
134                            wr.write(pixel.as_bytes()).await?;
135                        }
136                    }
137                    // PX <x> <y> <RRGGBB[AA]>
138                    3 => {
139                        tx.send(line.parse()?).await?;
140                    }
141                    _ => return Err(Box::new(ServerError::UnknownCommand)),
142                }
143            }
144            Some("SIZE") => {
145                let size;
146                {
147                    let grid = grid.read().await;
148                    size = format!("{}\n", grid.size());
149                }
150                wr.write(size.as_bytes()).await?;
151            }
152            Some("HELP") => {
153                let help = format!("{}\n", HELP);
154                wr.write(help.as_bytes()).await?;
155            }
156            _ => return Err(Box::new(ServerError::UnknownCommand)),
157        }
158    }
159
160    Ok(())
161}
162
163impl fmt::Display for Size {
164    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
165        write!(f, "SIZE {} {}", self.x(), self.y())
166    }
167}
168
#[cfg(test)]
mod tests {
    use crate::grid::Size;

    /// The `Display` output of a `Size` must match the `SIZE` reply format.
    #[test]
    fn display_size() {
        let rendered = Size::new(1024, 768).to_string();
        assert_eq!(rendered, "SIZE 1024 768");
    }
}