1use bronzedb_engine::Engine;
2use bronzedb_protocol::request::Request::{self, *};
3use bronzedb_protocol::response::Response;
4use bronzedb_util::status::StatusCode::*;
5use bronzedb_util::status::{Error, Result};
6use log::info;
7use std::io::ErrorKind;
8use std::net::{TcpListener, TcpStream};
9use std::thread::spawn;
10
11pub struct Server<T: Engine> {
12 engine: T,
13}
14
15impl<T: Engine + Clone + Sync + Send + 'static> Server<T> {
16 pub fn new(engine: T) -> Self {
17 Self { engine }
18 }
19
20 pub fn serve(&mut self, listener: TcpListener) -> Result<()> {
21 for stream in listener.incoming() {
22 let stream = stream?;
23 info!("establish connection from {}", stream.peer_addr()?);
24 let engine = self.engine.clone();
25 spawn(move || {
26 let addr = stream.peer_addr().unwrap();
27 handle_client(stream, engine).unwrap();
28 info!("close connection from {}", addr);
29 });
30 }
31 Ok(())
32 }
33}
34
35fn deal_engine_err<T, E: Into<Error>>(
36 stream_ref: &mut TcpStream,
37 result: std::result::Result<T, E>,
38) -> Result<T> {
39 match result {
40 Ok(value) => Ok(value),
41 Err(err) => {
42 Response::Status(EngineError).write_to(stream_ref)?;
43 Err(err.into())
44 }
45 }
46}
47
48fn handle_client<T: Engine>(mut stream: TcpStream, mut engine: T) -> Result<()> {
49 loop {
50 match Request::read_from(&mut stream) {
51 Ok(request) => match request {
52 Get(key) => {
53 let value = deal_engine_err(&mut stream, engine.get(key.into()))?;
54 match value {
55 Some(data) => Response::SingleValue(data).write_to(&mut stream)?,
56 None => Response::Status(NotFound).write_to(&mut stream)?,
57 };
58 }
59 Set(key, value) => {
60 deal_engine_err(&mut stream, engine.set(key.into(), value))?;
61 Response::Status(OK).write_to(&mut stream)?;
62 }
63
64 Delete(key) => {
65 deal_engine_err(&mut stream, engine.delete(key.into()))?;
66 Response::Status(OK).write_to(&mut stream)?;
67 }
68 Scan {
69 lower_bound,
70 upper_bound,
71 } => {
72 let mut scanner =
73 deal_engine_err(&mut stream, engine.scan(lower_bound, upper_bound))?;
74 Response::Scanner(scanner.iter()).write_to(&mut stream)?;
75 }
76
77 Ping => {
78 Response::Status(OK).write_to(&mut stream)?;
79 }
80 NoResponse => continue,
81 Unknown => {
82 Response::Status(UnknownAction).write_to(&mut stream)?;
83 break Err(Error::new(UnknownAction, "unknown action"));
84 }
85 },
86
87 Err(ref err) if err.kind() == ErrorKind::UnexpectedEof => break Ok(()), Err(err) => break Err(err.into()),
89 }
90 }
91}