use super::downloader::{join_all, join_all_futures};
use super::Client;
use crate::header::RANGE;
use crate::Hash;
use crate::{ManicError, Result};
use futures::StreamExt;
#[cfg(feature = "progress")]
use indicatif::ProgressBar;
use rayon::prelude::*;
use std::io::SeekFrom;
use std::path::Path;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tracing::{info, instrument};
/// Iterator state that splits the inclusive byte range `low..=hi` into
/// consecutive [`Chunk`]s of at most `chunk_size` bytes each.
#[derive(Debug, Clone, Copy)]
pub struct Chunks {
/// Offset of the first byte of the next chunk to be yielded; advanced on every `next()`.
low: u64,
/// Offset of the last byte (inclusive) of the whole range.
hi: u64,
/// Upper bound on the number of bytes covered by one chunk; `Chunks::new` rejects zero.
chunk_size: u64,
/// Sequence number assigned to the next chunk; `Chunks::new` starts it at 1.
current_pos: u64,
}
/// A list of [`Chunk`]s held behind an `Arc`, so cloning a `ChunkVec` does not
/// copy the chunk buffers.
///
/// The `From<Vec<Chunk>>` conversion sorts the chunks by their `pos` field.
#[derive(Debug, Clone)]
pub struct ChunkVec {
/// The chunks, shared between all clones of this value.
chunks: Arc<Vec<Chunk>>,
}
impl ChunkVec {
pub async fn save_to_file<T: AsRef<Path>>(&self, path: T) -> Result<()> {
let f = File::create(path).await?;
self.save(f).await
}
pub(crate) async fn save(&self, output: File) -> Result<()> {
let mut fut_vec = Vec::new();
for i in self.chunks.iter() {
let f = output.try_clone().await?;
let c = i.clone();
fut_vec.push(tokio::spawn(c.save(f)))
}
join_all(fut_vec).await?;
output.sync_all().await?;
Ok(())
}
pub async fn to_vec(&self) -> Vec<u8> {
self.chunks
.iter()
.flat_map(|x| x.buf.to_vec())
.collect::<Vec<u8>>()
}
pub(crate) async fn verify(&self, mut hash: Hash) -> Result<()> {
self.chunks
.iter()
.for_each(|x| hash.update(x.buf.as_slice()));
hash.verify()
}
}
impl From<Vec<Chunk>> for ChunkVec {
    /// Builds a `ChunkVec` from chunks given in any order.
    ///
    /// The chunks are sorted by their `pos` field before being stored.
    fn from(mut v: Vec<Chunk>) -> Self {
        v.par_sort_unstable_by(|lhs, rhs| Ord::cmp(&lhs.pos, &rhs.pos));
        let chunks = Arc::new(v);
        Self { chunks }
    }
}
/// One contiguous byte range of a download together with the data fetched for it.
#[derive(Debug, Clone)]
pub struct Chunk {
/// Bytes received for this range; empty until `download` appends to it.
pub buf: Vec<u8>,
/// Offset of the first byte of the range; also the file offset `save` seeks to.
pub low: u64,
/// Offset of the last byte of the range (inclusive).
pub hi: u64,
/// Sequence number of the chunk; `ChunkVec::from` sorts chunks by this value.
pub pos: u64,
/// Length of the range as computed by the `Chunks` iterator.
pub len: u64,
/// Value sent in the HTTP `Range` header, formatted as `bytes=<low>-<hi>`.
pub bytes: String,
}
impl AsRef<Chunk> for Chunk {
/// Identity conversion: returns a reference to this same chunk.
fn as_ref(&self) -> &Chunk {
self
}
}
impl Chunk {
    /// Writes this chunk's buffer into `output`, starting at byte offset `self.low`.
    ///
    /// # Errors
    ///
    /// Returns an error if the seek, the write or the flush fails.
    #[instrument(skip(self, output), fields(low=%self.low, hi=%self.hi, range=%self.bytes, pos=%self.pos))]
    pub(crate) async fn save(self, mut output: File) -> Result<()> {
        output.seek(SeekFrom::Start(self.low)).await?;
        info!("Seeked");
        // A single `write` may accept only part of the buffer; `write_all`
        // loops until every byte has been handed over.
        output.write_all(self.buf.as_slice()).await?;
        // Flush before the handle is dropped so a failed write is reported
        // here instead of being lost.
        output.flush().await?;
        info!("Written {} bytes", self.buf.len());
        Ok(())
    }

