mycelium_lib/
tcp_server.rs

1use std::io::BufReader;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio::io::{lines, write_all};
6use tokio::net::TcpListener; //TcpStream
7use tokio::prelude::*;
8use uuid::Uuid;
9
10use crate::prelude::*;
11
12///
13/// # Local TCP server
14/// Either this (msql or serialized CommandAST over TCP loopback)
15///
16/// ```
17/// use mycelium_command::prelude::*;
18/// use mycelium_lib::{ prelude::*, Mycelium };
19/// use mycelium_index::prelude::Config;
20///
21/// let config = Config::fetch_or_default(&std::env::current_exe().expect("a path"))
22///     .expect("failed to get a default config");
23/// let db = Mycelium::init_db(config.0);
24///
25/// //match lib_mycelium::start_local(db) {
26/// //    Ok(_) => (),
27/// //    Err(e) => {
28/// //        panic!("Failed to start db server: {:?}" e);
29/// //    }
30/// //}
31///
32/// ```
33///
34pub fn start_local(db: Arc<Mycelium>) -> std::io::Result<()> {
35    let port = db.config.tcp_port;
36    let ip = "0.0.0.0";
37    let addr = std::env::args()
38        .nth(1)
39        .unwrap_or_else(|| format!("{}:{}", ip, port));
40    let addr = addr.parse::<SocketAddr>().expect("socket error");
41    let listener = TcpListener::bind(&addr)
42        .map_err(|_| "Failed to bind socket.")
43        .expect("Failed to bind listener");
44
45    let done = listener
46        .incoming()
47        .map_err(|e| println!("Error accepting socket: error = {:?}", e))
48        .for_each(move |socket| {
49            let (reader, writer) = socket.split();
50            let lines = lines(BufReader::new(reader));
51            let db = db.clone();
52            let responses = lines.map(move |line| {
53                let request = match Request::parse(&line) {
54                    Ok(req) => req,
55                    Err(e) => return Response::Error { msg: e },
56                };
57
58                match request {
59                    Request::Command { cmd } => {
60                        let res = db.execute_command(cmd).unwrap();
61                        match res {
62                            Result::Some((None, None, None, Some(list), None)) => {
63                                let str = serde_json::to_string(&list).unwrap();
64                                Response::String { str }
65                            }
66                            Result::Some((Some(id), None, None, None, None)) => {
67                                let id = Uuid::from_bytes(id);
68                                Response::String {
69                                    str: id.to_string(),
70                                }
71                            }
72                            Result::Some((None, None, None, None, Some(list))) => {
73                                let str = serde_json::to_string(&list).unwrap();
74                                Response::String { str }
75                            }
76                            _ => Response::String {
77                                str: "Command not yet available.".to_string(),
78                            },
79                        }
80                    }
81                    Request::ParseSql { cmd } => {
82                        let res = db.execute_command(cmd).unwrap();
83                        match res {
84                            Result::Some((None, None, None, Some(list), None)) => {
85                                let str = serde_json::to_string(&list).unwrap();
86                                Response::String { str }
87                            }
88                            _ => unimplemented!(),
89                        }
90                    }
91                }
92            });
93
94            let writes = responses.fold(writer, |writer, response| {
95                let mut response = response.serialize();
96                response.push('\r');
97                response.push('\n');
98                write_all(writer, response.into_bytes()).map(|(w, _)| w)
99            });
100
101            let msg = writes.then(move |_| Ok(()));
102            tokio::spawn(msg)
103        });
104
105    tokio::run(done);
106    Ok(())
107}
108
109#[allow(dead_code)]
110enum ResponseFormat {
111    BIN,
112    JSON,
113    RON,
114}
115
116#[allow(dead_code)]
117enum Request {
118    // Serialized mycelium_command::Command
119    Command { cmd: Command },
120    // Parse sql to mycelium_command::Command
121    ParseSql { cmd: Command },
122}
123
124impl Request {
125    fn parse(input: &str) -> std::result::Result<Request, String> {
126        if input.len() > 3 {
127            let head = input.split_at(4);
128            match head.0.trim() {
129                "cmd" => match serde_json::from_str(head.1) {
130                    Ok(cmd) => Ok(Request::Command { cmd }),
131                    Err(_) => Err(From::from("Bad Command.")),
132                },
133                "msql" => match mycelium_command::Command::parse(head.1) {
134                    Ok(cmd) => Ok(Request::Command { cmd }),
135                    Err(e) => Err(format!("Parse Error: {}", e)),
136                },
137                _ => Err(From::from("Unknown statement.")),
138            }
139        } else {
140            Err(From::from("no head"))
141        }
142    }
143}
144
145#[allow(dead_code)]
146enum Response {
147    Value { id: DbId, value: String },
148    String { str: String },
149    Error { msg: String },
150}
151
152impl Response {
153    fn serialize(&self) -> String {
154        match *self {
155            Response::Value { ref id, ref value } => format!("{:?} = {}", id, value),
156            Response::String { ref str } => format!("{} ", str),
157            Response::Error { ref msg } => format!("error: {}", msg),
158        }
159    }
160}