bronzedb_server/
lib.rs

1use bronzedb_engine::Engine;
2use bronzedb_protocol::request::Request::{self, *};
3use bronzedb_protocol::response::Response;
4use bronzedb_util::status::StatusCode::*;
5use bronzedb_util::status::{Error, Result};
6use log::info;
7use std::io::ErrorKind;
8use std::net::{TcpListener, TcpStream};
9use std::thread::spawn;
10
11pub struct Server<T: Engine> {
12    engine: T,
13}
14
15impl<T: Engine + Clone + Sync + Send + 'static> Server<T> {
16    pub fn new(engine: T) -> Self {
17        Self { engine }
18    }
19
20    pub fn serve(&mut self, listener: TcpListener) -> Result<()> {
21        for stream in listener.incoming() {
22            let stream = stream?;
23            info!("establish connection from {}", stream.peer_addr()?);
24            let engine = self.engine.clone();
25            spawn(move || {
26                let addr = stream.peer_addr().unwrap();
27                handle_client(stream, engine).unwrap();
28                info!("close connection from {}", addr);
29            });
30        }
31        Ok(())
32    }
33}
34
35fn deal_engine_err<T, E: Into<Error>>(
36    stream_ref: &mut TcpStream,
37    result: std::result::Result<T, E>,
38) -> Result<T> {
39    match result {
40        Ok(value) => Ok(value),
41        Err(err) => {
42            Response::Status(EngineError).write_to(stream_ref)?;
43            Err(err.into())
44        }
45    }
46}
47
48fn handle_client<T: Engine>(mut stream: TcpStream, mut engine: T) -> Result<()> {
49    loop {
50        match Request::read_from(&mut stream) {
51            Ok(request) => match request {
52                Get(key) => {
53                    let value = deal_engine_err(&mut stream, engine.get(key.into()))?;
54                    match value {
55                        Some(data) => Response::SingleValue(data).write_to(&mut stream)?,
56                        None => Response::Status(NotFound).write_to(&mut stream)?,
57                    };
58                }
59                Set(key, value) => {
60                    deal_engine_err(&mut stream, engine.set(key.into(), value))?;
61                    Response::Status(OK).write_to(&mut stream)?;
62                }
63
64                Delete(key) => {
65                    deal_engine_err(&mut stream, engine.delete(key.into()))?;
66                    Response::Status(OK).write_to(&mut stream)?;
67                }
68                Scan {
69                    lower_bound,
70                    upper_bound,
71                } => {
72                    let mut scanner =
73                        deal_engine_err(&mut stream, engine.scan(lower_bound, upper_bound))?;
74                    Response::Scanner(scanner.iter()).write_to(&mut stream)?;
75                }
76
77                Ping => {
78                    Response::Status(OK).write_to(&mut stream)?;
79                }
80                NoResponse => continue,
81                Unknown => {
82                    Response::Status(UnknownAction).write_to(&mut stream)?;
83                    break Err(Error::new(UnknownAction, "unknown action"));
84                }
85            },
86
87            Err(ref err) if err.kind() == ErrorKind::UnexpectedEof => break Ok(()), // shutdown
88            Err(err) => break Err(err.into()),
89        }
90    }
91}