    /// Requests this chunk's byte range from `url` and appends the response
    /// body to `self.buf`, returning the filled chunk.
    ///
    /// When the `progress` feature is enabled and `pb` is `Some`, the bar is
    /// advanced by the size of every received piece.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent, if the server answers
    /// with an error status, or if reading the body fails part-way. No
    /// partially filled chunk is returned in those cases.
    #[instrument(skip(self, client, pb), fields(range = %self.bytes))]
    pub(crate) async fn download(
        mut self,
        client: Client,
        url: String,
        #[cfg(feature = "progress")] pb: Option<ProgressBar>,
    ) -> Result<Self> {
        let resp = client
            .get(url)
            .header(RANGE, self.bytes.clone())
            .send()
            .await?
            // A 4xx/5xx body must not end up stored as chunk data.
            .error_for_status()?;
        let mut stream = resp.bytes_stream();
        while let Some(piece) = stream.next().await {
            // Propagate transport errors: skipping a failed piece would leave
            // a silently truncated buffer.
            let piece = piece?;
            #[cfg(feature = "progress")]
            if let Some(bar) = &pb {
                bar.inc(piece.len() as u64);
            }
            self.buf.extend_from_slice(&piece);
        }
        Ok(self)
    }
}
impl Chunks {
    /// Creates a chunk iterator over the inclusive byte range `low..=hi`,
    /// numbering the chunks from 1.
    ///
    /// # Errors
    ///
    /// Returns [`ManicError::BadChunkSize`] when `chunk_size` is zero.
    pub fn new(low: u64, hi: u64, chunk_size: u64) -> Result<Self> {
        if chunk_size != 0 {
            Ok(Self {
                low,
                hi,
                chunk_size,
                current_pos: 1,
            })
        } else {
            Err(ManicError::BadChunkSize)
        }
    }

    /// Creates one download future per chunk of this range, hands them all to
    /// `join_all_futures`, and collects the finished chunks into a [`ChunkVec`].
    ///
    /// Iteration happens on a copy of `self`, so this value is not advanced.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `join_all_futures`.
    pub async fn download(
        &self,
        client: Client,
        url: String,
        #[cfg(feature = "progress")] pb: Option<ProgressBar>,
    ) -> Result<ChunkVec> {
        let mut pending = Vec::new();
        for chunk in *self {
            pending.push(chunk.download(
                client.clone(),
                url.clone(),
                #[cfg(feature = "progress")]
                pb.clone(),
            ));
        }
        let finished = join_all_futures(pending).await?;
        Ok(ChunkVec::from(finished))
    }
}
impl Iterator for Chunks {
    type Item = Chunk;

    /// Yields the next chunk of the range, or `None` once `low` has moved past `hi`.
    ///
    /// Each chunk covers the inclusive range `low..=hi` of at most `chunk_size`
    /// bytes, carries an empty buffer, the matching `bytes=<low>-<hi>` header
    /// value, and the next sequence number.
    fn next(&mut self) -> Option<Self::Item> {
        if self.low > self.hi {
            return None;
        }

        let start = self.low;
        // Bytes left in the overall range, counting both end points.
        let remaining = self.hi - start + 1;
        // Number of bytes in this chunk. The range is inclusive, so this is
        // `end - start + 1`, not `end - start`.
        let len = self.chunk_size.min(remaining);
        let end = start + len - 1;
        self.low = start + len;

        let pos = self.current_pos;
        self.current_pos += 1;

        Some(Chunk {
            buf: Vec::new(),
            low: start,
            hi: end,
            len,
            pos,
            bytes: format!("bytes={}-{}", start, end),
        })
    }
}