1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
//! # volume server
//!
//! The volume server stores values in the file system. For atomicity, temporary files
//! are first created in the `destdir/tmp` directory and then moved to the destination
//! path. For this approach to work, `destdir/tmp` and the destination path must be on
//! the same file system.
//!
//! to start the volume server, run
//!
//! ```sh
//! volume -p 7000 -d /tmp/kalavarastore
//! ```
//!
//! If the master server is not aware of this volume server, register it with:
//! ```sh
//! volume -p 7000 -d /tmp/kalavarastore -m http://master.server -b http://volume.server:7000
//! ```

use md5::compute as compute_md5;
use tempfile::NamedTempFile;
use tiny_http::{Request, Response};

use std::fs::{create_dir_all, remove_file, File};
use std::io::{copy, Error, ErrorKind, Read};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;

use crate::{Respond, Service};

/// Volume store: a service that keeps each value as a file blob on disk.
struct Volume {
    /// Root directory for stored file blobs. Destination paths are built
    /// beneath it, and its `tmp` sub-directory holds temporary files while a
    /// value is being written.
    data_dir: Arc<String>,
}

/// Types of responses that the volume server generates
enum ResponseKind {
    /// Path to the file blob whose contents should be sent back
    FilePath(PathBuf),

    /// Value saved, 201
    Created,

    /// Value deleted, 204
    Deleted,

    /// Error occurred, 500
    ServerError,

    /// Method not allowed, 405
    NotAllowed,
}

impl Default for ResponseKind {
    fn default() -> Self {
        ResponseKind::NotAllowed
    }
}

impl Respond for ResponseKind {
    fn respond(self, req: Request) {
        use ResponseKind::*;

        let _ = match self {
            FilePath(path) => match File::open(path) {
                Ok(file) => req.respond(Response::from_file(file)),
                Err(_) => req.respond(resp!("Server Error", 500)),
            },
            Created => req.respond(resp!("Created", 201)),
            Deleted => req.respond(resp!("Deleted", 204)),
            ServerError => req.respond(resp!("Server error", 500)),
            NotAllowed => req.respond(resp!("Method not allowd", 405)),
        };
    }
}

impl Volume {
    /// Builds a volume service rooted at `data_dir`
    fn new(data_dir: String) -> Self {
        let data_dir = Arc::new(data_dir);
        Self { data_dir }
    }

    /// Calculates the destination file path for a key.
    ///
    /// The key is hashed with MD5 and rendered as lowercase hex. The first two
    /// hex characters each become a directory level under `data_dir`, and the
    /// remaining characters form the file name.
    fn key_to_path(&self, key: &str) -> PathBuf {
        let digest = format!("{:x}", compute_md5(key.as_bytes()));

        // Peel off one character at a time for the two directory levels.
        let (level_one, rest) = digest.split_at(1);
        let (level_two, file_name) = rest.split_at(1);

        let mut blob_path = PathBuf::from(self.data_dir.as_str());
        blob_path.extend(&[level_one, level_two, file_name]);
        blob_path
    }
}

impl Service for Volume {
    type Response = ResponseKind;

    /// Get value of a key from store.
    ///
    /// Only resolves the blob path for `key`; the file itself is opened when
    /// the returned `FilePath` response is sent.
    fn get(&self, key: String) -> Self::Response {
        ResponseKind::FilePath(self.key_to_path(&key))
    }

    /// Save/Update key in store.
    ///
    /// The body is first written to a temporary file inside `data_dir/tmp` and
    /// only moved to its destination once it has been copied completely, so a
    /// failed or truncated upload never replaces an existing value.
    ///
    /// Returns `Created` on success and `ServerError` if any step fails.
    fn save(&self, key: String, mut value: impl Read) -> Self::Response {
        let tmpdir = Path::new(self.data_dir.as_str()).join("tmp");
        let dest_path = self.key_to_path(&key);

        let mut tmpfile = match NamedTempFile::new_in(tmpdir) {
            Ok(tmpfile) => tmpfile,
            Err(_) => return ResponseKind::ServerError,
        };

        // Each step must run only if the previous one succeeded. `and_then`
        // is lazy, unlike `and`, whose argument is evaluated up front and
        // would persist a partially written file even when the copy failed.
        // On failure the temporary file is dropped without being persisted.
        let stored = copy(&mut value, &mut tmpfile)
            .and_then(|_| match dest_path.parent() {
                Some(parent) => create_dir_all(parent),
                None => Ok(()),
            })
            .and_then(|_| {
                tmpfile
                    .persist(&dest_path)
                    .map(|_| ())
                    .map_err(|_| Error::new(ErrorKind::Other, ""))
            });

        match stored {
            Ok(()) => ResponseKind::Created,
            Err(_) => ResponseKind::ServerError,
        }
    }

    /// Remove a key from store.
    ///
    /// Returns `Deleted` when the blob was removed and `ServerError` when the
    /// removal fails for any reason, including the blob not existing.
    fn delete(&self, key: String) -> Self::Response {
        let dest_path = self.key_to_path(&key);

        match remove_file(dest_path) {
            Ok(_) => ResponseKind::Deleted,
            Err(_) => ResponseKind::ServerError,
        }
    }
}

/// starts a kalavara volume server
/// # Arguments
///
/// * `port` - Port name to listen at
/// * `data_dir` - Storage directory
/// * `threads` - Number of threads to spawn
/// * `master` - url of master server to register at
/// * `base` -  base url of server to register with master
///
pub fn start(
    port: u16,
    data_dir: String,
    threads: u16,
    master: Option<String>,
    base: Option<String>,
) {
    let addr: SocketAddr = ([0, 0, 0, 0], port).into();
    let server = Arc::new(tiny_http::Server::http(addr).unwrap());
    let mut handles = Vec::new();

    // creates data directory. files are initially created in tmp dir then moved to corresponding
    // path
    if create_dir_all(Path::new(&data_dir).join("tmp")).is_err() {
        panic!("Could not create data dir. exiting\n");
    }

    // register at master
    match (master, base) {
        (Some(master), Some(base)) => {
            let resp = minreq::post(format!("{}{}", master, "/admin/add-volume"))
                .with_body(base)
                .send();

            match resp {
                Ok(ref res) if res.status_code == 200 => {
                    println!("Successfully registered with master");
                }
                _ => {
                    panic!("Could not register with master");
                }
            };
        }
        (Some(_), _) => panic!("Host required"),
        (_, _) => {} // skip if only host is provided
    };

    let volume = Arc::new(Volume::new(data_dir));

    for _ in 0..threads {
        let server = server.clone();
        let handler = volume.clone();

        handles.push(thread::spawn(move || {
            for rq in server.incoming_requests() {
                handler.dispatch(rq);
            }
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
}