use std::cmp::min;
use std::path::Path;
use actix_http::header;
use actix_web::HttpRequest;
use ant_core::data::{DataChunk, XorName};
use bytes::{BufMut, BytesMut};
use chunk_streamer::chunk_receiver::ChunkReceiver;
use chunk_streamer::chunk_streamer::ChunkStreamer;
use futures_util::StreamExt;
use hex::ToHex;
use log::{debug, info};
use mockall::mock;
use mockall_double::double;
#[double]
use crate::client::ChunkCachingClient;
use crate::error::{GetError, GetStreamError};
use crate::error::chunk_error::ChunkError;
use crate::service::resolver_service::ResolvedAddress;
#[derive(Debug, Clone)]
pub struct RangeProps {
range_from: Option<u64>,
range_to: Option<u64>,
content_length: u64,
extension: String,
}
impl RangeProps {
pub fn new(range_from: Option<u64>, range_to: Option<u64>, content_length: u64, extension: String) -> Self {
Self { range_from, range_to, content_length, extension }
}
pub fn is_range(&self) -> bool {
self.range_from.is_some() && self.range_to.is_some()
}
pub fn range_from(&self) -> Option<u64> {
self.range_from
}
pub fn range_to(&self) -> Option<u64> {
self.range_to
}
pub fn content_length(&self) -> u64 {
self.content_length
}
pub fn extension(&self) -> &str {
&self.extension
}
}
pub struct Range {
pub start: u64,
pub end: u64,
}
#[derive(Clone)]
pub struct FileService {
chunk_caching_client: ChunkCachingClient,
download_threads: usize,
}
mock! {
pub FileService {
pub fn new(chunk_caching_client: ChunkCachingClient, download_threads: usize) -> Self;
pub async fn get_data(&self, request: &HttpRequest, resolved_address: &ResolvedAddress) -> Result<(ChunkReceiver, RangeProps), ChunkError>;
pub async fn download_data_request(&self, request: &HttpRequest, path_str: String, xor_name: XorName, offset_modifier: u64, size_modifier: u64) -> Result<(ChunkReceiver, RangeProps), ChunkError>;
pub async fn download_data_bytes(&self, xor_name: XorName, range_from: u64, size_modifier: u64) -> Result<BytesMut, ChunkError>;
}
impl Clone for FileService {
fn clone(&self) -> Self;
}
}
impl FileService {
pub fn new(chunk_caching_client: ChunkCachingClient, download_threads: usize) -> Self {
FileService { chunk_caching_client, download_threads }
}
pub async fn get_data(&self, request: &HttpRequest, resolved_address: &ResolvedAddress) -> Result<(ChunkReceiver, RangeProps), ChunkError> {
self.download_data_request(request, resolved_address.file_path.clone(), resolved_address.xor_name, 0, 0).await
}
pub async fn download_data_request(
&self,
request: &HttpRequest,
path_str: String,
xor_name: XorName,
offset_modifier: u64,
size_modifier: u64,
) -> Result<(ChunkReceiver, RangeProps), ChunkError> {
let data_map_chunk: DataChunk = self.chunk_caching_client.chunk_get_internal(&xor_name).await?;
let chunk_streamer = ChunkStreamer::new(xor_name.encode_hex(), data_map_chunk.content, self.chunk_caching_client.clone(), self.download_threads);
let content_length = self.get_content_length(&chunk_streamer, size_modifier).await;
let (range_from, range_to, range_length, is_range_request) = self.get_range(Some(&request), offset_modifier, content_length);
if is_range_request && range_length == 0 {
return Err(GetStreamError::BadRange(format!("bad range length: [{}]", range_length)).into());
}
let chunk_receiver = match chunk_streamer.open(range_from, range_to).await {
Ok(chunk_receiver) => chunk_receiver,
Err(e) => return Err(GetStreamError::BadReceiver(format!("failed to open chunk stream: {}", e)).into()),
};
let extension = Path::new(&path_str).extension().unwrap_or_default().to_str().unwrap_or_default().to_string();
let (maybe_response_range_from, maybe_response_range_to) =
self.get_response_range(range_from, range_to, is_range_request, offset_modifier);
let xor_name_hex: String = xor_name.encode_hex();
info!("streaming item [{}] at addr [{}], range_from: [{}], range_to: [{}], offset_modifier: [{}], size_modifier: [{}], content_length: [{}], range_length: [{}], response_range_from: [{}], response_range_to: [{}]",
path_str, xor_name_hex, range_from, range_to, offset_modifier, size_modifier, content_length, range_length, maybe_response_range_from.unwrap_or(0), maybe_response_range_to.unwrap_or(0));
Ok((chunk_receiver, RangeProps::new(maybe_response_range_from, maybe_response_range_to, content_length, extension)))
}
async fn get_content_length(&self, chunk_streamer: &ChunkStreamer<ChunkCachingClient>, size_modifier: u64) -> u64 {
if size_modifier > 0 {
size_modifier
} else {
let total_size = chunk_streamer.get_stream_size().await;
u64::try_from(total_size).unwrap_or(0)
}
}
pub fn get_range(&self, request: Option<&HttpRequest>, offset_modifier: u64, size_modifier: u64) -> (u64, u64, u64, bool) {
debug!("get_range - offset_modifier [{}], size_modifier [{}]", offset_modifier, size_modifier);
let length = if size_modifier > 0 { size_modifier - 1 } else { 0 }; let range_to= offset_modifier + length;
if request.is_some() && let Some(range) = request.unwrap().headers().get(header::RANGE) {
let range_value = range.to_str()
.unwrap_or("")
.split_once("=")
.unwrap_or(("", "")).1;
if let Some((range_from_str, range_to_str)) = range_value.split_once("-") {
let range_to_header = min(range_to_str.parse::<u64>().unwrap_or(length), length);
let range_to_override = offset_modifier + range_to_header;
let range_from_header = min(range_from_str.parse::<u64>().unwrap_or(0), range_to_header);
let range_from_override = offset_modifier + range_from_header;
let range_length = range_to_override - range_from_override;
(range_from_override, range_to_override, range_length, true)
} else {
(offset_modifier, range_to, length, true)
}
} else {
(offset_modifier, range_to, length, false)
}
}
fn get_response_range(&self, range_from: u64, range_to: u64, is_range_request: bool, offset_modifier: u64) -> (Option<u64>, Option<u64>) {
if is_range_request {
(Some(range_from - offset_modifier), Some(range_to - offset_modifier))
} else {
(None, None)
}
}
pub async fn download_data_bytes(&self, xor_name: XorName, range_from: u64, size_modifier: u64) -> Result<BytesMut, ChunkError> {
match self.download_data(xor_name, range_from, size_modifier).await {
Ok(mut chunk_receiver) => {
let mut buf = BytesMut::new();
let mut has_data = true;
while has_data {
match chunk_receiver.next().await {
Some(item) => match item {
Ok(bytes) => buf.put(bytes),
Err(e) => {
return Err(ChunkError::GetError(GetError::RecordNotFound(e.to_string())));
},
},
None => has_data = false
};
}
Ok(buf)
}
Err(e) => Err(e)
}
}
async fn download_data(&self, xor_name: XorName, range_from: u64, size_modifier: u64) -> Result<ChunkReceiver, ChunkError> {
let xor_name_hex: String = xor_name.encode_hex();
debug!("download data xor_name: [{}], offset: [{}], size: [{}]", xor_name_hex, range_from, size_modifier);
let data_map_chunk: DataChunk = match self.chunk_caching_client.chunk_get_internal(&xor_name).await {
Ok(chunk) => chunk,
Err(e) => return Err(e),
};
let chunk_streamer = ChunkStreamer::new(xor_name_hex, data_map_chunk.content, self.chunk_caching_client.clone(), self.download_threads);
let content_length = self.get_content_length(&chunk_streamer, size_modifier).await;
let (range_from, range_to, _, _) = self.get_range(None, range_from, content_length);
match chunk_streamer.open(range_from, range_to).await {
Ok(chunk_receiver) => Ok(chunk_receiver),
Err(e) => Err(GetStreamError::BadReceiver(format!("failed to open chunk stream: {}", e)).into()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::test::TestRequest;
use ant_core::data::DataChunk;
use crate::client::MockChunkCachingClient;
fn create_test_service(mock_chunk_caching_client: MockChunkCachingClient) -> FileService {
let mut mock_chunk_caching_client = mock_chunk_caching_client;
mock_chunk_caching_client.expect_clone()
.returning(|| {
let mut m = MockChunkCachingClient::default();
m.expect_clone().returning(MockChunkCachingClient::default);
m
});
FileService {
chunk_caching_client: mock_chunk_caching_client,
download_threads: 8,
}
}
#[test]
fn test_range_props() {
let props = RangeProps::new(Some(0), Some(100), 200, "txt".to_string());
assert!(props.is_range());
assert_eq!(props.range_from(), Some(0));
assert_eq!(props.range_to(), Some(100));
assert_eq!(props.content_length(), 200);
assert_eq!(props.extension(), "txt");
let props_no_range = RangeProps::new(None, None, 200, "txt".to_string());
assert!(!props_no_range.is_range());
}
#[actix_web::test]
async fn test_get_range_no_header() {
let service = create_test_service(MockChunkCachingClient::default());
let (start, end, length, is_range) = service.get_range(None, 0, 100);
assert_eq!(start, 0);
assert_eq!(end, 99);
assert_eq!(length, 99); assert!(!is_range);
}
#[actix_web::test]
async fn test_get_range_with_header() {
let service = create_test_service(MockChunkCachingClient::default());
let req = TestRequest::default().insert_header((header::RANGE, "bytes=10-50")).to_http_request();
let (start, end, length, is_range) = service.get_range(Some(&req), 0, 100);
assert_eq!(start, 10);
assert_eq!(end, 50);
assert_eq!(length, 40);
assert!(is_range);
}
#[actix_web::test]
async fn test_get_range_with_header_open_end() {
let service = create_test_service(MockChunkCachingClient::default());
let req = TestRequest::default().insert_header((header::RANGE, "bytes=10-")).to_http_request();
let (start, end, length, is_range) = service.get_range(Some(&req), 0, 100);
assert_eq!(start, 10);
assert_eq!(end, 99);
assert_eq!(length, 89);
assert!(is_range);
}
#[actix_web::test]
async fn test_get_range_with_header_end_over_length() {
let service = create_test_service(MockChunkCachingClient::default());
let req = TestRequest::default().insert_header((header::RANGE, "bytes=10-120")).to_http_request();
let (start, end, length, is_range) = service.get_range(Some(&req), 0, 100);
assert_eq!(start, 10);
assert_eq!(end, 99);
assert_eq!(length, 89);
assert!(is_range);
}
#[actix_web::test]
async fn test_get_response_range() {
let service = create_test_service(MockChunkCachingClient::default());
let (start, end) = service.get_response_range(10, 50, true, 0);
assert_eq!(start, Some(10));
assert_eq!(end, Some(50));
let (start, end) = service.get_response_range(10, 50, false, 0);
assert_eq!(start, None);
assert_eq!(end, None);
let (start, end) = service.get_response_range(15, 55, true, 5);
assert_eq!(start, Some(10));
assert_eq!(end, Some(50));
}
#[actix_web::test]
async fn test_download_data_bytes_success() {
let xor_name = XorName::default();
let data = vec![1, 2, 3, 4, 5];
let chunk = DataChunk::from_content(data.clone().into());
let mut mock_chunk_client_for_service = MockChunkCachingClient::default();
mock_chunk_client_for_service.expect_chunk_get_internal()
.with(mockall::predicate::eq(xor_name))
.times(1)
.returning(move |_| Ok(chunk.clone()));
let data_for_clone_2 = data.clone();
mock_chunk_client_for_service.expect_clone()
.times(1..)
.returning(move || {
let mut mock = MockChunkCachingClient::default();
let data_cloned = data_for_clone_2.clone();
mock.expect_chunk_get()
.returning(move |_| Ok(Some(DataChunk::from_content(data_cloned.clone().into()))));
mock.expect_clone()
.returning(|| MockChunkCachingClient::default());
mock
});
let service = FileService {
chunk_caching_client: mock_chunk_client_for_service,
download_threads: 8,
};
let result = service.download_data_bytes(xor_name, 0, 5).await;
assert!(result.is_ok());
let bytes = result.unwrap();
assert_eq!(bytes.as_ref(), &data);
}
#[actix_web::test]
async fn test_get_data_success() {
let xor_name = XorName::default();
let data = vec![1, 2, 3, 4, 5];
let chunk = DataChunk::from_content(data.clone().into());
let mut mock_chunk_client = MockChunkCachingClient::default();
mock_chunk_client.expect_chunk_get_internal()
.with(mockall::predicate::eq(xor_name))
.times(1)
.returning(move |_| Ok(chunk.clone()));
let data_for_clone = data.clone();
mock_chunk_client.expect_clone()
.times(1..)
.returning(move || {
let mut mock = MockChunkCachingClient::default();
let data_cloned = data_for_clone.clone();
mock.expect_chunk_get()
.returning(move |_| Ok(Some(DataChunk::from_content(data_cloned.clone().into()))));
mock.expect_clone()
.returning(|| MockChunkCachingClient::default());
mock
});
let service = FileService {
chunk_caching_client: mock_chunk_client,
download_threads: 8,
};
let req = TestRequest::default().to_http_request();
let resolved_address = ResolvedAddress {
is_found: true,
xor_name,
file_path: "test.txt".to_string(),
is_resolved_from_mutable: false,
is_modified: false,
is_allowed: true,
archive: None,
ttl: 0,
};
let result = service.get_data(&req, &resolved_address).await;
assert!(result.is_ok());
let (mut receiver, props) = result.unwrap();
assert_eq!(props.extension(), "txt");
let mut received_data = Vec::new();
while let Some(res) = receiver.next().await {
received_data.extend_from_slice(&res.unwrap());
}
assert_eq!(received_data, data);
}
#[actix_web::test]
async fn test_get_data_range_success() {
let xor_name = XorName::default();
let data = vec![1, 2, 3, 4, 5];
let chunk = DataChunk::from_content(data.clone().into());
let mut mock_chunk_client = MockChunkCachingClient::default();
mock_chunk_client.expect_chunk_get_internal()
.with(mockall::predicate::eq(xor_name))
.times(1)
.returning(move |_| Ok(chunk.clone()));
let data_for_clone = data.clone();
mock_chunk_client.expect_clone()
.times(1..)
.returning(move || {
let mut mock = MockChunkCachingClient::default();
let data_cloned = data_for_clone.clone();
mock.expect_chunk_get()
.returning(move |_| Ok(Some(DataChunk::from_content(data_cloned.clone().into()))));
mock.expect_clone()
.returning(|| MockChunkCachingClient::default());
mock
});
let service = FileService {
chunk_caching_client: mock_chunk_client,
download_threads: 8,
};
let req = TestRequest::default()
.insert_header((header::RANGE, "bytes=1-3"))
.to_http_request();
let result = service.download_data_request(&req, "test.txt".to_string(), xor_name, 0, 5).await;
assert!(result.is_ok());
let (mut receiver, props) = result.unwrap();
assert_eq!(props.range_from(), Some(1));
assert_eq!(props.range_to(), Some(3));
assert_eq!(props.content_length(), 5);
let mut received_data = Vec::new();
while let Some(res) = receiver.next().await {
received_data.extend_from_slice(&res.unwrap());
}
assert!(!received_data.is_empty());
}
}