1use actix_files::NamedFile;
2use actix_web::{Error, HttpResponse, Result, HttpRequest};
3use actix_multipart::Multipart;
4use actix_web::http::{header, Method};
5use futures_util::{StreamExt, TryStreamExt};
6use std::path::Path;
7use tokio::fs::File;
8use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, BufWriter}; use bytes::Bytes;
10use brotli::CompressorWriter;
11use tokio_util::io::ReaderStream;
12use lz4_flex::{compress_prepend_size};
14
15#[inline]
19pub fn http_ok_static(msg: &'static str) -> HttpResponse {
20 HttpResponse::Ok()
21 .content_type("text/plain")
22 .body(msg)
23}
24
25#[inline]
27pub fn http_ok_stream(stream: impl StreamExt<Item = Result<Bytes, Error>> + 'static) -> HttpResponse {
28 HttpResponse::Ok()
29 .content_type("application/octet-stream")
30 .streaming(stream)
31}
32
33#[inline]
35pub fn http_no_content() -> HttpResponse {
36 HttpResponse::NoContent().finish()
37}
38
39#[inline]
41pub fn http_created(location: &str) -> HttpResponse {
42 HttpResponse::Created()
43 .insert_header((header::LOCATION, location))
44 .finish()
45}
46
47#[inline]
49pub fn http_accepted() -> HttpResponse {
50 HttpResponse::Accepted().finish()
51}
52
53#[inline]
55pub fn http_partial_content(data: Bytes, range: &str, total_len: u64) -> HttpResponse {
56 HttpResponse::PartialContent()
57 .insert_header((header::CONTENT_RANGE, format!("bytes {}/{}", range, total_len)))
58 .content_type("application/octet-stream")
59 .body(data)
60}
61
62#[inline]
64pub fn http_bad_static(msg: &'static str) -> HttpResponse {
65 HttpResponse::BadRequest()
66 .content_type("text/plain")
67 .body(msg)
68}
69
70#[inline]
72pub fn http_unauthorized(realm: &str) -> HttpResponse {
73 HttpResponse::Unauthorized()
74 .insert_header((header::WWW_AUTHENTICATE, format!("Bearer realm=\"{}\"", realm)))
75 .finish()
76}
77
78#[inline]
80pub fn http_forbidden(msg: &str) -> HttpResponse {
81 HttpResponse::Forbidden().body(msg.to_string())
82}
83
84#[inline]
86pub fn http_not_found(msg: &str) -> HttpResponse {
87 HttpResponse::NotFound().body(msg.to_string())
88}
89
90#[inline]
92pub fn http_method_not_allowed(allowed_methods: &[Method]) -> HttpResponse {
93 let methods = allowed_methods.iter()
94 .map(|m| m.as_str())
95 .collect::<Vec<_>>()
96 .join(", ");
97
98 HttpResponse::MethodNotAllowed()
99 .insert_header((header::ALLOW, methods))
100 .finish()
101}
102
103#[inline]
105pub fn http_conflict(msg: &str) -> HttpResponse {
106 HttpResponse::Conflict().body(msg.to_string())
107}
108
109#[inline]
111pub fn http_unsupported_media(msg: &str) -> HttpResponse {
112 HttpResponse::UnsupportedMediaType().body(msg.to_string())
113}
114
115#[inline]
117pub fn http_too_many_requests(retry_after_secs: u64) -> HttpResponse {
118 HttpResponse::TooManyRequests()
119 .insert_header((header::RETRY_AFTER, retry_after_secs))
120 .finish()
121}
122
123#[inline]
125pub fn http_server_error(msg: &str) -> HttpResponse {
126 HttpResponse::InternalServerError().body(msg.to_string())
127}
128
129#[inline]
131pub fn http_service_unavailable(retry_after_secs: u64) -> HttpResponse {
132 HttpResponse::ServiceUnavailable()
133 .insert_header((header::RETRY_AFTER, retry_after_secs))
134 .finish()
135}
136
137pub async fn send_file_fast(path: &str, req: &HttpRequest) -> Result<NamedFile> {
141 let file = NamedFile::open_async(path).await?;
142
143 if let Some(accept_encoding) = req.headers().get(header::ACCEPT_ENCODING) {
144 if accept_encoding.to_str().unwrap_or("").contains("br") {
145 let br_path = format!("{}.br", path);
146 if Path::new(&br_path).exists() {
147 return Ok(NamedFile::open_async(br_path).await?
149 .set_content_encoding(header::ContentEncoding::Brotli));
150 }
151 }
152 }
153
154 Ok(file.use_etag(true).use_last_modified(true))
155}
156
157pub async fn stream_file_chunked(path: &str, chunk_size: usize) -> Result<HttpResponse, Error> {
160 let file = File::open(path).await?;
161
162 let stream = ReaderStream::with_capacity(file, chunk_size)
165 .map_err(|e| actix_web::error::ErrorInternalServerError(e));
167
168 Ok(HttpResponse::Ok()
169 .content_type("application/octet-stream")
170 .streaming(stream))
171}
172
173pub async fn send_file_range(path: &str, range: Option<&str>) -> Result<HttpResponse, Error> {
175 let file = tokio::fs::File::open(path).await?;
176 let metadata = file.metadata().await?;
177 let file_size = metadata.len();
178
179 if let Some(range_str) = range {
180 if let Some((start, end)) = parse_range(range_str, file_size) {
181 let len = end - start + 1;
182 let mut buf = vec![0u8; len as usize];
184
185 let mut reader = tokio::io::BufReader::new(file);
186 reader.seek(std::io::SeekFrom::Start(start)).await?;
187 reader.read_exact(&mut buf).await?;
188
189 return Ok(HttpResponse::PartialContent()
190 .insert_header((header::CONTENT_RANGE, format!("bytes {}-{}/{}", start, end, file_size)))
191 .insert_header((header::CONTENT_LENGTH, len))
192 .content_type("application/octet-stream")
193 .body(buf));
194 }
195 }
196
197 Ok(HttpResponse::Ok()
198 .insert_header((header::CONTENT_LENGTH, file_size))
199 .content_type("application/octet-stream")
200 .body(())
201 )
202}
203
204pub fn http_brotli(data: &[u8], quality: u32) -> HttpResponse {
208 let mut writer = CompressorWriter::new(Vec::with_capacity(data.len() / 2), 4096, quality as u32, 22);
211 use std::io::Write;
212 writer.write_all(data).unwrap();
213 writer.flush().unwrap();
214
215 let compressed = writer.into_inner();
216
217 HttpResponse::Ok()
218 .insert_header((header::CONTENT_ENCODING, "br"))
219 .content_type("application/octet-stream")
220 .body(compressed)
221}
222
223pub fn http_lz4(data: &[u8]) -> HttpResponse {
225 let compressed = compress_prepend_size(data);
227
228 HttpResponse::Ok()
229 .insert_header((header::CONTENT_ENCODING, "lz4"))
230 .content_type("application/octet-stream")
231 .body(compressed)
232}
233
234pub async fn parse_multipart_stream<F>(mut payload: Multipart, mut handler: F) -> Result<Vec<Bytes>, Error>
238where
239 F: FnMut(String, Bytes) -> Result<(), Error>,
240{
241 let mut results = Vec::new();
242
243 while let Ok(Some(mut field)) = payload.try_next().await {
244 let name = field.name().map_or("unknown", |n| n).to_string();
245 let mut data = Vec::new();
246
247 while let Some(chunk) = field.next().await {
248 let chunk = chunk?;
249 data.extend_from_slice(&chunk);
250 handler(name.clone(), chunk)?;
251 }
252
253 results.push(Bytes::from(data));
254 }
255
256 Ok(results)
257}
258
259pub fn parse_json_fast<T: serde::de::DeserializeOwned>(data: &Bytes) -> Result<T, Error> {
261 serde_json::from_slice(data).map_err(|e| actix_web::error::ErrorBadRequest(e))
262}
263
/// Parses a single HTTP byte range against a resource of `file_size` bytes.
///
/// Accepts the value with or without the leading `bytes=` unit and returns
/// the inclusive `(first, last)` byte positions to serve:
///
/// * `first-last` — `last` is clamped to the final byte when it points at or
///   past the end of the resource.
/// * `first-` — from `first` to the final byte.
/// * `-n` — the final `n` bytes (the whole resource if `n` exceeds its size).
///
/// Returns `None` when the resource is empty, the syntax is invalid
/// (including multi-range lists and signed numbers), or the range cannot be
/// satisfied (`first` beyond the end, `first > last`, or a zero-length
/// suffix).
#[inline]
pub fn parse_range(range_str: &str, file_size: u64) -> Option<(u64, u64)> {
    // An empty resource has no satisfiable range; returning early also keeps
    // `file_size - 1` below from underflowing.
    if file_size == 0 {
        return None;
    }
    let last_byte = file_size - 1;

    let spec = range_str.trim();
    let spec = spec.strip_prefix("bytes=").unwrap_or(spec);
    let (first, last) = spec.split_once('-')?;

    // Positions are plain decimal digits; `str::parse` alone would also
    // accept a leading `+`.
    let parse_pos = |text: &str| -> Option<u64> {
        if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        text.parse::<u64>().ok()
    };

    if first.is_empty() {
        // Suffix form: the last `n` bytes of the resource.
        let suffix_len = parse_pos(last)?;
        if suffix_len == 0 {
            return None;
        }
        return Some((file_size.saturating_sub(suffix_len), last_byte));
    }

    let start = parse_pos(first)?;
    let end = if last.is_empty() {
        last_byte
    } else {
        parse_pos(last)?.min(last_byte)
    };

    if start > end {
        None
    } else {
        Some((start, end))
    }
}
288
289pub async fn upload_with_progress<P: AsRef<Path>, F>(
293 mut payload: Multipart,
294 target_dir: P,
295 mut progress_cb: F
296) -> Result<Vec<String>, Error>
297where
298 F: FnMut(&str, u64, u64),
299{
300 let mut uploaded_files = Vec::new();
301 let base_path = target_dir.as_ref();
302
303 if !base_path.exists() {
304 tokio::fs::create_dir_all(base_path).await?;
305 }
306
307 while let Some(mut field) = payload.try_next().await? {
308 let filename = field
309 .content_disposition()
310 .expect("Sending File Failed!")
311 .get_filename()
312 .map_or_else(|| "unknown".to_string(), |f| f.to_string());
313
314 let filepath = base_path.join(&filename);
315 let mut file = File::create(&filepath).await?;
316 let mut writer = BufWriter::new(&mut file);
317 let mut total_bytes = 0u64;
318
319 while let Some(chunk) = field.next().await {
320 let data = chunk?;
321 writer.write_all(&data).await?;
322 total_bytes += data.len() as u64;
323 progress_cb(&filename, total_bytes, 0);
324 }
325
326 writer.flush().await?;
327 uploaded_files.push(filename);
328 }
329
330 Ok(uploaded_files)
331}
332
333pub async fn upload_streaming<P: AsRef<Path>>(payload: Multipart, target_dir: P) -> Result<Vec<String>, Error> {
335 let mut uploaded_files = Vec::new();
336 let base_path = target_dir.as_ref();
337
338 if !base_path.exists() {
339 tokio::fs::create_dir_all(base_path).await?;
340 }
341
342 let mut stream = payload;
343 while let Some(mut field) = stream.try_next().await? {
344 let filename = field
345 .content_disposition()
346 .and_then(|cd| cd.get_filename())
347 .unwrap_or("unknown")
348 .to_string();
349
350 let filepath = base_path.join(&filename);
351 let mut file = File::create(filepath).await?;
352
353 while let Some(chunk) = field.next().await {
354 file.write_all(&chunk?).await?;
355 }
356
357 uploaded_files.push(filename);
358 }
359
360 Ok(uploaded_files)
361}