use std::io::BufReader;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{lines, write_all};
use tokio::net::TcpListener; use tokio::prelude::*;
use uuid::Uuid;
use crate::prelude::*;
pub fn start_local(db: Arc<Mycelium>) -> std::io::Result<()> {
let port = db.config.tcp_port;
let ip = "0.0.0.0";
let addr = std::env::args()
.nth(1)
.unwrap_or_else(|| format!("{}:{}", ip, port));
let addr = addr.parse::<SocketAddr>().expect("socket error");
let listener = TcpListener::bind(&addr)
.map_err(|_| "Failed to bind socket.")
.expect("Failed to bind listener");
let done = listener
.incoming()
.map_err(|e| println!("Error accepting socket: error = {:?}", e))
.for_each(move |socket| {
let (reader, writer) = socket.split();
let lines = lines(BufReader::new(reader));
let db = db.clone();
let responses = lines.map(move |line| {
let request = match Request::parse(&line) {
Ok(req) => req,
Err(e) => return Response::Error { msg: e },
};
match request {
Request::Command { cmd } => {
let res = db.execute_command(cmd).unwrap();
match res {
Result::Some((None, None, None, Some(list), None)) => {
let str = serde_json::to_string(&list).unwrap();
Response::String { str }
}
Result::Some((Some(id), None, None, None, None)) => {
let id = Uuid::from_bytes(id);
Response::String {
str: id.to_string(),
}
}
Result::Some((None, None, None, None, Some(list))) => {
let str = serde_json::to_string(&list).unwrap();
Response::String { str }
}
_ => Response::String {
str: "Command not yet available.".to_string(),
},
}
}
Request::ParseSql { cmd } => {
let res = db.execute_command(cmd).unwrap();
match res {
Result::Some((None, None, None, Some(list), None)) => {
let str = serde_json::to_string(&list).unwrap();
Response::String { str }
}
_ => unimplemented!(),
}
}
}
});
let writes = responses.fold(writer, |writer, response| {
let mut response = response.serialize();
response.push('\r');
response.push('\n');
write_all(writer, response.into_bytes()).map(|(w, _)| w)
});
let msg = writes.then(move |_| Ok(()));
tokio::spawn(msg)
});
tokio::run(done);
Ok(())
}
#[allow(dead_code)]
enum ResponseFormat {
BIN,
JSON,
RON,
}
#[allow(dead_code)]
enum Request {
Command { cmd: Command },
ParseSql { cmd: Command },
}
impl Request {
fn parse(input: &str) -> std::result::Result<Request, String> {
if input.len() > 3 {
let head = input.split_at(4);
match head.0.trim() {
"cmd" => match serde_json::from_str(head.1) {
Ok(cmd) => Ok(Request::Command { cmd }),
Err(_) => Err(From::from("Bad Command.")),
},
"msql" => match mycelium_command::Command::parse(head.1) {
Ok(cmd) => Ok(Request::Command { cmd }),
Err(e) => Err(format!("Parse Error: {}", e)),
},
_ => Err(From::from("Unknown statement.")),
}
} else {
Err(From::from("no head"))
}
}
}
#[allow(dead_code)]
enum Response {
Value { id: DbId, value: String },
String { str: String },
Error { msg: String },
}
impl Response {
fn serialize(&self) -> String {
match *self {
Response::Value { ref id, ref value } => format!("{:?} = {}", id, value),
Response::String { ref str } => format!("{} ", str),
Response::Error { ref msg } => format!("error: {}", msg),
}
}
}