use std::fmt::Debug;
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use std::{
fs::{File, OpenOptions},
io::{Seek, SeekFrom, Write},
path::PathBuf,
};
use async_curl::dep::curl::easy::{Handler, ReadError, WriteError};
use derive_deref_rs::Deref;
use http::{HeaderMap, HeaderName, HeaderValue};
use log::trace;
use tokio::sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender};
/// A transfer rate in bytes per second, stored as an `f64`.
#[derive(Clone, Debug)]
pub struct TransferSpeed(f64);
impl TransferSpeed {
    /// The speed truncated to whole bytes per second.
    ///
    /// Rust float-to-int `as` casts saturate: negative values clamp to 0,
    /// values above `u64::MAX` clamp to the maximum, and NaN becomes 0.
    pub fn as_bytes_per_sec(&self) -> u64 {
        self.0 as u64
    }
}
impl From<u64> for TransferSpeed {
fn from(value: u64) -> Self {
Self(value as f64)
}
}
impl From<usize> for TransferSpeed {
fn from(value: usize) -> Self {
Self(value as f64)
}
}
impl From<i32> for TransferSpeed {
fn from(value: i32) -> Self {
Self(value as f64)
}
}
impl From<i64> for TransferSpeed {
fn from(value: i64) -> Self {
Self(value as f64)
}
}
impl From<f64> for TransferSpeed {
fn from(value: f64) -> Self {
Self(value)
}
}
/// Shared, clonable abort flag checked from curl's `progress` callback.
///
/// Setting the inner bool to `true` (reachable through the derived `Deref`
/// to `Arc<Mutex<bool>>`) cancels an in-flight transfer on its next
/// progress tick.
#[derive(Deref, Clone, Debug)]
pub struct AbortPerform {
    abort: Arc<Mutex<bool>>,
}
impl AbortPerform {
    /// Create an aborter whose flag starts unset (transfer proceeds).
    pub fn new() -> Self {
        let abort = Arc::new(Mutex::new(false));
        Self { abort }
    }
}
impl Default for AbortPerform {
    /// Equivalent to [`AbortPerform::new`]: the flag starts unset.
    fn default() -> Self {
        Self::new()
    }
}
/// State for a file-backed transfer: the on-disk location plus running
/// speed accounting and optional cancellation/telemetry hooks.
#[derive(Clone, Debug)]
pub struct FileInfo {
    /// Download target / upload source path.
    pub path: PathBuf,
    /// Optional channel on which transfer-speed samples are published.
    send_speed_info: Option<Sender<TransferSpeed>>,
    /// Total bytes moved so far in this transfer.
    bytes_transferred: usize,
    /// When the transfer began; speed is averaged from this instant.
    transfer_started: Instant,
    /// Most recently computed average speed.
    transfer_speed: TransferSpeed,
    /// Optional cooperative abort flag checked during the transfer.
    abort: Option<AbortPerform>,
}
impl FileInfo {
    /// Create transfer state for `path` with no speed reporting and no
    /// aborter; the speed clock starts now.
    pub fn path(path: PathBuf) -> Self {
        Self {
            path,
            send_speed_info: None,
            bytes_transferred: 0,
            transfer_started: Instant::now(),
            transfer_speed: TransferSpeed::from(0),
            abort: None,
        }
    }

    /// Builder: publish transfer-speed samples on `send_speed_info`.
    pub fn with_transfer_speed_sender(mut self, send_speed_info: Sender<TransferSpeed>) -> Self {
        self.send_speed_info = Some(send_speed_info);
        self
    }

    /// Builder: allow the transfer to be cancelled via `abort`.
    pub fn with_perform_aborter(mut self, abort: AbortPerform) -> Self {
        self.abort = Some(abort);
        self
    }

    /// Record `transferred` additional bytes and recompute the average
    /// speed over the whole transfer so far.
    fn update_bytes_transferred(&mut self, transferred: usize) {
        self.bytes_transferred += transferred;
        let elapsed = self.transfer_started.elapsed().as_secs_f64();
        // Guard against a zero-length interval: dividing by 0.0 would yield
        // `inf` (or NaN for 0/0), which `as_bytes_per_sec` reports as
        // `u64::MAX` (or 0). Keep the previous sample until measurable time
        // has passed.
        if elapsed > 0.0 {
            self.transfer_speed =
                TransferSpeed::from(self.bytes_transferred as f64 / elapsed);
        }
    }

    /// Total bytes moved so far.
    fn bytes_transferred(&self) -> usize {
        self.bytes_transferred
    }

