cooklang_sync_server/chunks/
mod.rs

1use multer::Multipart;
2use rocket::data::{Data, Limits};
3use rocket::fairing::AdHoc;
4use rocket::response::content::RawText;
5use rocket::tokio::fs::{self, create_dir_all};
6use rocket::tokio::io::AsyncWriteExt;
7
8use rocket::async_stream::stream;
9
10use rocket::futures::Stream;
11use rocket::http::{ContentType, Header};
12use rocket::tokio::fs::File;
13
14use crate::auth::user::User;
15use crate::chunk_id::ChunkId;
16
17mod request;
18mod response;
19
20use crate::chunks::request::RawContentType;
21
22const EMPTY_CHUNK_ID: ChunkId = ChunkId(std::borrow::Cow::Borrowed(""));
23
24#[post("/", format = "multipart/form-data", data = "<upload>")]
25async fn upload_chunks_deprecated(
26    _user: User,
27    content_type: RawContentType<'_>,
28    limits: &Limits,
29    upload: Data<'_>,
30) -> std::io::Result<()> {
31    let boundary = multer::parse_boundary(content_type.0).unwrap();
32    let upload_stream = upload.open(limits.get("data-form").unwrap());
33    let mut multipart = Multipart::new(tokio_util::io::ReaderStream::new(upload_stream), boundary);
34
35    while let Ok(Some(mut field)) = multipart.next_field().await {
36        let field_name = field.name().unwrap();
37        let chunk_id = ChunkId::from(field_name);
38
39        if chunk_id == EMPTY_CHUNK_ID {
40            continue;
41        }
42
43        let full_path = chunk_id.file_path();
44
45        if let Some(parent) = full_path.parent() {
46            create_dir_all(parent).await?;
47        }
48
49        let mut file = File::create(full_path.clone()).await?;
50
51        while let Some(chunk) = match field.chunk().await {
52            Ok(v) => v,
53            Err(_e) => {
54                fs::remove_file(&full_path).await.ok();
55
56                // TODO
57                panic!("Error reading chunk");
58            }
59        } {
60            file.write_all(&chunk).await.map_err(|_| {
61                std::fs::remove_file(&full_path).ok();
62            });
63        }
64    }
65
66    Ok(())
67}
68
69#[post("/upload", format = "multipart/form-data", data = "<upload>")]
70async fn upload_chunks(
71    _user: User,
72    content_type: RawContentType<'_>,
73    limits: &Limits,
74    upload: Data<'_>,
75) -> std::io::Result<()> {
76    let boundary = multer::parse_boundary(content_type.0).unwrap();
77    let upload_stream = upload.open(limits.get("data-form").unwrap());
78    let mut multipart = Multipart::new(tokio_util::io::ReaderStream::new(upload_stream), boundary);
79
80    while let Ok(Some(mut field)) = multipart.next_field().await {
81        let field_name = field.name().unwrap();
82        let chunk_id = ChunkId::from(field_name);
83
84        if chunk_id == EMPTY_CHUNK_ID {
85            continue;
86        }
87
88        let full_path = chunk_id.file_path();
89
90        if let Some(parent) = full_path.parent() {
91            create_dir_all(parent).await?;
92        }
93
94        let mut file = File::create(full_path.clone()).await?;
95
96        while let Some(chunk) = match field.chunk().await {
97            Ok(v) => v,
98            Err(e) => {
99                error!("Error reading chunk {:?}", e);
100                fs::remove_file(&full_path).await.ok();
101
102                // TODO
103                panic!("Error reading chunk");
104            }
105        } {
106            file.write_all(&chunk).await.map_err(|_| {
107                std::fs::remove_file(&full_path).ok();
108            });
109        }
110    }
111
112    Ok(())
113}
114
115/// Downloads chunk from a storage
116// TODO batch download
117// TODO does it need to check that user can access chunk?
118#[get("/<id>")]
119async fn retrieve(_user: User, id: ChunkId<'_>) -> Option<RawText<File>> {
120    if id == EMPTY_CHUNK_ID {
121        None
122    } else {
123        File::open(id.file_path()).await.map(RawText).ok()
124    }
125}
126
127use rocket::form::Form;
128
129use rocket_multipart::{MultipartSection, MultipartStream};
130
131#[derive(FromForm, Debug)]
132struct ChunkIds<'a>(Vec<ChunkId<'a>>);
133
134#[post(
135    "/download",
136    format = "application/x-www-form-urlencoded",
137    data = "<chunk_ids>"
138)]
139async fn download_chunks(
140    chunk_ids: Form<ChunkIds<'_>>,
141) -> MultipartStream<impl Stream<Item = MultipartSection<'_>>> {
142    let cloned_chunk_ids: Vec<_> = chunk_ids.0.iter().map(|chunk_id| {
143        let id = chunk_id.id().to_string(); // Assuming id is of type &str
144        let file_path = chunk_id.file_path(); // Assuming file_path is of type &str
145        (id, file_path)
146    }).collect();
147
148    MultipartStream::new_random(stream! {
149        for (id, file_path) in cloned_chunk_ids {
150            let file = File::open(file_path).await.expect("file present");
151
152            let section = MultipartSection::new(file)
153                .add_header(ContentType::Text)
154                .add_header(Header::new("X-Chunk-ID", id));
155
156            yield section
157        }
158    })
159}
160
161pub fn stage() -> AdHoc {
162    AdHoc::on_ignite("Chunk Server Stage", |rocket| async {
163        rocket.mount(
164            "/chunks",
165            routes![
166                upload_chunks,
167                upload_chunks_deprecated,
168                retrieve,
169                download_chunks
170            ],
171        )
172    })
173}