use crate::alluse::*;
/// State of the task that receives files from the peer and forwards their
/// contents to whoever requested them.
pub struct ReadFilesProcess {
/// Stream the file indices, chunk lengths and chunk payloads are read from.
/// Handed back to the caller by `run` when the transfer ends cleanly.
read: Enveloped,
/// Shared, mutex-protected table of pending requests, keyed by file index.
reqs: Requests,
/// Channel of the request currently being served; `None` between files, or
/// once the receiving side of that channel has gone away mid-file.
current: Option<mpsc::Sender<Result<Bytes, tokio::io::Error>>>,
}
impl ReadFilesProcess {
pub fn spawn(read: Enveloped, reqs: Requests) -> JoinHandle<Result<Enveloped>> {
tokio::spawn(
ReadFilesProcess {
read,
reqs,
current: None,
}
.run(),
)
}
pub async fn run(mut self) -> Result<Enveloped> {
let res = self.process().await;
log::debug!("Client exiting: {:?}", res);
use tokio::io::{Error, ErrorKind};
let remaining = self.reqs.lock().unwrap().requests.take();
let cex = |msg: &str| Err(Error::new(ErrorKind::ConnectionAborted, anyhow::anyhow!("{}", msg)));
if let Some(current) = self.current.take() {
if res.is_err() {
current.send(cex("Client exited while reading file")).await.ok();
} else {
current.send(cex("Internal panic!")).await.ok();
unreachable!();
}
};
for (_, cs) in remaining.unwrap().into_iter() {
for c in cs.into_iter() {
c.send(cex("Client exited")).await.ok();
}
}
res?;
Ok(self.read)
}
async fn process(&mut self) -> Result<()> {
loop {
RequestsInner::reset_timeout(&mut self.reqs);
log::trace!("Awaiting file");
let idx = self.read.read_i32_le().await?;
if idx == -1 {
break Ok(());
}
log::debug!("Receiving file {}", idx);
{
let mut buf = [0u8; 16];
self.read.read_exact(&mut buf).await?;
anyhow::ensure!(
buf == [0u8; 16],
"Protocol error: we requested a plain file, not blocks and checksums"
);
}
let idx = idx as usize;
self.current = {
let mut requests = self.reqs.lock().unwrap();
requests.disable_timeout();
let table = requests.requests.as_mut().unwrap();
let mut reqs = table.get_mut(&idx);
let req = reqs
.as_mut()
.and_then(|v| v.pop())
.context("Got file - no memory of requesting it (maybe timed out erroneously?)")?;
if reqs.map(|v| v.len()) == Some(0) {
table.remove(&idx).unwrap();
}
Some(req)
};
let mut size: usize = 0;
loop {
let chunklen = self.read.read_i32_le().await?;
if chunklen == 0 {
break;
}
anyhow::ensure!(chunklen > 0, "Protocol error: negative sized chunk");
let mut chunklen = chunklen as usize;
size += chunklen;
while chunklen > 0 {
let read = std::cmp::min(1 << 16, chunklen);
let mut buf = BytesMut::new();
buf.resize(read, 0);
chunklen -= self.read.read_exact(&mut buf).await?;
if let Some(backchan) = self.current.as_mut() {
log::trace!("File {}: got part {}, {} remaining in chunk", idx, buf.len(), chunklen);
if let Err(_e) = backchan.send(Ok(buf.into())).await {
self.current = None;
log::warn!("Internal close while receiving file: {} - ignoring", _e);
}
}
}
}
{
let mut buf = [0u8; 16];
self.read.read_exact(&mut buf).await?;
log::debug!(
"Finished {} successfully, {} B, checksum {:?} not checked",
idx,
size,
buf
);
}
self.current = None; }
}
}
/// Shared bookkeeping for outstanding file requests and their common timeout.
pub struct RequestsInner {
/// Pending response channels grouped by file index. Becomes `None` once
/// `ReadFilesProcess::run` has taken the table on exit.
pub requests: Option<HashMap<usize, Vec<mpsc::Sender<Result<Bytes, tokio::io::Error>>>>>,
/// Deadline after which all pending requests are failed; `None` while the
/// timeout is disabled.
pub timeout: Option<Instant>,
}
impl RequestsInner {
pub fn new_requests() -> Requests {
Arc::new(SyncMutex::new(RequestsInner {
requests: Some(default()),
timeout: None,
}))
}
fn timeout() -> Instant {
Instant::now() + Duration::from_secs_f64(30.)
}
pub fn refresh_timeout(&mut self) {
if let Some(to) = self.timeout.as_mut() {
*to = Self::timeout();
}
}
pub fn reset_timeout(reqs: &mut Requests) {
let mut reqs_inner = reqs.lock().unwrap();
let spawn = reqs_inner.timeout.is_none();
reqs_inner.timeout = Some(Self::timeout()); if spawn {
tokio::spawn(Self::timeout_proc(reqs.clone()));
}
}
pub fn disable_timeout(&mut self) {
self.timeout.take();
}
async fn timeout_proc(reqs: Requests) {
loop {
enum Action {
ReSleep(Instant),
Timeout(Vec<mpsc::Sender<Result<Bytes, tokio::io::Error>>>),
}
use Action::*;
let action = {
let mut reqs = reqs.lock().unwrap();
match reqs.timeout {
None => break, Some(timeout) if timeout > Instant::now() => ReSleep(timeout),
Some(_) => Timeout(
reqs.requests
.iter_mut()
.flat_map(|m| m.drain())
.flat_map(|(idx, rs)| {
log::debug!("Timeouting {} requests for file {}", rs.len(), idx);
rs.into_iter()
})
.collect(),
),
}
};
match action {
ReSleep(until) => tokio::time::sleep_until(until).await,
Timeout(requests) => {
for request in requests.into_iter() {
tokio::spawn(async move {
use std::io::{Error, ErrorKind};
request
.send(Err(Error::new(ErrorKind::TimedOut, anyhow::anyhow!("Timed out"))))
.await
.ok();
});
}
break;
}
}
}
}
}