warp_range/
lib.rs

1//! # warp-range
2//! 
//! A Rust library that provides a warp filter for serving file content with HTTP range support, such as mp3 audio or mp4 video.
4//! This warp filter can be used in a HTTP server based on warp. 
5//! 
//! The content is streamed: if you watch a movie served by this filter, you can seek through it even before the file has been completely downloaded.
7//!
8//! Here is an easy example to add range to an existing warp server:
9//! ``` 
10//! use hyper::{Body, Response};
11//! use warp::{Filter, Reply, fs::{File, dir}};
12//! use warp_range::{filter_range, get_range};
13//! 
14//! #[tokio::main]
15//! async fn main() {
16//!     let test_video = "/home/uwe/Videos/Drive.mkv";
17//!     
18//!     let port = 9860;
19//!     println!("Running test server on http://localhost:{}", port);
20//! 
21//!     let route_get_range = 
22//!         warp::path("getvideo")
23//!         .and(warp::path::end())
24//!         .and(filter_range())
//!         .and_then(move |range_header| get_range(range_header, test_video, "video/mp4"));
26//! 
27//!     let route_static = dir(".");
28//!     
29//!     let routes = route_get_range
30//!         .or(route_static);
31//! 
32//!     warp::serve(routes)
33//!         .run(([127, 0, 0, 1], port))
34//!         .await;    
35//! }
36//! ``` 
37
38use async_stream::stream;
39use std::{
40    cmp::min, io::SeekFrom, num::ParseIntError
41};
42use tokio::io::{
43    AsyncReadExt, AsyncSeekExt
44};
45use warp::{
46    Filter, http::StatusCode, Rejection, http::HeaderValue, hyper::HeaderMap
47};
48
49/// This function filters and extracts the "Range"-Header
50pub fn filter_range() -> impl Filter<Extract = (Option<String>,), Error = Rejection> + Copy {
51    warp::header::optional::<String>("Range")
52}
53
54/// This function retrives the range of bytes requested by the web client
55pub async fn get_range(range_header: Option<String>, file: &str, content_type: &str) -> Result<impl warp::Reply, Rejection> {
56    internal_get_range(range_header, file, content_type, None).await.map_err(|e| {
57        println!("Error in get_range: {}", e.message);
58        warp::reject()
59    })
60}
61
62/// This function retrives the range of bytes requested by the web client. You can define a callback function for logging purpose or media access control
63pub async fn get_range_with_cb(range_header: Option<String>, file: &str, content_type: &str, progress: fn(size: u64)) -> Result<impl warp::Reply, Rejection> {
64    internal_get_range(range_header, file, content_type, Some(progress)).await.map_err(|e| {
65        println!("Error in get_range: {}", e.message);
66        warp::reject()
67    })
68}
69
/// Translates an optional `Range` header value into an inclusive `(start, end)`
/// byte interval that is guaranteed to lie inside a file of `size` bytes.
///
/// Accepted forms (the `bytes=` prefix is optional, for leniency):
///
/// * no header, `bytes=` or `bytes=-` -> the whole file
/// * `bytes=A-` or `bytes=A`          -> from offset `A` to the end of the file
/// * `bytes=A-B`                      -> from `A` to `B`, with `B` clamped to the last byte
/// * `bytes=-N`                       -> the last `N` bytes of the file
///
/// # Errors
///
/// Returns an [`Error`] when the file is empty, when a bound is not a valid
/// unsigned integer (this includes multi-range requests), or when the
/// requested interval does not overlap the file.
fn get_range_params(range: &Option<String>, size: u64) -> Result<(u64, u64), Error> {
    // An empty file has no addressable byte, so no inclusive interval can describe it.
    let last = match size.checked_sub(1) {
        Some(last) => last,
        None => {
            return Err(Error {
                message: "cannot serve a byte range from an empty file".to_string(),
            })
        }
    };

    let header = match range {
        Some(header) => header.trim(),
        None => return Ok((0, last)),
    };
    let spec = header.strip_prefix("bytes=").unwrap_or(header);

    // Split on the first '-' only, so that an empty left side (suffix range)
    // stays distinguishable from an empty right side (open-ended range).
    let mut bounds = spec.splitn(2, '-');
    let first = bounds.next().unwrap_or("").trim();
    let second = bounds.next().unwrap_or("").trim();

    let (start, end) = match (first.is_empty(), second.is_empty()) {
        (true, true) => (0, last),
        (true, false) => {
            // Suffix range: the number is a length counted from the end of the file.
            let suffix_len = second.parse::<u64>()?;
            if suffix_len == 0 {
                return Err(Error {
                    message: "a suffix range of zero bytes is not satisfiable".to_string(),
                });
            }
            (size.saturating_sub(suffix_len), last)
        }
        (false, true) => (first.parse::<u64>()?, last),
        (false, false) => (first.parse::<u64>()?, second.parse::<u64>()?.min(last)),
    };

    // `end` never exceeds `last`, so this also rejects a start beyond the end of the file.
    if start > end {
        return Err(Error {
            message: format!(
                "range {}-{} is not satisfiable for a file of {} bytes",
                start, end, size
            ),
        });
    }
    Ok((start, end))
}

/// Internal error type that carries a human readable description of what went wrong.
#[derive(Debug)]
struct Error {
    /// Text reported to the log when a request fails.
    message: String
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for Error {}

/// Lets `?` convert file system failures into [`Error`].
impl From<std::io::Error> for Error {
    fn from(err: std::io::Error) -> Self {
        Error { message: err.to_string() }
    }
}

/// Lets `?` convert malformed range bounds into [`Error`].
impl From<ParseIntError> for Error {
    fn from(err: ParseIntError) -> Self {
        Error { message: err.to_string() }
    }
}
109
110async fn internal_get_range(range_header: Option<String>, file: &str, content_type: &str, cb: Option<fn(u64)>) -> Result<impl warp::Reply, Error> {
111    let mut file = tokio::fs::File::open(file).await?;
112    let metadata = file.metadata().await?;
113    let size = metadata.len();
114    let (start_range, end_range) = get_range_params(&range_header, size)?;
115    let byte_count = end_range - start_range + 1;
116    file.seek(SeekFrom::Start(start_range)).await?;
117
118    let stream = stream! {
119        let bufsize = 16384;
120        let cycles = byte_count / bufsize as u64 + 1;
121        let mut sent_bytes: u64 = 0;
122        for _ in 0..cycles {
123            let mut buffer: Vec<u8> = vec![0; min(byte_count - sent_bytes, bufsize) as usize];
124            let bytes_read = file.read_exact(&mut buffer).await.unwrap();
125            sent_bytes += bytes_read as u64;
126            if let Some(cb) = cb { 
127                cb(sent_bytes);
128            } 
129            yield Ok(buffer) as Result<Vec<u8>, warp::http::Error>;
130        }
131    };
132    let body = hyper::Body::wrap_stream(stream);
133    let mut response = warp::reply::Response::new(body);
134    
135    let headers = response.headers_mut();
136    let mut header_map = HeaderMap::new();
137    header_map.insert("Content-Type", HeaderValue::from_str(content_type).unwrap());
138    header_map.insert("Accept-Ranges", HeaderValue::from_str("bytes").unwrap());
139    header_map.insert("Content-Range", HeaderValue::from_str(&format!("bytes {}-{}/{}", start_range, end_range, size)).unwrap());
140    header_map.insert("Content-Length", HeaderValue::from(byte_count));
141    headers.extend(header_map);
142
143    if range_header.is_some() {
144        *response.status_mut() = StatusCode::PARTIAL_CONTENT;
145    }
146    Ok (response)
147}