use md5::compute as compute_md5;
use tempfile::NamedTempFile;
use tiny_http::{Request, Response};
use std::fs::{create_dir_all, remove_file, File};
use std::io::{copy, Error, ErrorKind, Read};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use crate::{Respond, Service};
struct Volume {
data_dir: Arc<String>,
}
enum ResponseKind {
FilePath(PathBuf),
Created,
Deleted,
ServerError,
NotAllowed,
}
impl Default for ResponseKind {
fn default() -> Self {
ResponseKind::NotAllowed
}
}
impl Respond for ResponseKind {
fn respond(self, req: Request) {
use ResponseKind::*;
let _ = match self {
FilePath(path) => match File::open(path) {
Ok(file) => req.respond(Response::from_file(file)),
Err(_) => req.respond(resp!("Server Error", 500)),
},
Created => req.respond(resp!("Created", 201)),
Deleted => req.respond(resp!("Deleted", 204)),
ServerError => req.respond(resp!("Server error", 500)),
NotAllowed => req.respond(resp!("Method not allowd", 405)),
};
}
}
impl Volume {
fn new(data_dir: String) -> Self {
Self {
data_dir: Arc::new(data_dir),
}
}
fn key_to_path(&self, key: &str) -> PathBuf {
let path = format!("{:x}", compute_md5(key.as_bytes()));
let mut dest_path = PathBuf::from(self.data_dir.as_ref());
dest_path.push(path.get(0..1).unwrap());
dest_path.push(path.get(1..2).unwrap());
dest_path.push(path.get(2..).unwrap());
dest_path
}
}
impl Service for Volume {
type Response = ResponseKind;
fn get(&self, key: String) -> Self::Response {
let dest_path = self.key_to_path(&key);
ResponseKind::FilePath(dest_path)
}
fn save(&self, key: String, mut value: impl Read) -> Self::Response {
let tmpdir = Path::new(self.data_dir.as_ref()).join("tmp");
let dest_path = self.key_to_path(&key);
match NamedTempFile::new_in(tmpdir) {
Ok(mut tmpfile) => {
match copy(&mut value, &mut tmpfile)
.and(create_dir_all(dest_path.parent().unwrap()))
.and(
tmpfile
.persist(dest_path)
.map_err(|_| Error::new(ErrorKind::Other, "")),
) {
Ok(_) => ResponseKind::Created,
_ => ResponseKind::ServerError,
}
}
Err(_) => ResponseKind::ServerError,
}
}
fn delete(&self, key: String) -> Self::Response {
let dest_path = self.key_to_path(&key);
match remove_file(dest_path) {
Ok(_) => ResponseKind::Deleted,
Err(_) => ResponseKind::ServerError,
}
}
}
pub fn start(
port: u16,
data_dir: String,
threads: u16,
master: Option<String>,
base: Option<String>,
) {
let addr: SocketAddr = ([0, 0, 0, 0], port).into();
let server = Arc::new(tiny_http::Server::http(addr).unwrap());
let mut handles = Vec::new();
if create_dir_all(Path::new(&data_dir).join("tmp")).is_err() {
panic!("Could not create data dir. exiting\n");
}
match (master, base) {
(Some(master), Some(base)) => {
let resp = minreq::post(format!("{}{}", master, "/admin/add-volume"))
.with_body(base)
.send();
match resp {
Ok(ref res) if res.status_code == 200 => {
println!("Successfully registered with master");
}
_ => {
panic!("Could not register with master");
}
};
}
(Some(_), _) => panic!("Host required"),
(_, _) => {} };
let volume = Arc::new(Volume::new(data_dir));
for _ in 0..threads {
let server = server.clone();
let handler = volume.clone();
handles.push(thread::spawn(move || {
for rq in server.incoming_requests() {
handler.dispatch(rq);
}
}));
}
for h in handles {
h.join().unwrap();
}
}