1#![cfg_attr(docsrs, doc(cfg(feature = "file-stream")))]
23
24#[cfg(not(feature = "compio"))]
25use std::io::SeekFrom;
26use std::path::Path;
27
28use anyhow::Result;
29use bytes::Bytes;
30use futures_util::TryStream;
31use futures_util::TryStreamExt;
32use http::StatusCode;
33use http_body::Frame;
34#[cfg(not(feature = "compio"))]
35use tokio::fs::File;
36#[cfg(not(feature = "compio"))]
37use tokio::io::AsyncReadExt;
38#[cfg(not(feature = "compio"))]
39use tokio::io::AsyncSeekExt;
40#[cfg(not(feature = "compio"))]
41use tokio_util::io::ReaderStream;
42
43use crate::body::TakoBody;
44use crate::responder::Responder;
45use crate::types::BoxError;
46use crate::types::Response;
47
/// A file-like HTTP response body backed by a fallible stream of byte chunks.
///
/// The wrapped stream is forwarded chunk by chunk into the response body; the optional
/// metadata fields only influence which response headers are emitted.
#[doc(alias("file_stream", "stream"))]
pub struct FileStream<S> {
  /// Source of the body chunks; each successfully yielded item is converted into `Bytes`.
  pub stream: S,
  /// File name advertised in the `Content-Disposition: attachment` header, when present.
  pub file_name: Option<String>,
  /// Size in bytes advertised in the `Content-Length` header of a full (non-range) response,
  /// when present.
  pub content_size: Option<u64>,
}
65
66impl<S> FileStream<S>
67where
68 S: TryStream + Send + 'static,
69 S::Ok: Into<Bytes>,
70 S::Error: Into<BoxError>,
71{
72 pub fn new(stream: S, file_name: Option<String>, content_size: Option<u64>) -> Self {
74 Self {
75 stream,
76 file_name,
77 content_size,
78 }
79 }
80
81 #[cfg(not(feature = "compio"))]
83 pub async fn from_path<P>(path: P) -> Result<FileStream<ReaderStream<File>>>
84 where
85 P: AsRef<Path>,
86 {
87 let file = File::open(&path).await?;
88 let mut content_size = None;
89 let mut file_name = None;
90
91 if let Ok(metadata) = file.metadata().await {
92 content_size = Some(metadata.len());
93 }
94
95 if let Some(os_name) = path.as_ref().file_name()
96 && let Some(name) = os_name.to_str()
97 {
98 file_name = Some(name.to_owned());
99 }
100
101 Ok(FileStream {
102 stream: ReaderStream::new(file),
103 file_name,
104 content_size,
105 })
106 }
107
108 #[cfg(feature = "compio")]
110 pub async fn from_path<P>(
111 path: P,
112 ) -> Result<
113 FileStream<
114 futures_util::stream::Once<futures_util::future::Ready<Result<Bytes, std::io::Error>>>,
115 >,
116 >
117 where
118 P: AsRef<Path>,
119 {
120 let data = compio::fs::read(&path).await?;
121 let content_size = Some(data.len() as u64);
122 let file_name = path
123 .as_ref()
124 .file_name()
125 .and_then(|n| n.to_str())
126 .map(|n| n.to_owned());
127
128 Ok(FileStream {
129 stream: futures_util::stream::once(futures_util::future::ready(Ok(Bytes::from(data)))),
130 file_name,
131 content_size,
132 })
133 }
134
135 pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response {
137 let mut response = http::Response::builder()
138 .status(http::StatusCode::PARTIAL_CONTENT)
139 .header(
140 http::header::CONTENT_TYPE,
141 mime::APPLICATION_OCTET_STREAM.as_ref(),
142 )
143 .header(
144 http::header::CONTENT_RANGE,
145 format!("bytes {}-{}/{}", start, end, total_size),
146 )
147 .header(http::header::CONTENT_LENGTH, (end - start + 1).to_string());
148
149 if let Some(ref name) = self.file_name {
150 response = response.header(
151 http::header::CONTENT_DISPOSITION,
152 format!("attachment; filename=\"{}\"", name),
153 );
154 }
155
156 let body = TakoBody::from_try_stream(
157 self
158 .stream
159 .map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
160 .map_err(Into::into),
161 );
162
163 response.body(body).unwrap_or_else(|e| {
164 (
165 http::StatusCode::INTERNAL_SERVER_ERROR,
166 format!("FileStream range error: {}", e),
167 )
168 .into_response()
169 })
170 }
171
172 #[cfg(not(feature = "compio"))]
174 pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
175 where
176 P: AsRef<Path>,
177 {
178 let mut file = File::open(path).await?;
179 let meta = file.metadata().await?;
180 let total_size = meta.len();
181
182 if end == 0 {
183 end = total_size - 1;
184 }
185
186 if start > total_size || start > end || end >= total_size {
187 return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
188 }
189
190 file.seek(SeekFrom::Start(start)).await?;
191 let stream = ReaderStream::new(file.take(end - start + 1));
192 Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
193 }
194
195 #[cfg(feature = "compio")]
197 pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
198 where
199 P: AsRef<Path>,
200 {
201 let data = compio::fs::read(&path).await?;
202 let total_size = data.len() as u64;
203
204 if end == 0 {
205 end = total_size - 1;
206 }
207
208 if start > total_size || start > end || end >= total_size {
209 return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
210 }
211
212 let slice = Bytes::from(data[(start as usize)..=(end as usize)].to_vec());
213 let stream = futures_util::stream::once(futures_util::future::ready(
214 Ok::<_, std::io::Error>(slice),
215 ));
216 Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
217 }
218}
219
220impl<S> Responder for FileStream<S>
221where
222 S: TryStream + Send + 'static,
223 S::Ok: Into<Bytes>,
224 S::Error: Into<BoxError>,
225{
226 fn into_response(self) -> Response {
228 let mut response = http::Response::builder()
229 .status(http::StatusCode::OK)
230 .header(
231 http::header::CONTENT_TYPE,
232 mime::APPLICATION_OCTET_STREAM.as_ref(),
233 );
234
235 if let Some(size) = self.content_size {
236 response = response.header(http::header::CONTENT_LENGTH, size.to_string());
237 }
238
239 if let Some(ref name) = self.file_name {
240 response = response.header(
241 http::header::CONTENT_DISPOSITION,
242 format!("attachment; filename=\"{}\"", name),
243 );
244 }
245
246 let body = TakoBody::from_try_stream(
247 self
248 .stream
249 .map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
250 .map_err(Into::into),
251 );
252
253 response.body(body).unwrap_or_else(|e| {
254 (
255 http::StatusCode::INTERNAL_SERVER_ERROR,
256 format!("FileStream error: {}", e),
257 )
258 .into_response()
259 })
260 }
261}