    /// Most recently computed average speed.
    fn transfer_speed(&self) -> TransferSpeed {
        self.transfer_speed.clone()
    }
}
fn send_transfer_info(info: &FileInfo) {
if let Some(tx) = info.send_speed_info.clone() {
let transfer_speed = info.transfer_speed();
tokio::spawn(async move {
tx.send(transfer_speed).await.map_err(|e| {
trace!("{:?}", e);
})
});
}
}
/// Handler state for streaming downloads: forwards each received chunk to
/// a consumer over an unbounded channel.
#[derive(Clone, Debug)]
pub struct StreamHandler {
    /// Producer side of the chunk channel; the consumer holds the receiver
    /// returned by [`StreamHandler::new`].
    chunk_sender: UnboundedSender<Vec<u8>>,
    /// Optional cooperative abort flag checked during the transfer.
    abort: Option<AbortPerform>,
}
impl StreamHandler {
    /// Build a streaming handler plus the receiving end on which the
    /// downloaded chunks will arrive.
    pub fn new() -> (Self, UnboundedReceiver<Vec<u8>>) {
        let (chunk_sender, receiver) = unbounded_channel();
        let handler = Self {
            chunk_sender,
            abort: None,
        };
        (handler, receiver)
    }

    /// Builder: allow the streaming transfer to be cancelled via `abort`.
    pub fn with_perform_aborter(mut self, abort: AbortPerform) -> Self {
        self.abort = Some(abort);
        self
    }
}
/// Forward a downloaded chunk to the stream's consumer. A send failure
/// (receiver dropped) is logged and otherwise ignored.
fn send_stream_data(stream: &StreamHandler, data: Vec<u8>) {
    // `UnboundedSender::send` takes `&self`, so cloning the sender first
    // (as the original did) is redundant.
    if let Err(e) = stream.chunk_sender.send(data) {
        trace!("{:?}", e);
    }
}
/// Extension of curl's `Handler` that exposes the collected response after
/// a transfer completes. Default implementations return nothing, so
/// handlers that do not buffer in RAM need not override them.
pub trait ExtendedHandler: Handler {
    /// The buffered response body, if any was kept in RAM.
    fn get_response_body(&self) -> Option<Vec<u8>> {
        None
    }

