tako-rs 1.1.2

Multi-transport Rust framework for modern network services.
Documentation
//! File streaming utilities for efficient HTTP file delivery.
//!
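//! This module provides `FileStream` for streaming files over HTTP with support for
//! range requests and `Content-Length` / `Content-Disposition` headers. Responses are
//! always sent as `application/octet-stream`; no MIME type detection is performed.
//! Without the `compio` feature, files are streamed in chunks rather than loaded
//! entirely into memory, making it suitable for serving media files, downloads, and
//! other binary content. With the `compio` feature the file is read fully into memory
//! before being sent.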
//!
//! # Examples
//!
//! ```rust,ignore
//! use tako::file_stream::FileStream;
//! use tako::responder::Responder;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Stream a file from disk
//! let file_stream = FileStream::from_path("./assets/video.mp4").await?;
//! let response = file_stream.into_response();
//! # Ok(())
//! # }
//! ```

#![cfg_attr(docsrs, doc(cfg(feature = "file-stream")))]

#[cfg(not(feature = "compio"))]
use std::io::SeekFrom;
use std::path::Path;

use anyhow::Result;
use bytes::Bytes;
use futures_util::TryStream;
use futures_util::TryStreamExt;
use http::StatusCode;
use http_body::Frame;
#[cfg(not(feature = "compio"))]
use tokio::fs::File;
#[cfg(not(feature = "compio"))]
use tokio::io::AsyncReadExt;
#[cfg(not(feature = "compio"))]
use tokio::io::AsyncSeekExt;
#[cfg(not(feature = "compio"))]
use tokio_util::io::ReaderStream;

use crate::body::TakoBody;
use crate::responder::Responder;
use crate::types::BoxError;
use crate::types::Response;

/// HTTP file stream with metadata support for efficient file delivery.
///
/// `FileStream` pairs a byte-producing stream with optional metadata: a file name
/// and a content size. When the value is converted into a response, the file name is
/// emitted as a `Content-Disposition: attachment` header and the content size as a
/// `Content-Length` header. Both regular (`200 OK`) responses and HTTP range
/// (`206 Partial Content`) responses can be produced from it.
///
/// The struct itself places no bounds on `S`; the constructors and response
/// conversions require `S` to be a `TryStream` whose items convert into `Bytes`.
#[doc(alias = "file_stream")]
#[doc(alias = "stream")]
pub struct FileStream<S> {
  /// The underlying byte stream; its chunks become the response body.
  pub stream: S,
  /// Optional file name, used as the `filename` of the Content-Disposition header.
  pub file_name: Option<String>,
  /// Optional content size in bytes, used for the Content-Length header of a
  /// regular (non-range) response.
  pub content_size: Option<u64>,
}

impl<S> FileStream<S>
where
  S: TryStream + Send + 'static,
  S::Ok: Into<Bytes>,
  S::Error: Into<BoxError>,
{
  /// Creates a new file stream with the provided metadata.
  pub fn new(stream: S, file_name: Option<String>, content_size: Option<u64>) -> Self {
    Self {
      stream,
      file_name,
      content_size,
    }
  }

  /// Opens the file at `path` and wraps it in a chunked `FileStream`.
  ///
  /// The content size is taken from the file's metadata when it can be read, and the
  /// file name from the final path component when that component is valid UTF-8.
  /// Either field is left as `None` otherwise.
  ///
  /// # Errors
  ///
  /// Returns an error if the file cannot be opened.
  #[cfg(not(feature = "compio"))]
  pub async fn from_path<P>(path: P) -> Result<FileStream<ReaderStream<File>>>
  where
    P: AsRef<Path>,
  {
    let file = File::open(&path).await?;

    // A metadata failure is not fatal: the size is simply left unknown.
    let content_size = file.metadata().await.ok().map(|meta| meta.len());

    // Non-UTF-8 names (and paths without a final component) yield no file name.
    let file_name = path
      .as_ref()
      .file_name()
      .and_then(|os_name| os_name.to_str())
      .map(str::to_owned);

    Ok(FileStream {
      stream: ReaderStream::new(file),
      file_name,
      content_size,
    })
  }

  /// Reads the file at `path` and wraps its contents in a single-chunk `FileStream`
  /// (compio variant).
  ///
  /// Unlike the non-compio variant, the whole file is read into memory first and then
  /// exposed as a stream that yields exactly one `Bytes` item. The content size is the
  /// number of bytes read; the file name is the final path component when it is valid
  /// UTF-8, otherwise `None`.
  ///
  /// # Errors
  ///
  /// Returns an error if the file cannot be read.
  #[cfg(feature = "compio")]
  pub async fn from_path<P>(
    path: P,
  ) -> Result<
    FileStream<
      futures_util::stream::Once<futures_util::future::Ready<Result<Bytes, std::io::Error>>>,
    >,
  >
  where
    P: AsRef<Path>,
  {
    let contents = Bytes::from(compio::fs::read(&path).await?);
    let content_size = Some(contents.len() as u64);

    let file_name = path
      .as_ref()
      .file_name()
      .and_then(std::ffi::OsStr::to_str)
      .map(String::from);

    let stream = futures_util::stream::once(futures_util::future::ready(Ok(contents)));

    Ok(FileStream {
      stream,
      file_name,
      content_size,
    })
  }

  /// Creates an HTTP 206 Partial Content response for range requests.
  pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response {
    let mut response = http::Response::builder()
      .status(http::StatusCode::PARTIAL_CONTENT)
      .header(
        http::header::CONTENT_TYPE,
        mime::APPLICATION_OCTET_STREAM.as_ref(),
      )
      .header(
        http::header::CONTENT_RANGE,
        format!("bytes {}-{}/{}", start, end, total_size),
      )
      .header(http::header::CONTENT_LENGTH, (end - start + 1).to_string());

    if let Some(ref name) = self.file_name {
      response = response.header(
        http::header::CONTENT_DISPOSITION,
        format!("attachment; filename=\"{}\"", name),
      );
    }

    let body = TakoBody::from_try_stream(
      self
        .stream
        .map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
        .map_err(Into::into),
    );

    response.body(body).unwrap_or_else(|e| {
      (
        http::StatusCode::INTERNAL_SERVER_ERROR,
        format!("FileStream range error: {}", e),
      )
        .into_response()
    })
  }

  /// Tries to create a range response for the file at `path`.
  ///
  /// `start` and `end` are inclusive byte positions. An `end` of `0` is treated as
  /// "until the end of the file"; as a consequence a request for only the first byte
  /// (`start == 0`, `end == 0`) cannot be expressed through this function and yields
  /// the whole file.
  ///
  /// Returns a `416 Range Not Satisfiable` response (as `Ok`) when the range is
  /// inverted or reaches past the end of the file, which includes any range on an
  /// empty file. Otherwise the file is positioned at `start` and exactly
  /// `end - start + 1` bytes are streamed as a `206 Partial Content` response.
  ///
  /// # Errors
  ///
  /// Returns an error if the file cannot be opened, its metadata cannot be read, or
  /// seeking to `start` fails.
  #[cfg(not(feature = "compio"))]
  pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
  where
    P: AsRef<Path>,
  {
    let mut file = File::open(path).await?;
    let total_size = file.metadata().await?.len();

    // `end == 0` is the "to end of file" sentinel. `saturating_sub` keeps an empty
    // file from underflowing here; the range check below then rejects it.
    if end == 0 {
      end = total_size.saturating_sub(1);
    }

    // `start > total_size` is implied by these two conditions, so it is not repeated.
    if start > end || end >= total_size {
      return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
    }

    file.seek(SeekFrom::Start(start)).await?;
    // Limit the reader to the requested window so the body matches Content-Length.
    let stream = ReaderStream::new(file.take(end - start + 1));
    Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
  }

  /// Tries to create a range response for the file at `path` (compio variant).
  ///
  /// The whole file is read into memory and the requested window is served from it
  /// without copying. `start` and `end` are inclusive byte positions. An `end` of `0`
  /// is treated as "until the end of the file"; as a consequence a request for only
  /// the first byte (`start == 0`, `end == 0`) cannot be expressed through this
  /// function and yields the whole file.
  ///
  /// Returns a `416 Range Not Satisfiable` response (as `Ok`) when the range is
  /// inverted or reaches past the end of the file, which includes any range on an
  /// empty file.
  ///
  /// # Errors
  ///
  /// Returns an error if the file cannot be read.
  #[cfg(feature = "compio")]
  pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
  where
    P: AsRef<Path>,
  {
    let data = Bytes::from(compio::fs::read(&path).await?);
    let total_size = data.len() as u64;

    // `end == 0` is the "to end of file" sentinel. `saturating_sub` keeps an empty
    // file from underflowing here; the range check below then rejects it.
    if end == 0 {
      end = total_size.saturating_sub(1);
    }

    // `start > total_size` is implied by these two conditions, so it is not repeated.
    if start > end || end >= total_size {
      return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
    }

    // Both bounds are below `data.len()` at this point, so the casts cannot truncate
    // and the slice cannot panic. `Bytes::slice` shares the buffer instead of copying.
    let slice = data.slice((start as usize)..=(end as usize));
    let stream = futures_util::stream::once(futures_util::future::ready(
      Ok::<_, std::io::Error>(slice),
    ));
    Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
  }
}

impl<S> Responder for FileStream<S>
where
  S: TryStream + Send + 'static,
  S::Ok: Into<Bytes>,
  S::Error: Into<BoxError>,
{
  /// Converts the file stream into a `200 OK` response with appropriate headers.
  ///
  /// The response always carries `Content-Type: application/octet-stream`. A
  /// `Content-Length` header is added when `content_size` is set, and a
  /// `Content-Disposition: attachment` header when `file_name` is set. Backslashes
  /// and double quotes in the file name are escaped so the header stays a well-formed
  /// quoted-string.
  ///
  /// If the response cannot be built (for example the file name is not a valid header
  /// value), a `500 Internal Server Error` response is returned instead.
  fn into_response(self) -> Response {
    let Self {
      stream,
      file_name,
      content_size,
    } = self;

    let mut response = http::Response::builder()
      .status(http::StatusCode::OK)
      .header(
        http::header::CONTENT_TYPE,
        mime::APPLICATION_OCTET_STREAM.as_ref(),
      );

    if let Some(size) = content_size {
      response = response.header(http::header::CONTENT_LENGTH, size.to_string());
    }

    if let Some(name) = file_name.as_deref() {
      // Escape `\` and `"` so the name remains a well-formed quoted-string.
      let escaped = name.replace('\\', "\\\\").replace('"', "\\\"");
      response = response.header(
        http::header::CONTENT_DISPOSITION,
        format!("attachment; filename=\"{}\"", escaped),
      );
    }

    let body = TakoBody::from_try_stream(
      stream
        .map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
        .map_err(Into::into),
    );

    // Header errors are deferred by the builder and surface here.
    response.body(body).unwrap_or_else(|e| {
      (
        http::StatusCode::INTERNAL_SERVER_ERROR,
        format!("FileStream error: {}", e),
      )
        .into_response()
    })
  }
}