1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use bronzedb_engine::Engine;
use bronzedb_protocol::request::Request::{self, *};
use bronzedb_protocol::response::Response;
use bronzedb_util::status::StatusCode::*;
use bronzedb_util::status::{Error, Result};
use log::info;
use std::io::ErrorKind;
use std::net::{TcpListener, TcpStream};
use std::thread::spawn;

pub struct Server<T: Engine> {
    engine: T,
}

impl<T: Engine + Clone + Sync + Send + 'static> Server<T> {
    pub fn new(engine: T) -> Self {
        Self { engine }
    }

    pub fn serve(&mut self, listener: TcpListener) -> Result<()> {
        for stream in listener.incoming() {
            let stream = stream?;
            info!("establish connection from {}", stream.peer_addr()?);
            let engine = self.engine.clone();
            spawn(move || {
                let addr = stream.peer_addr().unwrap();
                handle_client(stream, engine).unwrap();
                info!("close connection from {}", addr);
            });
        }
        Ok(())
    }
}

fn deal_engine_err<T, E: Into<Error>>(
    stream_ref: &mut TcpStream,
    result: std::result::Result<T, E>,
) -> Result<T> {
    match result {
        Ok(value) => Ok(value),
        Err(err) => {
            Response::Status(EngineError).write_to(stream_ref)?;
            Err(err.into())
        }
    }
}

fn handle_client<T: Engine>(mut stream: TcpStream, mut engine: T) -> Result<()> {
    loop {
        match Request::read_from(&mut stream) {
            Ok(request) => match request {
                Get(key) => {
                    let value = deal_engine_err(&mut stream, engine.get(key.into()))?;
                    match value {
                        Some(data) => Response::SingleValue(data).write_to(&mut stream)?,
                        None => Response::Status(NotFound).write_to(&mut stream)?,
                    };
                }
                Set(key, value) => {
                    deal_engine_err(&mut stream, engine.set(key.into(), value))?;
                    Response::Status(OK).write_to(&mut stream)?;
                }

                Delete(key) => {
                    deal_engine_err(&mut stream, engine.delete(key.into()))?;
                    Response::Status(OK).write_to(&mut stream)?;
                }
                Scan {
                    lower_bound,
                    upper_bound,
                } => {
                    let mut scanner =
                        deal_engine_err(&mut stream, engine.scan(lower_bound, upper_bound))?;
                    Response::Scanner(scanner.iter()).write_to(&mut stream)?;
                }

                Ping => {
                    Response::Status(OK).write_to(&mut stream)?;
                }
                NoResponse => continue,
                Unknown => {
                    Response::Status(UnknownAction).write_to(&mut stream)?;
                    break Err(Error::new(UnknownAction, "unknown action"));
                }
            },

            Err(ref err) if err.kind() == ErrorKind::UnexpectedEof => break Ok(()), // shutdown
            Err(err) => break Err(err.into()),
        }
    }
}