use std::{
path::{Path, PathBuf},
time::Duration,
};
use crate::{
hdp::get_timestamp,
ui_messages::{DownloadResponse, UiResponse, UiServerMessage},
wire_messages::{ReadQuery, Request},
wishlist::{DownloadRequest, RequestedFile, WishList},
};
use anyhow::anyhow;
use bincode::serialize;
use futures::{pin_mut, StreamExt};
use harddrive_party_shared::ui_messages::DownloadInfo;
use key_to_animal::key_to_name;
use log::{debug, error, warn};
use quinn::{Connection, RecvStream};
use speedometer::Speedometer;
use tokio::{
fs::{create_dir_all, File, OpenOptions},
io::AsyncWriteExt,
sync::mpsc::Sender,
};
const DOWNLOAD_BLOCK_SIZE: usize = 64 * 1024;
const UPDATE_EVERY: u64 = 10 * 1024;
#[derive(Debug)]
pub struct Peer {
pub connection: Connection,
pub public_key: [u8; 32],
}
impl Peer {
pub fn new(
connection: Connection,
response_tx: Sender<UiServerMessage>,
download_dir: PathBuf,
public_key: [u8; 32],
wishlist: WishList,
) -> Self {
let connection_clone = connection.clone();
let peer_name = key_to_name(&public_key);
tokio::spawn(async move {
if let Err(err) = process_requests(
public_key,
connection_clone,
peer_name,
wishlist,
download_dir,
response_tx,
)
.await
{
error!("Error when processing requests: {:?}", err);
}
});
Self {
connection,
public_key,
}
}
}
async fn process_requests(
public_key: [u8; 32],
connection: Connection,
peer_name: String,
wishlist: WishList,
download_dir: PathBuf,
response_tx: Sender<UiServerMessage>,
) -> anyhow::Result<()> {
let request_stream = wishlist.requests_for_peer(&public_key);
pin_mut!(request_stream);
while let Some(mut request) = request_stream.next().await {
let progress = wishlist
.get_download_progress_for_request(request.request_id)
.unwrap_or_default();
let associated_request = wishlist.get_request(request.request_id)?;
match download(
&request,
&connection,
&download_dir,
response_tx.clone(),
peer_name.clone(),
progress,
associated_request.clone(),
)
.await
{
Ok(()) => {
debug!("Download successfull");
request.downloaded = true;
let id = request.request_id;
match wishlist.file_completed(request) {
Ok(request_complete) => {
if request_complete
&& response_tx
.send(UiServerMessage::Response {
id,
response: Ok(UiResponse::Download(DownloadResponse {
path: associated_request.path.clone(),
peer_name: peer_name.clone(),
download_info: DownloadInfo::Completed(get_timestamp()),
})),
})
.await
.is_err()
{
warn!("Response channel closed");
};
}
Err(e) => {
warn!("Could not remove item from wishlist {:?}", e)
}
}
}
Err(e) => {
warn!("Error downloading {:?}", e);
}
}
}
Ok(())
}
async fn download(
requested_file: &RequestedFile,
connection: &Connection,
download_dir: &Path,
response_tx: Sender<UiServerMessage>,
peer_name: String,
progress_request: u64,
associated_request: DownloadRequest,
) -> anyhow::Result<()> {
let id = requested_file.request_id;
let output_path = download_dir.join(requested_file.path.clone());
let (mut file, start_offset) = setup_download(output_path, requested_file.size).await?;
let mut bytes_read: u64 = start_offset.unwrap_or_default();
let mut total_bytes_read = progress_request;
let mut final_speed = 0;
if start_offset >= Some(requested_file.size) {
debug!("File already downloaded");
} else {
debug!(
"Requesting {} from offset {:?}",
requested_file.path, start_offset
);
let mut recv = make_read_request(connection, requested_file, start_offset).await?;
let mut buf: [u8; DOWNLOAD_BLOCK_SIZE] = [0; DOWNLOAD_BLOCK_SIZE];
let mut bytes_read_since_last_ui_update = 0;
let mut speedometer = Speedometer::new(Duration::from_secs(5));
loop {
match recv.read(&mut buf).await {
Ok(Some(n)) => {
bytes_read_since_last_ui_update += n as u64;
speedometer.entry(n);
if let Err(error) = file.write(&buf[..n]).await {
warn!("Cannot write downloading file {:?}", error);
break;
}
if bytes_read_since_last_ui_update > UPDATE_EVERY {
bytes_read += bytes_read_since_last_ui_update;
total_bytes_read += bytes_read_since_last_ui_update;
if bytes_read > requested_file.size {
error!("Downloading file is bigger than expected!");
}
debug!(
"Read {} bytes - {} of {}",
bytes_read_since_last_ui_update, bytes_read, requested_file.size
);
bytes_read_since_last_ui_update = 0;
if response_tx
.send(UiServerMessage::Response {
id,
response: Ok(UiResponse::Download(DownloadResponse {
path: associated_request.path.clone(),
peer_name: peer_name.clone(),
download_info: DownloadInfo::Downloading {
path: requested_file.path.clone(),
bytes_read,
total_bytes_read,
speed: speedometer
.measure()
.unwrap_or_default()
.try_into()
.unwrap(),
},
})),
})
.await
.is_err()
{
warn!("Response channel closed");
break;
};
}
}
Ok(None) => {
debug!("Stream ended");
bytes_read += bytes_read_since_last_ui_update;
final_speed = speedometer
.measure()
.unwrap_or_default()
.try_into()
.unwrap();
break;
}
Err(error) => {
error!("Got error {:?}", error);
bytes_read += bytes_read_since_last_ui_update;
final_speed = speedometer
.measure()
.unwrap_or_default()
.try_into()
.unwrap();
break;
}
}
}
}
if response_tx
.send(UiServerMessage::Response {
id,
response: Ok(UiResponse::Download(DownloadResponse {
peer_name: peer_name.clone(),
path: associated_request.path.clone(),
download_info: DownloadInfo::Downloading {
path: requested_file.path.clone(),
bytes_read,
total_bytes_read,
speed: final_speed,
},
})),
})
.await
.is_err()
{
warn!("Response channel closed");
}
if bytes_read < requested_file.size {
return Err(anyhow!(
"Download incomplete - {} of {} bytes downloaded",
bytes_read,
requested_file.size
));
}
Ok(())
}
async fn make_read_request(
connection: &Connection,
requested_file: &RequestedFile,
start: Option<u64>,
) -> anyhow::Result<RecvStream> {
let request = Request::Read(ReadQuery {
path: requested_file.path.clone(),
start,
end: None,
});
let (mut send, recv) = connection.open_bi().await?;
let buf = serialize(&request)?;
send.write_all(&buf).await?;
send.finish().await?;
Ok(recv)
}
async fn setup_download(file_path: PathBuf, size: u64) -> anyhow::Result<(File, Option<u64>)> {
create_dir_all(
file_path
.parent()
.ok_or_else(|| anyhow!("Cannot get parent"))?,
)
.await?;
let file = OpenOptions::new()
.append(true)
.create(true)
.open(file_path)
.await?;
let metadata = file.metadata().await?;
let existing_file_size = metadata.len();
let start_offset = if existing_file_size > size {
error!("Existing file is bigger than the remote source");
Some(size)
} else {
match existing_file_size {
0 => None,
_ => Some(existing_file_size),
}
};
Ok((file, start_offset))
}