#![allow(unused_imports)]
use std::io;
use std::io::prelude::*;
use std::io::ErrorKind;
use std::path::Path;
use std::process::{Child, Command, Stdio};
use anyhow::{bail, Context, Result};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::flist::{read_file_list, FileList};
use crate::mux::DemuxRead;
use crate::varint::{ReadVarint, WriteVarint};
use crate::{Options, ServerStatistics};
const MY_PROTOCOL_VERSION: i32 = 27;
pub(crate) struct Connection {
rv: ReadVarint,
wv: WriteVarint,
protocol_version: i32,
#[allow(unused)]
salt: i32,
child: Child,
#[allow(unused)]
options: Options,
}
impl Connection {
pub(crate) fn handshake(
r: Box<dyn Read>,
w: Box<dyn Write>,
child: Child,
options: Options,
) -> Result<Connection> {
let mut wv = WriteVarint::new(w);
let mut rv = ReadVarint::new(r);
wv.write_i32(MY_PROTOCOL_VERSION)?;
let remote_protocol_version = rv.read_i32().unwrap();
if remote_protocol_version < MY_PROTOCOL_VERSION {
bail!(
"server protocol version {} is too old",
remote_protocol_version
);
}
let salt = rv.read_i32().unwrap();
debug!(
"Connected to server version {}, salt {:#x}",
remote_protocol_version, salt
);
let protocol_version = std::cmp::min(MY_PROTOCOL_VERSION, remote_protocol_version);
debug!("Agreed protocol version {}", protocol_version);
let rv = ReadVarint::new(Box::new(DemuxRead::new(rv.take())));
Ok(Connection {
rv,
wv,
protocol_version,
salt,
child,
options,
})
}
pub(crate) fn list_files(mut self) -> Result<(FileList, ServerStatistics)> {
let max_phase = 2;
self.send_exclusions()?;
let mut file_list = read_file_list(&mut self.rv)?;
crate::flist::sort(&mut file_list);
if self.protocol_version < 30 {
let io_error_count = self
.rv
.read_i32()
.context("Failed to read server error count")?;
if io_error_count > 0 {
warn!("Server reports {} IO errors", io_error_count);
}
}
for phase in 1..=max_phase {
debug!("Start phase {}", phase);
self.wv
.write_i32(-1)
.context("Failed to send phase transition")?;
if file_list.is_empty() {
info!("Server returned no files, so we're done");
self.shutdown()?;
return Ok((file_list, ServerStatistics::default()));
}
assert_eq!(
self.rv
.read_i32()
.context("Failed to read phase transition")?,
-1
);
}
debug!("Send end of sequence");
self.wv
.write_i32(-1)
.context("Failed to send end-of-sequence marker")?;
let server_stats = self
.read_server_statistics()
.context("Failed to read server statistics")?;
info!("{:#?}", server_stats);
self.shutdown()?;
Ok((file_list, server_stats))
}
fn shutdown(self) -> Result<()> {
let Connection {
rv,
wv,
protocol_version: _,
salt: _,
mut child,
options: _,
} = self;
rv.check_for_eof()?;
drop(wv);
let child_result = child.wait()?;
info!("Child process exited: {}", child_result);
Ok(())
}
fn send_exclusions(&mut self) -> Result<()> {
self.wv
.write_i32(0)
.context("Failed to send exclusion list")
}
fn read_server_statistics(&mut self) -> Result<ServerStatistics> {
Ok(ServerStatistics {
total_bytes_read: self.rv.read_i64()?,
total_bytes_written: self.rv.read_i64()?,
total_file_size: self.rv.read_i64()?,
flist_build_time: if self.protocol_version >= 29 {
Some(self.rv.read_i64()?)
} else {
None
},
flist_xfer_time: if self.protocol_version >= 29 {
Some(self.rv.read_i64()?)
} else {
None
},
})
}
}