1use core::fmt;
2use std::fmt::Formatter;
3use std::net::IpAddr;
4use std::sync::Arc;
5use std::time::Instant;
6
7use custom_error::custom_error;
8use log::{error, info, warn};
9use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::{mpsc, RwLock};
12use tokio::sync::mpsc::{Receiver, Sender};
13use tokio::task;
14
15use crate::grid::{Grid, Size};
16use crate::pixel::Pixel;
17
/// Capacity of the pixel channel created in `Server::start`; `draw_pixels` also uses it as the
/// buffered-pixel count above which a batch is flushed to the grid.
const PIXEL_BUFFER: usize = 1024;
19
/// Text sent back to a client that issues the `HELP` command.
///
/// The connection handler appends the trailing newline when it writes the response, so the
/// constant itself does not end with one.
const HELP: &str = "\
HELP Pixelflut Commands:\n\
HELP - PX <x> <y> <RRGGBB[AA]>\n\
HELP - PX <x> <y> >> PX <x> <y> <RRGGBB>\n\
HELP - SIZE >> SIZE <width> <height>\n\
HELP - HELP >> HELP ...";
26
// Error type produced by the connection handler.
//
// `UnknownCommand` is returned when the first token of a line is not a known command, or when a
// `PX` command carries an unsupported number of arguments.
custom_error! { ServerError
    UnknownCommand = "Unknown command send!"
}
30
31pub struct Server<G: Grid + std::marker::Send + std::marker::Sync> {
42 interface: IpAddr,
43 port: u16,
44 grid: Arc<RwLock<G>>,
45}
46
47impl<G> Server<G>
48 where
49 G: 'static + Grid + std::marker::Send + std::marker::Sync,
50{
51 pub fn new(interface: IpAddr, port: u16, grid: G) -> Server<G> {
53 Server {
54 interface,
55 port,
56 grid: Arc::new(RwLock::new(grid)),
57 }
58 }
59
60 pub async fn start(self) -> Result<(), Box<dyn std::error::Error>> {
62 let listener = TcpListener::bind((self.interface, self.port)).await?;
64 let (tx, rx) = mpsc::channel(PIXEL_BUFFER);
65
66 let write_grid = Arc::clone(&self.grid);
68 task::spawn(async move {
69 draw_pixels(rx, write_grid).await;
70 });
71
72 info!("Server is ready and listening to {}:{}", self.interface, self.port);
73 loop {
74 match listener.accept().await {
75 Ok((mut socket, addr)) => {
77 info!("New connection from {}", addr);
78 let grid = Arc::clone(&self.grid);
79 let tx = tx.clone();
80 task::spawn(async move {
81 match process(&mut socket, grid, tx).await {
82 Ok(()) => info!("{} disconnects", addr),
83 Err(e) => warn!("{} disconnects because of: {}", addr, e),
84 }
85 });
86 }
87 Err(e) => error!("Error opening socket connection: {}", e),
88 };
89 }
90 }
91}
92
93async fn draw_pixels<G: Grid>(mut rx: Receiver<Pixel>, grid: Arc<RwLock<G>>) {
94 let buf: &mut Vec<Pixel> = &mut vec!();
95 let mut time = Instant::now();
96
97 loop {
98 if let Some(px) = rx.recv().await {
99 buf.push(px);
100 }
101
102 if !buf.is_empty() && (buf.len() > PIXEL_BUFFER || time.elapsed().as_micros() > 900) {
103 let mut grid = grid.write().await;
104 buf.iter().for_each(|px| grid.draw(px));
105 buf.clear();
106 time = Instant::now();
107 }
108 }
109}
110
111async fn process<G: Grid>(
112 socket: &mut TcpStream,
113 grid: Arc<RwLock<G>>,
114 tx: Sender<Pixel>,
115) -> Result<(), Box<dyn std::error::Error>> {
116 let (rd, mut wr) = io::split(socket);
117 let reader = BufReader::new(rd);
118 let mut lines = reader.lines();
119
120 while let Some(line) = lines.next_line().await? {
121 let mut parts = line.split_whitespace();
122 match parts.next() {
123 Some("PX") => {
124 match parts.count() {
125 2 => {
127 let pixel: Option<Pixel>;
128 {
129 let grid = grid.read().await;
130 pixel = grid.fetch(line.parse()?);
131 }
132 if pixel.is_some() {
133 let pixel = format!("{}\n", pixel.unwrap());
134 wr.write(pixel.as_bytes()).await?;
135 }
136 }
137 3 => {
139 tx.send(line.parse()?).await?;
140 }
141 _ => return Err(Box::new(ServerError::UnknownCommand)),
142 }
143 }
144 Some("SIZE") => {
145 let size;
146 {
147 let grid = grid.read().await;
148 size = format!("{}\n", grid.size());
149 }
150 wr.write(size.as_bytes()).await?;
151 }
152 Some("HELP") => {
153 let help = format!("{}\n", HELP);
154 wr.write(help.as_bytes()).await?;
155 }
156 _ => return Err(Box::new(ServerError::UnknownCommand)),
157 }
158 }
159
160 Ok(())
161}
162
163impl fmt::Display for Size {
164 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
165 write!(f, "SIZE {} {}", self.x(), self.y())
166 }
167}
168
#[cfg(test)]
mod tests {
    use crate::grid::Size;

    /// The `Display` output of a size must match the `SIZE <x> <y>` response format.
    #[test]
    fn display_size() {
        let rendered = Size::new(1024, 768).to_string();
        assert_eq!(rendered, "SIZE 1024 768");
    }
}