Skip to main content

tako/
file_stream.rs

1//! File streaming utilities for efficient HTTP file delivery.
2//!
3//! This module provides `FileStream` for streaming files over HTTP with support for
4//! range requests, content-length headers, and proper MIME type detection. It enables
5//! efficient delivery of large files without loading them entirely into memory, making
6//! it suitable for serving media files, downloads, and other binary content.
7//!
8//! # Examples
9//!
10//! ```rust,ignore
11//! use tako::file_stream::FileStream;
12//! use tako::responder::Responder;
13//!
14//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
15//! // Stream a file from disk
16//! let file_stream = FileStream::from_path("./assets/video.mp4").await?;
17//! let response = file_stream.into_response();
18//! # Ok(())
19//! # }
20//! ```
21
22#![cfg_attr(docsrs, doc(cfg(feature = "file-stream")))]
23
24#[cfg(not(feature = "compio"))]
25use std::io::SeekFrom;
26use std::path::Path;
27
28use anyhow::Result;
29use bytes::Bytes;
30use futures_util::TryStream;
31use futures_util::TryStreamExt;
32use http::StatusCode;
33use http_body::Frame;
34#[cfg(not(feature = "compio"))]
35use tokio::fs::File;
36#[cfg(not(feature = "compio"))]
37use tokio::io::AsyncReadExt;
38#[cfg(not(feature = "compio"))]
39use tokio::io::AsyncSeekExt;
40#[cfg(not(feature = "compio"))]
41use tokio_util::io::ReaderStream;
42
43use crate::body::TakoBody;
44use crate::responder::Responder;
45use crate::types::BoxError;
46use crate::types::Response;
47
48/// HTTP file stream with metadata support for efficient file delivery.
49///
50/// `FileStream` wraps any stream that produces bytes and associates it with optional
51/// metadata like filename and content size. This enables proper HTTP headers to be
52/// set for file downloads, including Content-Disposition for filename suggestions
53/// and Content-Length for known file sizes. The implementation supports both
54/// regular responses and HTTP range requests for partial content delivery.
55#[doc(alias = "file_stream")]
56#[doc(alias = "stream")]
57pub struct FileStream<S> {
58  /// The underlying byte stream
59  pub stream: S,
60  /// Optional filename for Content-Disposition header
61  pub file_name: Option<String>,
62  /// Optional content size for Content-Length header
63  pub content_size: Option<u64>,
64}
65
66impl<S> FileStream<S>
67where
68  S: TryStream + Send + 'static,
69  S::Ok: Into<Bytes>,
70  S::Error: Into<BoxError>,
71{
72  /// Creates a new file stream with the provided metadata.
73  pub fn new(stream: S, file_name: Option<String>, content_size: Option<u64>) -> Self {
74    Self {
75      stream,
76      file_name,
77      content_size,
78    }
79  }
80
81  /// Creates a file stream from a file system path with automatic metadata detection.
82  #[cfg(not(feature = "compio"))]
83  pub async fn from_path<P>(path: P) -> Result<FileStream<ReaderStream<File>>>
84  where
85    P: AsRef<Path>,
86  {
87    let file = File::open(&path).await?;
88    let mut content_size = None;
89    let mut file_name = None;
90
91    if let Ok(metadata) = file.metadata().await {
92      content_size = Some(metadata.len());
93    }
94
95    if let Some(os_name) = path.as_ref().file_name()
96      && let Some(name) = os_name.to_str()
97    {
98      file_name = Some(name.to_owned());
99    }
100
101    Ok(FileStream {
102      stream: ReaderStream::new(file),
103      file_name,
104      content_size,
105    })
106  }
107
108  /// Creates a file stream from a file system path with automatic metadata detection (compio variant).
109  #[cfg(feature = "compio")]
110  pub async fn from_path<P>(
111    path: P,
112  ) -> Result<
113    FileStream<
114      futures_util::stream::Once<futures_util::future::Ready<Result<Bytes, std::io::Error>>>,
115    >,
116  >
117  where
118    P: AsRef<Path>,
119  {
120    let data = compio::fs::read(&path).await?;
121    let content_size = Some(data.len() as u64);
122    let file_name = path
123      .as_ref()
124      .file_name()
125      .and_then(|n| n.to_str())
126      .map(|n| n.to_owned());
127
128    Ok(FileStream {
129      stream: futures_util::stream::once(futures_util::future::ready(Ok(Bytes::from(data)))),
130      file_name,
131      content_size,
132    })
133  }
134
135  /// Creates an HTTP 206 Partial Content response for range requests.
136  pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response {
137    let mut response = http::Response::builder()
138      .status(http::StatusCode::PARTIAL_CONTENT)
139      .header(
140        http::header::CONTENT_TYPE,
141        mime::APPLICATION_OCTET_STREAM.as_ref(),
142      )
143      .header(
144        http::header::CONTENT_RANGE,
145        format!("bytes {}-{}/{}", start, end, total_size),
146      )
147      .header(http::header::CONTENT_LENGTH, (end - start + 1).to_string());
148
149    if let Some(ref name) = self.file_name {
150      response = response.header(
151        http::header::CONTENT_DISPOSITION,
152        format!("attachment; filename=\"{}\"", name),
153      );
154    }
155
156    let body = TakoBody::from_try_stream(
157      self
158        .stream
159        .map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
160        .map_err(Into::into),
161    );
162
163    response.body(body).unwrap_or_else(|e| {
164      (
165        http::StatusCode::INTERNAL_SERVER_ERROR,
166        format!("FileStream range error: {}", e),
167      )
168        .into_response()
169    })
170  }
171
172  /// Try to create a range response for a file stream.
173  #[cfg(not(feature = "compio"))]
174  pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
175  where
176    P: AsRef<Path>,
177  {
178    let mut file = File::open(path).await?;
179    let meta = file.metadata().await?;
180    let total_size = meta.len();
181
182    if end == 0 {
183      end = total_size - 1;
184    }
185
186    if start > total_size || start > end || end >= total_size {
187      return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
188    }
189
190    file.seek(SeekFrom::Start(start)).await?;
191    let stream = ReaderStream::new(file.take(end - start + 1));
192    Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
193  }
194
195  /// Try to create a range response for a file stream (compio variant).
196  #[cfg(feature = "compio")]
197  pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
198  where
199    P: AsRef<Path>,
200  {
201    let data = compio::fs::read(&path).await?;
202    let total_size = data.len() as u64;
203
204    if end == 0 {
205      end = total_size - 1;
206    }
207
208    if start > total_size || start > end || end >= total_size {
209      return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
210    }
211
212    let slice = Bytes::from(data[(start as usize)..=(end as usize)].to_vec());
213    let stream = futures_util::stream::once(futures_util::future::ready(
214      Ok::<_, std::io::Error>(slice),
215    ));
216    Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
217  }
218}
219
220impl<S> Responder for FileStream<S>
221where
222  S: TryStream + Send + 'static,
223  S::Ok: Into<Bytes>,
224  S::Error: Into<BoxError>,
225{
226  /// Converts the file stream into an HTTP response with appropriate headers.
227  fn into_response(self) -> Response {
228    let mut response = http::Response::builder()
229      .status(http::StatusCode::OK)
230      .header(
231        http::header::CONTENT_TYPE,
232        mime::APPLICATION_OCTET_STREAM.as_ref(),
233      );
234
235    if let Some(size) = self.content_size {
236      response = response.header(http::header::CONTENT_LENGTH, size.to_string());
237    }
238
239    if let Some(ref name) = self.file_name {
240      response = response.header(
241        http::header::CONTENT_DISPOSITION,
242        format!("attachment; filename=\"{}\"", name),
243      );
244    }
245
246    let body = TakoBody::from_try_stream(
247      self
248        .stream
249        .map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
250        .map_err(Into::into),
251    );
252
253    response.body(body).unwrap_or_else(|e| {
254      (
255        http::StatusCode::INTERNAL_SERVER_ERROR,
256        format!("FileStream error: {}", e),
257      )
258        .into_response()
259    })
260  }
261}