vfs_https/
server.rs

1use async_stream::stream;
2use chrono::prelude::*;
3use core::task::{Context, Poll};
4use futures_util::stream::Stream;
5use hyper::header::{AUTHORIZATION, COOKIE, SET_COOKIE, WWW_AUTHENTICATE};
6use hyper::service::{make_service_fn, service_fn};
7use hyper::{Body, Method, Request, Response, Server, StatusCode};
8use rand::prelude::*;
9use rustls::internal::pemfile;
10use std::collections::HashMap;
11use std::io::{Error, ErrorKind, Read, Seek, Write};
12use std::pin::Pin;
13use std::sync;
14use tokio::net::TcpListener;
15use tokio::net::TcpStream;
16use tokio_rustls::server::TlsStream;
17use tokio_rustls::TlsAcceptor;
18use vfs::FileSystem;
19
20use crate::error::{HttpsFSError, HttpsFSResult};
21use crate::protocol::*;
22
23/// A https server providing a interface for HttpsFS
24pub struct HttpsFSServer<T: FileSystem> {
25    port: u16,
26    certs: Vec<rustls::Certificate>,
27    private_key: rustls::PrivateKey,
28    file_system: std::sync::Arc<std::sync::Mutex<T>>,
29    client_data: std::sync::Arc<std::sync::Mutex<HashMap<String, HttpsFSServerClientData>>>,
30    credential_validator: fn(user: &str, password: &str) -> bool,
31}
32
33/// Helper structure for building HttpsFS structs
34pub struct HttpsFSServerBuilder<T: FileSystem> {
35    port: u16,
36    certs: Option<String>,
37    private_key: Option<String>,
38    file_system: T,
39    credential_validator: Option<fn(user: &str, password: &str) -> bool>,
40}
41
42#[derive(Debug)]
43struct HttpsFSServerClientData {
44    last_use: DateTime<Local>,
45    authorized: bool,
46}
47
48impl HttpsFSServerClientData {
49    fn new() -> Self {
50        HttpsFSServerClientData {
51            last_use: Local::now(),
52            authorized: false,
53        }
54    }
55}
56
57impl<T: FileSystem> HttpsFSServer<T> {
58    /// Starts a builder of a [HttpsFSServer] with an object implementing the [FileSystem](vfs::filesystem::FileSystem) trait.
59    pub fn builder(fs: T) -> HttpsFSServerBuilder<T> {
60        HttpsFSServerBuilder::new(fs)
61    }
62
63    fn new(
64        port: u16,
65        certs: &str,
66        private_key: &str,
67        file_system: T,
68        credential_validator: fn(user: &str, password: &str) -> bool,
69    ) -> Self {
70        // Initially i tried to store a hyper::server::Server object in HttpsFSServer.
71        // I failed, since this type is a very complicated generic and i could
72        // not figure out, how to write down the type.
73        // The type definition is:
74        //
75        // impl<I, IO, IE, S, E, B> Server<I, S, E>
76        //   where
77        //     I: Accept<Conn = IO, Error = IE>,
78        //     IE: Into<Box<dyn StdError + Send + Sync>>,
79        //     IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
80        //     S: MakeServiceRef<IO, Body, ResBody = B>,
81        //     S::Error: Into<Box<dyn StdError + Send + Sync>>,
82        //     B: HttpBody + Send + Sync + 'static,
83        //     B::Error: Into<Box<dyn StdError + Send + Sync>>,
84        //     E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
85        //     E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
86        //
87        // This makes this struct almost impossible to use in situation, where one can not
88        // rely on rust type inference system. Currently i consider this as bad API design.
89        let private_key = load_private_key(private_key).unwrap();
90        let certs = load_certs(certs).unwrap();
91        HttpsFSServer {
92            port,
93            certs,
94            private_key,
95            file_system: std::sync::Arc::new(std::sync::Mutex::new(file_system)),
96            client_data: std::sync::Arc::new(std::sync::Mutex::new(HashMap::new())),
97            credential_validator,
98        }
99    }
100
101    /// Start the server
102    #[tokio::main]
103    pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
104        let addr = format!("127.0.0.1:{}", self.port);
105        let fs = self.file_system.clone();
106        let cd = self.client_data.clone();
107        let cv = self.credential_validator;
108
109        let mut cfg = rustls::ServerConfig::new(rustls::NoClientAuth::new());
110        cfg.set_single_cert(self.certs.clone(), self.private_key.clone())
111            .map_err(|e| Error::new(ErrorKind::Other, format!("{}", e)))?;
112        cfg.set_protocols(&[b"http/2".to_vec(), b"http/1.1".to_vec()]);
113        let tls_conf = sync::Arc::new(cfg);
114
115        let tcp = TcpListener::bind(&addr).await?;
116        let tls_acceptor = TlsAcceptor::from(tls_conf);
117
118        let incoming_tls_stream = stream! {
119            loop {
120                let (socket, _) = tcp.accept().await?;
121                let stream = tls_acceptor.accept(socket);
122                let res = stream.await;
123                if let Err(e) = res {
124                    println!("TLS Error: {:?}", e);
125                    continue;
126                }
127                yield res;
128            }
129        };
130
131        // The next let statement is rather complicated:
132        // It is a variant of the [Factory method pattern](https://en.wikipedia.org/wiki/Factory_method_pattern)
133        // implemented by two closures. In this case, i named the first closure 'factory' and the
134        // second closure 'product' (see comments). This is needed, since 'hyper' serves each
135        // connection with a different instance of a service. Since we don't know, how many
136        // connections have to be served in the future, we give 'hyper' this factory and than it
137        // can create services on demand.  But our factory is not producing the service immediately.
138        // If we call our factory, it only creates an instruction book and the needed materials, so
139        // that we can build the service by ourself later. That means, we get a
140        // [future](https://docs.rs/futures/0.3.12/futures/) from our factory, which can be
141        // executed later to create our service. Even the service method is a future.
142        //
143        // The tricky part is, that a closure can be moved out of the current contest.
144        // Therefore, we can not borrow any values from the current context, since the values
145        // of the current context might have a shorter lifetime than our 'factory'. In this
146        // example, since we wait until the server finishes its execution in the same
147        // context ("server.await?;"). I'm not sure, whether the lifetime analysis of the rust
148        // does not under stand that or whether a 'static lifetime is required by some types
149        // provided by hyper.
150        // The result of this is, that we cannot have an object which implements FileSystem
151        // in the HttpsFSServer and than borrow it the factory and than to the service even
152        // if we know, that HttpsFSServer lives as long as the hyper instance.
153        //
154        // 'hyper' also forces us, to use types, which have implemented the 'Send' trait. Therefor
155        // we can not use a single-threaded reference count (std::rc:Rc) but have to use a
156        // thread save variant (std::sync::Arc) instead.
157        let service_factory = make_service_fn(
158            // factory closure
159            move |_| {
160                let fs = fs.clone();
161                let cd = cd.clone();
162                async move {
163                    // return a future (instruction book to create or)
164                    Ok::<_, Error>(service_fn(
165                        // product closure
166                        move |request| {
167                            let fs = fs.clone();
168                            let cd = cd.clone();
169                            HttpsFSServer::https_fs_service(fs, cd, cv, request)
170                        },
171                    ))
172                }
173            },
174        );
175
176        let server = Server::builder(HyperAcceptor {
177            acceptor: Box::pin(incoming_tls_stream),
178        })
179        .serve(service_factory);
180
181        println!("Starting to serve on https://{}.", addr);
182
183        server.await?;
184
185        Ok(())
186    }
187
188    async fn https_fs_service(
189        file_system: std::sync::Arc<std::sync::Mutex<T>>,
190        client_data: std::sync::Arc<std::sync::Mutex<HashMap<String, HttpsFSServerClientData>>>,
191        credential_validator: fn(user: &str, pass: &str) -> bool,
192        req: Request<Body>,
193    ) -> Result<Response<Body>, hyper::Error> {
194        // TODO: Separate Session, authorization and content handling in different methods.
195        let mut response = Response::new(Body::empty());
196
197        HttpsFSServer::<T>::clean_up_client_data(&client_data);
198        let sess_id = HttpsFSServer::<T>::get_session_id(&client_data, &req, &mut response);
199        let auth_res =
200            HttpsFSServer::<T>::try_auth(&client_data, &sess_id, &credential_validator, &req);
201        match auth_res {
202            Err(()) => {
203                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
204                return Ok(response);
205            }
206            Ok(value) => {
207                if !value {
208                    *response.status_mut() = StatusCode::UNAUTHORIZED;
209                    response.headers_mut().insert(
210                        WWW_AUTHENTICATE,
211                        "Basic realm=\"PME\", charset=\"UTF-8\"".parse().unwrap(),
212                    );
213                    return Ok(response);
214                }
215            }
216        }
217
218        match (req.method(), req.uri().path()) {
219            (&Method::POST, "/") => {
220                let body = hyper::body::to_bytes(req.into_body()).await?;
221                let req: Result<Command, serde_json::Error> = serde_json::from_slice(&body);
222                //println!("Server request: {:?}", req);
223
224                match req {
225                    // TODO: Add more logging for debug
226                    Err(_) => *response.status_mut() = StatusCode::BAD_REQUEST,
227                    Ok(value) => {
228                        let res;
229                        {
230                            let file_system = file_system.lock().unwrap();
231                            res = HttpsFSServer::<T>::handle_command(&value, &*file_system);
232                        }
233                        let res = serde_json::to_string(&res);
234                        //println!("Server response: {:?}", res);
235                        match res {
236                            // TODO: Add more logging for debug
237                            Err(_) => *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR,
238                            Ok(value) => *response.body_mut() = Body::from(value),
239                        }
240                    }
241                }
242            }
243            _ => {
244                *response.status_mut() = StatusCode::NOT_FOUND;
245            }
246        };
247        Ok(response)
248    }
249
250    fn handle_command(command: &Command, file_system: &dyn FileSystem) -> CommandResponse {
251        match command {
252            Command::Exists(param) => CommandResponse::Exists({
253                file_system
254                    .exists(&param.path)
255                    .map_err(CommandResponseError::from)
256            }),
257            Command::Metadata(param) => CommandResponse::Metadata(meta_res_convert_vfs_cmd(
258                file_system.metadata(&param.path),
259            )),
260            Command::CreateFile(param) => CommandResponse::CreateFile(
261                CommandResponseCreateFile::from(file_system.create_file(&param.path)),
262            ),
263            Command::RemoveFile(param) => CommandResponse::RemoveFile({
264                file_system
265                    .remove_file(&param.path)
266                    .map_err(CommandResponseError::from)
267            }),
268            Command::Write(param) => {
269                CommandResponse::Write(HttpsFSServer::<T>::write(&param, file_system))
270            }
271            Command::Read(param) => {
272                CommandResponse::Read(HttpsFSServer::<T>::read(&param, file_system))
273            }
274            Command::CreateDir(param) => CommandResponse::CreateDir(
275                CommandResponseCreateDir::from(file_system.create_dir(&param.path)),
276            ),
277            Command::ReadDir(param) => CommandResponse::ReadDir(CommandResponseReadDir::from(
278                file_system.read_dir(&param.path),
279            )),
280            Command::RemoveDir(param) => CommandResponse::RemoveDir(
281                file_system
282                    .remove_dir(&param.path)
283                    .map_err(CommandResponseError::from),
284            ),
285        }
286    }
287
288    fn write(
289        cmd: &CommandWrite,
290        file_system: &dyn FileSystem,
291    ) -> Result<usize, CommandResponseError> {
292        let mut file = file_system.append_file(&cmd.path)?;
293        let data = base64::decode(&cmd.data);
294        if let Err(e) = data {
295            return Err(CommandResponseError::Other {
296                message: format!("Faild to decode data: {:?}", e),
297            });
298        }
299        let data = data.unwrap();
300        Ok(file.write(&data)?)
301    }
302
303    fn read(
304        cmd: &CommandRead,
305        file_system: &dyn FileSystem,
306    ) -> Result<(usize, String), CommandResponseError> {
307        let mut file = file_system.open_file(&cmd.path)?;
308
309        let mut data: Vec<u8> = vec![0; cmd.len as usize];
310
311        let seek_res = file.seek(std::io::SeekFrom::Start(cmd.pos));
312        if let Err(e) = seek_res {
313            return Err(CommandResponseError::IoError(format!("{:?}", e)));
314        }
315
316        let len = file.read(data.as_mut_slice())?;
317        let data = base64::encode(&mut data.as_mut_slice()[..len]);
318
319        Ok((len, data))
320    }
321
322    fn clean_up_client_data(
323        client_data: &std::sync::Arc<std::sync::Mutex<HashMap<String, HttpsFSServerClientData>>>,
324    ) {
325        let mut client_data = client_data.lock().unwrap();
326        let now = Local::now();
327        let dur = chrono::Duration::minutes(15);
328        let mut dummy = HashMap::new();
329
330        std::mem::swap(&mut *client_data, &mut dummy);
331
332        dummy = dummy
333            .into_iter()
334            .filter(|(_, v)| (now - v.last_use) <= dur)
335            .collect();
336
337        std::mem::swap(&mut *client_data, &mut dummy);
338    }
339
340    fn get_session_id(
341        client_data: &std::sync::Arc<std::sync::Mutex<HashMap<String, HttpsFSServerClientData>>>,
342        request: &Request<Body>,
343        response: &mut Response<Body>,
344    ) -> String {
345        let mut sess_id = String::new();
346        let headers = request.headers();
347        if headers.contains_key(COOKIE) {
348            // session is already established
349            let cookie = headers[COOKIE].as_bytes();
350            if cookie.starts_with(b"session=") {
351                sess_id = match cookie.get("session=".len()..) {
352                    None => String::new(),
353                    Some(value) => match std::str::from_utf8(value) {
354                        Err(_) => String::new(),
355                        Ok(value) => String::from(value),
356                    },
357                };
358                let mut client_data = client_data.lock().unwrap();
359                match client_data.get_mut(&sess_id) {
360                    // we didn't found the session id in our database,
361                    // therefore we delete the id and a new one will be created.
362                    None => sess_id = String::new(),
363                    Some(value) => value.last_use = Local::now(),
364                };
365            }
366        }
367
368        if sess_id.is_empty() {
369            let mut client_data = client_data.lock().unwrap();
370            while sess_id.is_empty() || client_data.contains_key(&sess_id) {
371                let mut sess_id_raw = [0_u8; 30];
372                let mut rng = thread_rng();
373                for x in &mut sess_id_raw {
374                    *x = rng.gen();
375                }
376                // to ensure, that session id is printable
377                sess_id = base64::encode(sess_id_raw);
378            }
379            let cookie = format!("session={}", sess_id);
380            response
381                .headers_mut()
382                .insert(SET_COOKIE, cookie.parse().unwrap());
383            client_data.insert(sess_id.clone(), HttpsFSServerClientData::new());
384        }
385
386        sess_id
387    }
388
389    fn try_auth(
390        client_data: &std::sync::Arc<std::sync::Mutex<HashMap<String, HttpsFSServerClientData>>>,
391        sess_id: &str,
392        credential_validator: &fn(user: &str, pass: &str) -> bool,
393        request: &Request<Body>,
394    ) -> Result<bool, ()> {
395        let mut client_data = client_data.lock().unwrap();
396        let sess_data = client_data.get_mut(sess_id);
397        if sess_data.is_none() {
398            return Err(());
399        }
400        let sess_data = sess_data.unwrap();
401
402        // try to authenticate client
403        if !sess_data.authorized {
404            let headers = request.headers();
405            let auth = headers.get(AUTHORIZATION);
406            if auth.is_none() {
407                return Ok(false);
408            }
409            let auth = auth.unwrap().to_str();
410            if auth.is_err() {
411                return Ok(false);
412            }
413            let auth = auth.unwrap();
414            let starts = "Basic ";
415            if !auth.starts_with(starts) {
416                return Ok(false);
417            }
418            let auth = base64::decode(&auth[starts.len()..]);
419            if auth.is_err() {
420                return Ok(false);
421            }
422            let auth = auth.unwrap();
423            let auth = String::from_utf8(auth);
424            if auth.is_err() {
425                return Ok(false);
426            }
427            let auth = auth.unwrap();
428            let mut auth_it = auth.split(':');
429            let username = auth_it.next();
430            if username.is_none() {
431                return Ok(false);
432            }
433            let username = username.unwrap();
434            let pass = auth_it.next();
435            if pass.is_none() {
436                return Ok(false);
437            }
438            let pass = pass.unwrap();
439            if credential_validator(username, pass) {
440                sess_data.authorized = true;
441            }
442        }
443
444        // if not authenticated, than inform client about it.
445        if sess_data.authorized {
446            return Ok(true);
447        }
448
449        Ok(false)
450    }
451}
452
453impl<T: FileSystem> HttpsFSServerBuilder<T> {
454    /// Creates a new builder for a [HttpsFSServer].
455    ///
456    /// Takes a FileSystem as argument, which will exposed via HTTPS.
457    pub fn new(fs: T) -> Self {
458        HttpsFSServerBuilder {
459            port: 443,
460            certs: None,
461            private_key: None,
462            file_system: fs,
463            credential_validator: None,
464        }
465    }
466
467    /// Sets the port on which the server will listen.
468    pub fn set_port(mut self, port: u16) -> Self {
469        self.port = port;
470        self
471    }
472
473
474    /// Loads a private key from file.
475    ///
476    /// The argument 'private_key' is the path to the file containing the private key.
477    pub fn load_private_key(mut self, private_key: &str) -> Self {
478        self.private_key = Some(String::from(private_key));
479        self
480    }
481
482    /// Loads the certificate from a file.
483    ///
484    /// The argument 'certs' is the path to the file containing a certificate.
485    pub fn load_certificates(mut self, certs: &str) -> Self {
486        self.certs = Some(String::from(certs));
487        self
488    }
489
490    /// Sets a function, which serves as a credential validator.
491    pub fn set_credential_validator(
492        mut self,
493        credential_validator: fn(user: &str, password: &str) -> bool,
494    ) -> Self {
495        self.credential_validator = Some(credential_validator);
496        self
497    }
498
499    /// Starts listening on the configured port.
500    ///
501    /// # Panics
502    ///
503    /// This function panics if one of the following conditions is fulfilled.
504    /// - no certificate was set
505    /// - no private key was set
506    /// - no credential validator was not set
507    pub fn run(self) -> HttpsFSResult<()> {
508        if self.certs.is_none() {
509            panic!("Certificate file was not set. Use set_certificates().");
510        }
511        if self.private_key.is_none() {
512            panic!("Private key file was not set. Use set_private_key().");
513        }
514        if self.credential_validator.is_none() {
515            panic!("Credential validator was not set. Use set_credential_validator().");
516        }
517        let mut server = HttpsFSServer::new(
518            self.port,
519            &self.certs.unwrap(),
520            &self.private_key.unwrap(),
521            self.file_system,
522            self.credential_validator.unwrap(),
523        );
524        let res = server.run();
525        match res {
526            Err(e) => Err(HttpsFSError::Other {
527                message: e.to_string(),
528            }),
529            Ok(()) => Ok(()),
530        }
531    }
532}
533
534struct HyperAcceptor {
535    acceptor: Pin<Box<dyn Stream<Item = Result<TlsStream<TcpStream>, Error>>>>,
536}
537
538impl hyper::server::accept::Accept for HyperAcceptor {
539    type Conn = TlsStream<TcpStream>;
540    type Error = Error;
541
542    fn poll_accept(
543        mut self: Pin<&mut Self>,
544        cx: &mut Context,
545    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
546        Pin::new(&mut self.acceptor).poll_next(cx)
547    }
548}
549
550/// Load public certificate from file
551fn load_certs(filename: &str) -> std::io::Result<Vec<rustls::Certificate>> {
552    // Open certificate file
553    let cert_file = std::fs::File::open(filename).map_err(|e| {
554        Error::new(
555            ErrorKind::Other,
556            format!("faild to open {}: {}", filename, e),
557        )
558    })?;
559    let mut cert_reader = std::io::BufReader::new(cert_file);
560    pemfile::certs(&mut cert_reader)
561        .map_err(|_| Error::new(ErrorKind::Other, "faild to load certificate"))
562}
563
564/// Load private key from file
565fn load_private_key(filename: &str) -> std::io::Result<rustls::PrivateKey> {
566    // Open keyfile
567    let key_file = std::fs::File::open(filename).map_err(|e| {
568        Error::new(
569            ErrorKind::Other,
570            format!("faild to open {}: {}", filename, e),
571        )
572    })?;
573    let mut key_reader = std::io::BufReader::new(key_file);
574
575    // Load and return a single private key
576    let keys = pemfile::pkcs8_private_keys(&mut key_reader)
577        .map_err(|_| Error::new(ErrorKind::Other, "failed to load private pkcs8 key"))?;
578    if keys.len() == 1 {
579        return Ok(keys[0].clone());
580    }
581
582    let keys = pemfile::rsa_private_keys(&mut key_reader)
583        .map_err(|_| Error::new(ErrorKind::Other, "failed to load private rsa key"))?;
584    if keys.len() != 1 {
585        println!("len: {}", keys.len());
586        return Err(Error::new(
587            ErrorKind::Other,
588            "expected a single private key",
589        ));
590    }
591    Ok(keys[0].clone())
592}