cooklang_sync_server/chunks/
mod.rs1use multer::Multipart;
2use rocket::data::{Data, Limits};
3use rocket::fairing::AdHoc;
4use rocket::response::content::RawText;
5use rocket::tokio::fs::{self, create_dir_all};
6use rocket::tokio::io::AsyncWriteExt;
7
8use rocket::async_stream::stream;
9
10use rocket::futures::Stream;
11use rocket::http::{ContentType, Header};
12use rocket::tokio::fs::File;
13
14use crate::auth::user::User;
15use crate::chunk_id::ChunkId;
16
17mod request;
18mod response;
19
20use crate::chunks::request::RawContentType;
21
22const EMPTY_CHUNK_ID: ChunkId = ChunkId(std::borrow::Cow::Borrowed(""));
23
24#[post("/", format = "multipart/form-data", data = "<upload>")]
25async fn upload_chunks_deprecated(
26 _user: User,
27 content_type: RawContentType<'_>,
28 limits: &Limits,
29 upload: Data<'_>,
30) -> std::io::Result<()> {
31 let boundary = multer::parse_boundary(content_type.0).unwrap();
32 let upload_stream = upload.open(limits.get("data-form").unwrap());
33 let mut multipart = Multipart::new(tokio_util::io::ReaderStream::new(upload_stream), boundary);
34
35 while let Ok(Some(mut field)) = multipart.next_field().await {
36 let field_name = field.name().unwrap();
37 let chunk_id = ChunkId::from(field_name);
38
39 if chunk_id == EMPTY_CHUNK_ID {
40 continue;
41 }
42
43 let full_path = chunk_id.file_path();
44
45 if let Some(parent) = full_path.parent() {
46 create_dir_all(parent).await?;
47 }
48
49 let mut file = File::create(full_path.clone()).await?;
50
51 while let Some(chunk) = match field.chunk().await {
52 Ok(v) => v,
53 Err(_e) => {
54 fs::remove_file(&full_path).await.ok();
55
56 panic!("Error reading chunk");
58 }
59 } {
60 file.write_all(&chunk).await.map_err(|_| {
61 std::fs::remove_file(&full_path).ok();
62 });
63 }
64 }
65
66 Ok(())
67}
68
69#[post("/upload", format = "multipart/form-data", data = "<upload>")]
70async fn upload_chunks(
71 _user: User,
72 content_type: RawContentType<'_>,
73 limits: &Limits,
74 upload: Data<'_>,
75) -> std::io::Result<()> {
76 let boundary = multer::parse_boundary(content_type.0).unwrap();
77 let upload_stream = upload.open(limits.get("data-form").unwrap());
78 let mut multipart = Multipart::new(tokio_util::io::ReaderStream::new(upload_stream), boundary);
79
80 while let Ok(Some(mut field)) = multipart.next_field().await {
81 let field_name = field.name().unwrap();
82 let chunk_id = ChunkId::from(field_name);
83
84 if chunk_id == EMPTY_CHUNK_ID {
85 continue;
86 }
87
88 let full_path = chunk_id.file_path();
89
90 if let Some(parent) = full_path.parent() {
91 create_dir_all(parent).await?;
92 }
93
94 let mut file = File::create(full_path.clone()).await?;
95
96 while let Some(chunk) = match field.chunk().await {
97 Ok(v) => v,
98 Err(e) => {
99 error!("Error reading chunk {:?}", e);
100 fs::remove_file(&full_path).await.ok();
101
102 panic!("Error reading chunk");
104 }
105 } {
106 file.write_all(&chunk).await.map_err(|_| {
107 std::fs::remove_file(&full_path).ok();
108 });
109 }
110 }
111
112 Ok(())
113}
114
115#[get("/<id>")]
119async fn retrieve(_user: User, id: ChunkId<'_>) -> Option<RawText<File>> {
120 if id == EMPTY_CHUNK_ID {
121 None
122 } else {
123 File::open(id.file_path()).await.map(RawText).ok()
124 }
125}
126
127use rocket::form::Form;
128
129use rocket_multipart::{MultipartSection, MultipartStream};
130
131#[derive(FromForm, Debug)]
132struct ChunkIds<'a>(Vec<ChunkId<'a>>);
133
134#[post(
135 "/download",
136 format = "application/x-www-form-urlencoded",
137 data = "<chunk_ids>"
138)]
139async fn download_chunks(
140 chunk_ids: Form<ChunkIds<'_>>,
141) -> MultipartStream<impl Stream<Item = MultipartSection<'_>>> {
142 let cloned_chunk_ids: Vec<_> = chunk_ids.0.iter().map(|chunk_id| {
143 let id = chunk_id.id().to_string(); let file_path = chunk_id.file_path(); (id, file_path)
146 }).collect();
147
148 MultipartStream::new_random(stream! {
149 for (id, file_path) in cloned_chunk_ids {
150 let file = File::open(file_path).await.expect("file present");
151
152 let section = MultipartSection::new(file)
153 .add_header(ContentType::Text)
154 .add_header(Header::new("X-Chunk-ID", id));
155
156 yield section
157 }
158 })
159}
160
161pub fn stage() -> AdHoc {
162 AdHoc::on_ignite("Chunk Server Stage", |rocket| async {
163 rocket.mount(
164 "/chunks",
165 routes![
166 upload_chunks,
167 upload_chunks_deprecated,
168 retrieve,
169 download_chunks
170 ],
171 )
172 })
173}