    /// The buffered response body and parsed headers, if any were kept.
    fn get_response_body_and_headers(&self) -> (Option<Vec<u8>>, Option<HeaderMap>) {
        (None, None)
    }
}
/// Where a transfer's payload goes (and comes from, for uploads).
#[derive(Clone, Debug)]
pub enum Collector {
    /// Write downloads to / read uploads from a file on disk.
    File(FileInfo),
    /// Buffer the response body in memory.
    Ram(Vec<u8>),
    /// Buffer the response body and the raw header bytes in memory.
    RamAndHeaders(Vec<u8>, Vec<u8>),
    /// File-backed payload plus raw header bytes buffered in memory.
    FileAndHeaders(FileInfo, Vec<u8>),
    /// Forward each downloaded chunk over a channel; headers buffered.
    Streaming(StreamHandler, Vec<u8>),
}
impl Handler for Collector {
fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
match self {
Collector::File(info) => {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(info.path.clone())
.map_err(|e| {
trace!("{}", e);
WriteError::Pause
})?;
file.write_all(data).map_err(|e| {
trace!("{}", e);
WriteError::Pause
})?;
info.update_bytes_transferred(data.len());
send_transfer_info(info);
Ok(data.len())
}
Collector::Ram(container) => {
container.extend_from_slice(data);
Ok(data.len())
}
Collector::RamAndHeaders(container, _) => {
container.extend_from_slice(data);
Ok(data.len())
}
Collector::FileAndHeaders(info, _) => {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(info.path.clone())
.map_err(|e| {
trace!("{}", e);
WriteError::Pause
})?;
file.write_all(data).map_err(|e| {
trace!("{}", e);
WriteError::Pause
})?;
info.update_bytes_transferred(data.len());
send_transfer_info(info);
Ok(data.len())
}
Collector::Streaming(stream, _) => {
send_stream_data(stream, data.to_vec());
Ok(data.len())
}
}
}
fn read(&mut self, data: &mut [u8]) -> Result<usize, ReadError> {
match self {
Collector::File(info) => {
let mut file = File::open(info.path.clone()).map_err(|e| {
trace!("{}", e);
ReadError::Abort
})?;
file.seek(SeekFrom::Start(info.bytes_transferred() as u64))
.map_err(|e| {
trace!("{}", e);
ReadError::Abort
})?;
let read_size = file.read(data).map_err(|e| {
trace!("{}", e);
ReadError::Abort
})?;
info.update_bytes_transferred(read_size);
send_transfer_info(info);
Ok(read_size)
}
Collector::Ram(_) => Ok(0),
Collector::RamAndHeaders(_, _) => Ok(0),
Collector::FileAndHeaders(info, _) => {
let mut file = File::open(info.path.clone()).map_err(|e| {
trace!("{}", e);
ReadError::Abort
})?;
file.seek(SeekFrom::Start(info.bytes_transferred() as u64))
.map_err(|e| {
trace!("{}", e);
ReadError::Abort
})?;
let read_size = file.read(data).map_err(|e| {
trace!("{}", e);
ReadError::Abort
})?;
info.update_bytes_transferred(read_size);
send_transfer_info(info);
Ok(read_size)
}
Collector::Streaming(_, _) => Ok(0),
}
}
fn header(&mut self, data: &[u8]) -> bool {
match self {
Collector::File(_) => {}
Collector::Ram(_) => {}
Collector::RamAndHeaders(_, headers) => {
headers.extend_from_slice(data);
}
Collector::FileAndHeaders(_, headers) => {
headers.extend_from_slice(data);
}
Collector::Streaming(_, headers) => {
headers.extend_from_slice(data);
}
}
true
}
fn progress(&mut self, dltotal: f64, dlnow: f64, ultotal: f64, ulnow: f64) -> bool {
trace!("dltotal: {dltotal} dlnow: {dlnow} ultotal: {ultotal} ulnow: {ulnow}");
match self {
Collector::File(file_info) | Collector::FileAndHeaders(file_info, _) => {
if let Some(abort) = &file_info.abort {
abort.lock().map(|a| !*a).unwrap_or_else(|err| {
log::error!("{:?}", err);
false
})
} else {
true
}
}
Collector::Ram(_) | Collector::RamAndHeaders(_, _) => true,
Collector::Streaming(stream, _) => {
if let Some(abort) = &stream.abort {
abort.lock().map(|a| !*a).unwrap_or_else(|err| {
log::error!("{:?}", err);
false
})
} else {
true
}
}
}
}
}
/// Parse raw `key: value` header lines into a typed `HeaderMap`.
///
/// The original code called `std::str::from_utf8(..).unwrap()`, which
/// panics if the server sends non-UTF-8 header bytes; a lossy conversion
/// keeps the valid lines instead. Lines that are not `key: value` pairs
/// (status line, blank terminator, malformed names/values) are skipped,
/// and duplicate names keep the last occurrence — matching the original
/// `insert` behavior. Deduplicates logic previously repeated in three
/// match arms.
fn parse_header_map(raw: &[u8]) -> HeaderMap {
    let text = String::from_utf8_lossy(raw);
    let mut map = HeaderMap::new();
    for line in text.lines() {
        // `.to_owned()` on the `Option` (as the original had) was a no-op.
        if let Some((key, value)) = line.split_once(": ") {
            if let (Ok(name), Ok(val)) = (
                HeaderName::from_bytes(key.as_bytes()),
                HeaderValue::from_str(value),
            ) {
                map.insert(name, val);
            }
        }
    }
    map
}

/// Clone a RAM-buffered body, treating an empty buffer as "no body".
fn body_of(container: &[u8]) -> Option<Vec<u8>> {
    if container.is_empty() {
        None
    } else {
        Some(container.to_vec())
    }
}

impl ExtendedHandler for Collector {
    /// The buffered response body, for variants that keep it in RAM and
    /// only when it is non-empty.
    fn get_response_body(&self) -> Option<Vec<u8>> {
        match self {
            Collector::Ram(container) | Collector::RamAndHeaders(container, _) => {
                body_of(container)
            }
            Collector::File(_) | Collector::FileAndHeaders(_, _) | Collector::Streaming(_, _) => {
                None
            }
        }
    }

    /// The buffered body (RAM variants, non-empty) together with the parsed
    /// response headers (variants that captured them).
    fn get_response_body_and_headers(&self) -> (Option<Vec<u8>>, Option<HeaderMap>) {
        match self {
            Collector::File(_) => (None, None),
            Collector::Ram(container) => (body_of(container), None),
            Collector::RamAndHeaders(container, headers) => {
                (body_of(container), Some(parse_header_map(headers)))
            }
            Collector::FileAndHeaders(_, headers) | Collector::Streaming(_, headers) => {
                (None, Some(parse_header_map(headers)))
            }
        }
    }
}