rhombus/
local_upload_provider.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use std::{io, path::PathBuf, sync::Arc};

use axum::{
    body::Bytes,
    routing::{get, post},
    Router,
};
use futures::{Stream, TryStreamExt};
use rand::{
    distributions::{Alphanumeric, DistString},
    thread_rng,
};
use tokio::{fs::File, io::BufWriter};
use tokio_util::io::StreamReader;

use crate::{
    errors::RhombusError,
    internal::{
        local_upload_provider::{route_local_download, slice_to_hex_string, HashRead},
        upload_provider::route_upload_file,
    },
    upload_provider::UploadProvider,
    Result,
};

#[derive(Clone)]
pub struct LocalUploadProvider {
    pub base_path: PathBuf,
}

impl LocalUploadProvider {
    pub fn new(base_filepath: PathBuf) -> LocalUploadProvider {
        LocalUploadProvider {
            base_path: base_filepath,
        }
    }
}

impl UploadProvider for LocalUploadProvider {
    fn routes(&self) -> Result<Router> {
        let provider_state = Arc::new(self.clone());
        let router = Router::new()
            .route("/uploads/:hash_filename", get(route_local_download))
            .route("/upload/:path", post(route_upload_file::<Self>))
            .with_state(provider_state);
        Ok(router)
    }

    async fn upload<S, E>(&self, filename: &str, stream: S) -> Result<String>
    where
        S: Stream<Item = std::result::Result<Bytes, E>> + Send,
        E: Into<axum::BoxError>,
    {
        async {
            let body_with_io_error =
                stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
            let body_reader = StreamReader::new(body_with_io_error);
            futures::pin_mut!(body_reader);

            let mut src = HashRead::new(body_reader);

            let base_path = std::path::Path::new(&self.base_path);
            tokio::fs::create_dir_all(&base_path).await?;

            let temp_name = format!("{}.tmp", Alphanumeric.sample_string(&mut thread_rng(), 60));
            let filepath = base_path.join(temp_name);
            let mut file = BufWriter::new(File::create(&filepath).await?);

            tokio::io::copy(&mut src, &mut file).await?;
            let hash = slice_to_hex_string(&src.hash());
            let new_filename = format!("{}-{}", hash, filename);

            let new_filepath = std::path::Path::new(&self.base_path).join(&new_filename);
            tokio::fs::rename(&filepath, &new_filepath).await?;

            Ok::<_, io::Error>(format!("/uploads/{}", &new_filename))
        }
        .await
        .map_err(|_| RhombusError::Unknown())
    }
}