1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use multer::Multipart;
use rocket::data::{Data, Limits};
use rocket::fairing::AdHoc;
use rocket::response::content::RawText;
use rocket::tokio::fs::{self, create_dir_all};
use rocket::tokio::io::{AsyncWriteExt};

use rocket::async_stream::stream;


use rocket::futures::stream::{StreamExt};
use rocket::http::{ContentType};
use rocket::futures::Stream;
use rocket::tokio::fs::File;



use crate::auth::user::User;
use crate::chunk_id::ChunkId;

mod request;
mod response;

use crate::chunks::request::RawContentType;

/// Chunk ID wrapping the empty string.
///
/// The upload handlers skip multipart fields whose ID equals this value, and
/// `retrieve` answers `None` for it.
const EMPTY_CHUNK_ID: ChunkId = ChunkId(std::borrow::Cow::Borrowed(""));

#[post("/", format = "multipart/form-data", data = "<upload>")]
async fn upload_chunks_deprecated(
    _user: User,
    content_type: RawContentType<'_>,
    limits: &Limits,
    upload: Data<'_>,
) -> std::io::Result<()> {
    let boundary = multer::parse_boundary(content_type.0).unwrap();
    let upload_stream = upload.open(limits.get("data-form").unwrap());
    let mut multipart = Multipart::new(tokio_util::io::ReaderStream::new(upload_stream), boundary);

    while let Ok(Some(mut field)) = multipart.next_field().await {
        let field_name = field.name().unwrap();
        let chunk_id = ChunkId::from(field_name);

        if chunk_id == EMPTY_CHUNK_ID {
            continue;
        }

        let full_path = chunk_id.file_path();

        if let Some(parent) = full_path.parent() {
            create_dir_all(parent).await?;
        }

        let mut file = File::create(full_path.clone()).await?;

        while let Some(chunk) = match field.chunk().await {
            Ok(v) => v,
            Err(_e) => {
                fs::remove_file(&full_path).await.ok();

                // TODO
                panic!("Error reading chunk");
            }
        } {
            file.write_all(&chunk).await.map_err(|_| {
                std::fs::remove_file(&full_path).ok();
            });
        }
    }

    Ok(())
}


#[post("/upload", format = "multipart/form-data", data = "<upload>")]
async fn upload_chunks(
    _user: User,
    content_type: RawContentType<'_>,
    limits: &Limits,
    upload: Data<'_>,
) -> std::io::Result<()> {
    let boundary = multer::parse_boundary(content_type.0).unwrap();
    let upload_stream = upload.open(limits.get("data-form").unwrap());
    let mut multipart = Multipart::new(tokio_util::io::ReaderStream::new(upload_stream), boundary);

    while let Ok(Some(mut field)) = multipart.next_field().await {
        let field_name = field.name().unwrap();
        let chunk_id = ChunkId::from(field_name);

        if chunk_id == EMPTY_CHUNK_ID {
            continue;
        }

        let full_path = chunk_id.file_path();

        if let Some(parent) = full_path.parent() {
            create_dir_all(parent).await?;
        }

        let mut file = File::create(full_path.clone()).await?;

        while let Some(chunk) = match field.chunk().await {
            Ok(v) => v,
            Err(_e) => {
                fs::remove_file(&full_path).await.ok();

                // TODO
                panic!("Error reading chunk");
            }
        } {
            file.write_all(&chunk).await.map_err(|_| {
                std::fs::remove_file(&full_path).ok();
            });
        }
    }

    Ok(())
}

/// Downloads chunk from a storage
// TODO batch download
// TODO does it need to check that user can access chunk?
#[get("/<id>")]
async fn retrieve(_user: User, id: ChunkId<'_>) -> Option<RawText<File>> {
    if id == EMPTY_CHUNK_ID {
        None
    } else {
        File::open(id.file_path()).await.map(RawText).ok()
    }
}

use rocket::form::Form;

use rocket_multipart::{MultipartStream, MultipartSection};



/// Form payload of `download_chunks`: the list of chunk IDs requested by the client.
#[derive(FromForm, Debug)]
struct ChunkIds<'a>(Vec<ChunkId<'a>>);

/// Streams the requested chunks back as a single multipart response.
///
/// The URL-encoded form body carries the chunk IDs. Each ID produces one
/// section with content type `ContentType::Text`, in the order the IDs appear
/// in the parsed list.
///
/// # Panics
///
/// The response stream panics if the file of a requested chunk cannot be opened.
// NOTE(review): unlike the other routes in this file, this one takes no `User`
// guard — confirm that this is intended.
#[post("/download", format = "application/x-www-form-urlencoded", data = "<chunk_ids>")]
async fn download_chunks(chunk_ids: Form<ChunkIds<'_>>) -> MultipartStream<impl Stream<Item = MultipartSection<'_>>> {
    let sections = stream! {
        for id in &chunk_ids.0 {
            let chunk_file = File::open(id.file_path()).await.expect("jojo");
            let section = MultipartSection {
                content_type: Some(ContentType::Text),
                content_encoding: None,
                content: Box::pin(chunk_file),
            };

            yield section;
        }
    };

    MultipartStream::new_random(sections)
}

/// Builds the fairing that mounts all chunk routes under `/chunks` on ignite.
pub fn stage() -> AdHoc {
    AdHoc::on_ignite("Chunk Server Stage", |rocket| async {
        let chunk_routes = routes![upload_chunks, upload_chunks_deprecated, retrieve, download_chunks];
        rocket.mount("/chunks", chunk_routes)
    })
}