use std::{error, time::Duration};
#[cfg(feature = "logging")]
use log::debug;
#[cfg(feature = "logging")]
use log::info;
use tokio::{fs::File, net::ToSocketAddrs, time};
use crate::{get_buf, punch_hole, read_position, send_unil_recv, u8s_to_u64, Source};
async fn get_file_buf_from_msg_num<Buf>(
msg: u64,
file: &File,
buf_size: u64,
buf: Buf,
) -> Result<(Buf, usize), Box<dyn error::Error + Send + Sync>>
where
Buf: AsMut<[u8]> + Send + 'static,
{
read_position(&file, buf, msg * buf_size).await
}
pub async fn send_file<T: Clone + 'static + ToSocketAddrs + Send + Copy + std::fmt::Display>(
source: Source,
file_name: &str,
reciever: T,
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
#[cfg(feature = "logging")]
debug!("reciever ip: {}", reciever);
let sock = source.into_socket().await;
let sock_ = sock.clone();
let reciever_ = reciever.clone();
let holepuncher = tokio::task::spawn(async move {
let sock = sock_;
let mut holepunch_interval = time::interval(Duration::from_secs(5));
loop {
punch_hole(&sock, reciever_).await.unwrap();
holepunch_interval.tick().await;
}
});
time::sleep(Duration::from_millis(1000)).await;
let input_file = File::open(file_name).await?;
let file_len = input_file.metadata().await?.len();
let file_len_arr = file_len.to_be_bytes();
let msg = [
8,
file_len_arr[0],
file_len_arr[1],
file_len_arr[2],
file_len_arr[3],
file_len_arr[4],
file_len_arr[5],
file_len_arr[6],
file_len_arr[7],
];
#[cfg(feature = "logging")]
debug!("sending file size...");
let mut has_sent = false;
loop {
let mut buf = [0u8; 508];
let sleep = time::sleep(Duration::from_millis(1500));
tokio::select! {
_ = sleep => {
if has_sent {
break;
}
}
amt = sock.recv(&mut buf) => {
let amt = amt?;
let buf = &buf[0..amt];
#[cfg(feature = "logging")]
debug!("got msg: {:?}", buf);
if buf.len() == 1 && buf[0] == 9 {
sock.send_to(&msg, reciever).await?;
has_sent = true;
}
}
}
}
#[cfg(feature = "logging")]
debug!("has sent file size");
let mut offset = 0;
let mut msg_num: u64 = 0;
loop {
let (file_buf, amt) = read_position(&input_file, [0u8; 500], offset).await?;
let buf = get_buf(&msg_num, &file_buf[0..amt]);
sock.send_to(&buf, reciever).await?;
offset += 500;
if offset >= file_len {
break;
}
#[cfg(feature = "logging")]
info!("Progress: {}%", offset * 100 / file_len);
msg_num += 1;
}
loop {
#[cfg(feature = "logging")]
info!("Sending done, getting dropped messages");
let mut buf = [0u8; 508];
let amt = send_unil_recv(&sock, &[5], &reciever, &mut buf, 100).await?;
let buf = &buf[0..amt];
if buf[0] == 7 {
holepuncher.abort();
#[cfg(feature = "logging")]
info!("No dropped messages left, finishing...");
return Ok(());
} else if buf[0] != 6 {
continue;
}
let missed = &buf[1..];
#[cfg(feature = "logging")]
info!("Dropped messages: {}", missed.len() / 8);
for i in 0..(missed.len() / 8) {
let j = i * 8;
let missed_msg = u8s_to_u64(&missed[j..j + 8])?;
let (file_buf, amt) =
get_file_buf_from_msg_num(missed_msg, &input_file, 500, [0u8; 500]).await?;
let file_buf = &file_buf[0..amt];
let buf = get_buf(&missed_msg, file_buf);
sock.send_to(&buf, reciever).await?;
}
}
}