use std::{
num::NonZeroUsize,
path::{Path, PathBuf},
time::Duration,
};
use crate::{
connections::{get_timestamp, speedometer::Speedometer},
ui_messages::{DownloadEvent, DownloadInfo, UiEvent},
wire_messages::{ReadQuery, Request},
wishlist::{DownloadRequest, RequestedFile, WishList},
};
use anyhow::anyhow;
use bincode::serialize;
use futures::{pin_mut, StreamExt};
use harddrive_party_shared::wire_messages::{AnnounceAddress, Entry};
use key_to_animal::key_to_name;
use log::{debug, error, warn};
use lru::LruCache;
use quinn::{Connection, RecvStream};
use std::sync::{Arc, Mutex};
use tokio::{
fs::{create_dir_all, File, OpenOptions},
io::AsyncWriteExt,
sync::broadcast,
};
/// Size in bytes of the buffer used for each read from a download stream.
pub const DOWNLOAD_BLOCK_SIZE: usize = 64 * 1024;
/// A progress event is sent to the UI once more than this many bytes have been
/// received since the previous event.
const UPDATE_EVERY: u64 = 10 * 1024;
/// Capacity (number of entries) of each peer's index cache.
const CACHE_SIZE: usize = 64;
/// LRU cache keyed by `Request`.
// NOTE(review): the values look like the cached response pages for that request - confirm
type IndexCache = LruCache<Request, Vec<Vec<Entry>>>;
/// A connected remote peer.
#[derive(Debug)]
pub struct Peer {
/// The `quinn` connection to this peer.
pub connection: Connection,
/// The peer's 32-byte public key; also used to derive its display name.
pub public_key: [u8; 32],
/// Announce address of the peer, when one was provided at construction.
pub announce_address: Option<AnnounceAddress>,
/// Shared LRU cache holding up to `CACHE_SIZE` entries keyed by request.
pub index_cache: Arc<Mutex<IndexCache>>,
}
impl Peer {
    /// Creates a `Peer` and starts a background task which downloads the files
    /// the wishlist requests from it.
    ///
    /// The task runs `process_requests` on a clone of `connection`; if it
    /// returns an error, the error is logged and the task ends. The returned
    /// `Peer` starts with an empty index cache of capacity `CACHE_SIZE`.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn new(
        connection: Connection,
        event_broadcaster: broadcast::Sender<UiEvent>,
        download_dir: PathBuf,
        public_key: [u8; 32],
        wishlist: WishList,
        announce_address: Option<AnnounceAddress>,
    ) -> Self {
        let capacity = NonZeroUsize::new(CACHE_SIZE).expect("Cache size to be non-zero");
        let index_cache = Arc::new(Mutex::new(LruCache::new(capacity)));

        // The download task owns its own handle to the connection and the
        // human-readable name derived from the peer's key
        let task_connection = connection.clone();
        let peer_name = key_to_name(&public_key);
        tokio::spawn(async move {
            let outcome = process_requests(
                public_key,
                task_connection,
                peer_name,
                wishlist,
                download_dir,
                event_broadcaster,
            )
            .await;
            if let Err(err) = outcome {
                error!("Error when processing requests: {err:?}");
            }
        });

        Self {
            connection,
            public_key,
            announce_address,
            index_cache,
        }
    }
}
/// Downloads, one at a time, every file the wishlist requests from this peer.
///
/// For each file yielded by `wishlist.requests_for_peer` the file is
/// downloaded into `download_dir`. On success it is marked as completed in the
/// wishlist, and when that completes the whole associated request a
/// `DownloadInfo::Completed` event is broadcast to the UI.
///
/// Failures affecting a single file (its associated request cannot be looked
/// up, the download fails, or the wishlist cannot be updated) are logged and
/// the loop moves on to the next file, so one bad entry does not stop all
/// further downloads from this peer.
///
/// Returns `Ok(())` once the request stream ends.
async fn process_requests(
    public_key: [u8; 32],
    connection: Connection,
    peer_name: String,
    wishlist: WishList,
    download_dir: PathBuf,
    event_broadcaster: broadcast::Sender<UiEvent>,
) -> anyhow::Result<()> {
    let request_stream = wishlist.requests_for_peer(&public_key);
    pin_mut!(request_stream);

    while let Some(mut request) = request_stream.next().await {
        let request_id = request.request_id;

        // Bytes already downloaded for the parent request; falls back to the
        // default (zero) when it cannot be determined
        let progress = wishlist
            .get_download_progress_for_request(request_id)
            .unwrap_or_default();

        // Skip this file rather than ending the whole task when the parent
        // request cannot be looked up
        let associated_request = match wishlist.get_request(request_id) {
            Ok(associated_request) => associated_request,
            Err(e) => {
                warn!("Cannot get request associated with requested file {e:?}");
                continue;
            }
        };

        if let Err(e) = download(
            &request,
            &connection,
            &download_dir,
            event_broadcaster.clone(),
            peer_name.clone(),
            progress,
            associated_request.clone(),
        )
        .await
        {
            warn!("Error downloading {e:?}");
            continue;
        }

        debug!("Download successfull");
        request.downloaded = true;
        match wishlist.file_completed(request) {
            // This was the last outstanding file of the request
            Ok(true) => {
                let completed = UiEvent::Download(DownloadEvent {
                    request_id,
                    path: associated_request.path.clone(),
                    peer_name: peer_name.clone(),
                    download_info: DownloadInfo::Completed(get_timestamp()),
                });
                if event_broadcaster.send(completed).is_err() {
                    warn!("Response channel closed");
                }
            }
            Ok(false) => {}
            Err(e) => {
                warn!("Could not remove item from wishlist {e:?}")
            }
        }
    }
    Ok(())
}
async fn download(
requested_file: &RequestedFile,
connection: &Connection,
download_dir: &Path,
event_broadcaster: broadcast::Sender<UiEvent>,
peer_name: String,
progress_request: u64,
associated_request: DownloadRequest,
) -> anyhow::Result<()> {
let id = requested_file.request_id;
let output_path = download_dir.join(requested_file.path.clone());
let (mut file, start_offset) = setup_download(output_path, requested_file.size).await?;
let mut bytes_read: u64 = start_offset.unwrap_or_default();
let mut total_bytes_read = progress_request;
let mut final_speed = 0;
if start_offset >= Some(requested_file.size) {
debug!("File already downloaded");
} else {
debug!(
"Requesting {} from offset {:?}",
requested_file.path, start_offset
);
let mut recv = make_read_request(connection, requested_file, start_offset).await?;
let mut buf: [u8; DOWNLOAD_BLOCK_SIZE] = [0; DOWNLOAD_BLOCK_SIZE];
let mut bytes_read_since_last_ui_update = 0;
let mut speedometer = Speedometer::new(Duration::from_secs(5));
loop {
match recv.read(&mut buf).await {
Ok(Some(n)) => {
bytes_read_since_last_ui_update += n as u64;
speedometer.entry(n);
if let Err(error) = file.write(&buf[..n]).await {
warn!("Cannot write downloading file {error:?}");
break;
}
if bytes_read_since_last_ui_update > UPDATE_EVERY {
bytes_read += bytes_read_since_last_ui_update;
total_bytes_read += bytes_read_since_last_ui_update;
if bytes_read > requested_file.size {
error!("Downloading file is bigger than expected!");
}
debug!(
"Read {} bytes - {} of {}",
bytes_read_since_last_ui_update, bytes_read, requested_file.size
);
bytes_read_since_last_ui_update = 0;
if event_broadcaster
.send(UiEvent::Download(DownloadEvent {
request_id: id,
path: associated_request.path.clone(),
peer_name: peer_name.clone(),
download_info: DownloadInfo::Downloading {
path: requested_file.path.clone(),
bytes_read,
total_bytes_read,
speed: speedometer.measure().try_into().unwrap_or_default(),
},
}))
.is_err()
{
warn!("Response channel closed");
break;
};
}
}
Ok(None) => {
debug!("Stream ended");
bytes_read += bytes_read_since_last_ui_update;
final_speed = speedometer.measure().try_into().unwrap_or_default();
break;
}
Err(error) => {
error!("Got error {error:?}");
bytes_read += bytes_read_since_last_ui_update;
final_speed = speedometer.measure().try_into().unwrap_or_default();
break;
}
}
}
}
if event_broadcaster
.send(UiEvent::Download(DownloadEvent {
request_id: id,
peer_name: peer_name.clone(),
path: associated_request.path.clone(),
download_info: DownloadInfo::Downloading {
path: requested_file.path.clone(),
bytes_read,
total_bytes_read,
speed: final_speed,
},
}))
.is_err()
{
warn!("Response channel closed");
}
if bytes_read < requested_file.size {
return Err(anyhow!(
"Download incomplete - {} of {} bytes downloaded",
bytes_read,
requested_file.size
));
}
Ok(())
}
/// Asks the peer for the contents of `requested_file`, starting at `start`.
///
/// Opens a new bidirectional stream on `connection`, sends a bincode-serialized
/// `Request::Read` with no end offset, finishes the sending side, and returns
/// the receiving side from which the file contents can be read.
///
/// # Errors
///
/// Fails if the stream cannot be opened, the request cannot be serialized, or
/// the request cannot be written or the sending side cannot be finished.
async fn make_read_request(
    connection: &Connection,
    requested_file: &RequestedFile,
    start: Option<u64>,
) -> anyhow::Result<RecvStream> {
    let query = ReadQuery {
        path: requested_file.path.clone(),
        start,
        end: None,
    };
    let (mut outgoing, incoming) = connection.open_bi().await?;
    let payload = serialize(&Request::Read(query))?;
    outgoing.write_all(&payload).await?;
    outgoing.finish()?;
    Ok(incoming)
}
/// Prepares the local file a download will be written to.
///
/// Creates the parent directory of `file_path` (and any missing ancestors),
/// then opens the file in append mode, creating it if it does not exist.
///
/// Returns the open file together with the offset to resume from:
/// * `None` when the local file is empty,
/// * `Some(len)` when it already holds `len` bytes (`len <= size`),
/// * `Some(size)` when it is longer than the remote `size`; this is logged as
///   an error.
///
/// # Errors
///
/// Fails if `file_path` has no parent, or if creating the directory, opening
/// the file, or reading its metadata fails.
async fn setup_download(file_path: PathBuf, size: u64) -> anyhow::Result<(File, Option<u64>)> {
    let Some(parent_dir) = file_path.parent() else {
        return Err(anyhow!("Cannot get parent"));
    };
    create_dir_all(parent_dir).await?;

    let mut options = OpenOptions::new();
    options.append(true).create(true);
    let file = options.open(file_path).await?;

    let existing_len = file.metadata().await?.len();
    let start_offset = match existing_len {
        0 => None,
        len if len > size => {
            error!("Existing file is bigger than the remote source");
            Some(size)
        }
        len => Some(len),
    };
    Ok((file, start_offset))
}