1use async_stream::stream;
39use std::{
40 cmp::min, io::SeekFrom, num::ParseIntError
41};
42use tokio::io::{
43 AsyncReadExt, AsyncSeekExt
44};
45use warp::{
46 Filter, http::StatusCode, Rejection, http::HeaderValue, hyper::HeaderMap
47};
48
49pub fn filter_range() -> impl Filter<Extract = (Option<String>,), Error = Rejection> + Copy {
51 warp::header::optional::<String>("Range")
52}
53
54pub async fn get_range(range_header: Option<String>, file: &str, content_type: &str) -> Result<impl warp::Reply, Rejection> {
56 internal_get_range(range_header, file, content_type, None).await.map_err(|e| {
57 println!("Error in get_range: {}", e.message);
58 warp::reject()
59 })
60}
61
62pub async fn get_range_with_cb(range_header: Option<String>, file: &str, content_type: &str, progress: fn(size: u64)) -> Result<impl warp::Reply, Rejection> {
64 internal_get_range(range_header, file, content_type, Some(progress)).await.map_err(|e| {
65 println!("Error in get_range: {}", e.message);
66 warp::reject()
67 })
68}
69
/// Resolves the value of an HTTP `Range` header into an inclusive
/// `(start, end)` byte span inside a file of `size` bytes.
///
/// Accepted forms (the `bytes=` prefix is optional):
/// * no header, `bytes=` or `bytes=-` - the whole file, `(0, size - 1)`
/// * `bytes=N-M` - bytes `N` through `M`, with `M` clamped to the last byte
/// * `bytes=N-` or `bytes=N` - from byte `N` to the end of the file
/// * `bytes=-N` - the final `N` bytes of the file
///
/// # Errors
/// Returns an [`Error`] when
/// * `size` is zero (an inclusive span cannot describe zero bytes),
/// * a bound is not a valid unsigned integer (this includes multi-range
///   requests such as `bytes=0-1,5-6`, which are not supported),
/// * the range cannot be satisfied: it starts past the end of the file, its
///   end lies before its start, or a zero-length suffix was requested.
fn get_range_params(range: &Option<String>, size: u64) -> Result<(u64, u64), Error> {
    // Index of the final byte; `checked_sub` avoids the underflow of `size - 1` on empty files.
    let last = match size.checked_sub(1) {
        Some(last) => last,
        None => {
            return Err(Error {
                message: "cannot select a byte range from an empty file".to_string(),
            })
        }
    };

    let header = match range.as_deref() {
        Some(header) => header.trim(),
        None => return Ok((0, last)),
    };

    let spec = header.strip_prefix("bytes=").unwrap_or(header);
    // Split on the first dash only, so an empty left side (`-N`) stays
    // distinguishable from an empty right side (`N-`).
    let mut bounds = spec.splitn(2, '-');
    let first = bounds.next().unwrap_or("").trim();
    let second = bounds.next().unwrap_or("").trim();

    let unsatisfiable = || Error {
        message: format!(
            "range `{}` cannot be satisfied for a file of {} bytes",
            header, size
        ),
    };

    let (start, end) = match (first.is_empty(), second.is_empty()) {
        // Nothing specific requested: serve everything.
        (true, true) => (0, last),
        // Suffix form: the final `N` bytes (the whole file when `N` exceeds its size).
        (true, false) => {
            let suffix_len = second.parse::<u64>()?;
            if suffix_len == 0 {
                return Err(unsatisfiable());
            }
            (size.saturating_sub(suffix_len), last)
        }
        // Open-ended form: from `N` to the end of the file.
        (false, true) => (first.parse::<u64>()?, last),
        // Closed form: the end may point past the file, so clamp it.
        (false, false) => (first.parse::<u64>()?, second.parse::<u64>()?.min(last)),
    };

    // Covers both "starts beyond the file" and "end before start".
    if start > end {
        return Err(unsatisfiable());
    }
    Ok((start, end))
}

/// Error type shared by the range helpers; it carries only a human-readable message.
#[derive(Debug)]
struct Error {
    /// Description of what went wrong, printed by the public entry points.
    message: String
}

/// Lets `?` turn I/O failures (open, metadata, seek) into [`Error`].
impl From<std::io::Error> for Error {
    fn from(err: std::io::Error) -> Self {
        Error { message: err.to_string() }
    }
}

/// Lets `?` turn malformed range bounds into [`Error`].
impl From<ParseIntError> for Error {
    fn from(err: ParseIntError) -> Self {
        Error { message: err.to_string() }
    }
}
109
110async fn internal_get_range(range_header: Option<String>, file: &str, content_type: &str, cb: Option<fn(u64)>) -> Result<impl warp::Reply, Error> {
111 let mut file = tokio::fs::File::open(file).await?;
112 let metadata = file.metadata().await?;
113 let size = metadata.len();
114 let (start_range, end_range) = get_range_params(&range_header, size)?;
115 let byte_count = end_range - start_range + 1;
116 file.seek(SeekFrom::Start(start_range)).await?;
117
118 let stream = stream! {
119 let bufsize = 16384;
120 let cycles = byte_count / bufsize as u64 + 1;
121 let mut sent_bytes: u64 = 0;
122 for _ in 0..cycles {
123 let mut buffer: Vec<u8> = vec![0; min(byte_count - sent_bytes, bufsize) as usize];
124 let bytes_read = file.read_exact(&mut buffer).await.unwrap();
125 sent_bytes += bytes_read as u64;
126 if let Some(cb) = cb {
127 cb(sent_bytes);
128 }
129 yield Ok(buffer) as Result<Vec<u8>, warp::http::Error>;
130 }
131 };
132 let body = hyper::Body::wrap_stream(stream);
133 let mut response = warp::reply::Response::new(body);
134
135 let headers = response.headers_mut();
136 let mut header_map = HeaderMap::new();
137 header_map.insert("Content-Type", HeaderValue::from_str(content_type).unwrap());
138 header_map.insert("Accept-Ranges", HeaderValue::from_str("bytes").unwrap());
139 header_map.insert("Content-Range", HeaderValue::from_str(&format!("bytes {}-{}/{}", start_range, end_range, size)).unwrap());
140 header_map.insert("Content-Length", HeaderValue::from(byte_count));
141 headers.extend(header_map);
142
143 if range_header.is_some() {
144 *response.status_mut() = StatusCode::PARTIAL_CONTENT;
145 }
146 Ok (response)
147}