extern crate anyhow;
extern crate url;
use crate::handshake::*;
use crate::message::*;
use crate::peer::*;
use crate::piece::*;
use anyhow::{anyhow, Result};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Cursor, Read, Write};
use std::net::{IpAddr, SocketAddr, TcpStream};
use std::time::Duration;
/// State for a single connection to one remote peer.
///
/// Created by `Client::new`, which opens the TCP connection; the remaining
/// methods exchange the handshake and protocol messages over that connection
/// and track what the peer has told us.
pub struct Client {
// Remote peer this client talks to; its ip/port are used to connect and its
// id is used in log lines.
peer: Peer,
// Peer id placed in the handshake sent by `handshake_with_peer`.
peer_id: Vec<u8>,
// Info hash placed in the outgoing handshake and compared against the info
// hash in the peer's handshake reply.
info_hash: Vec<u8>,
// TCP connection to the peer.
conn: TcpStream,
// Bitfield payload last stored by `read_bitfield` (empty until then) and
// updated by `set_piece`. Bits are read most-significant-first within each
// byte (see `has_piece`).
bitfield: Vec<u8>,
// Starts as `true`; set by `read_choke` and cleared by `read_unchoke`.
choked: bool,
}
impl Client {
/// Opens a TCP connection to `peer` and wraps it in a [`Client`].
///
/// The connection attempt is given up after 15 seconds. The returned client
/// starts with an empty bitfield and in the choked state.
///
/// # Errors
///
/// Returns an error if the TCP connection cannot be established.
pub fn new(peer: Peer, peer_id: Vec<u8>, info_hash: Vec<u8>) -> Result<Client> {
    let address = SocketAddr::new(IpAddr::V4(peer.get_ip()), peer.get_port());
    let connect_timeout = Duration::from_secs(15);

    // The underlying I/O error is deliberately replaced by a fixed message.
    let conn = TcpStream::connect_timeout(&address, connect_timeout)
        .map_err(|_| anyhow!("could not connect to peer"))?;
    info!("Connected to peer {:?}", peer.get_id());

    Ok(Client {
        peer,
        peer_id,
        info_hash,
        conn,
        bitfield: Vec::new(),
        choked: true,
    })
}
/// Returns the current value of the `choked` flag.
///
/// The flag is `true` for a freshly created client, is set by `read_choke`
/// and cleared by `read_unchoke`.
pub fn is_choked(&self) -> bool {
self.choked
}
/// Reports whether the stored bitfield marks piece `index` as available.
///
/// Bits are numbered most-significant-first inside each byte, so piece 0 is
/// the top bit of byte 0. An `index` that lies beyond the stored bitfield
/// yields `false`.
pub fn has_piece(&self, index: u32) -> bool {
    let byte_pos = (index / 8) as usize;
    // Distance of the wanted bit from the least significant bit.
    let shift = 7 - (index % 8);
    match self.bitfield.get(byte_pos) {
        Some(byte) => (*byte >> shift) & 1 == 1,
        None => false,
    }
}
/// Marks piece `index` as available in the stored bitfield.
///
/// Bits are numbered most-significant-first inside each byte, so piece 0 is
/// the top bit of byte 0. An `index` that lies beyond the stored bitfield is
/// ignored; the bitfield is never grown.
pub fn set_piece(&mut self, index: u32) {
    let byte_pos = (index / 8) as usize;
    // 0x80 >> offset is the same bit as 1 << (7 - offset).
    let mask = 0x80u8 >> (index % 8);
    // Flip the bit in place instead of cloning the whole bitfield per call.
    if let Some(byte) = self.bitfield.get_mut(byte_pos) {
        *byte |= mask;
    }
}
/// Applies the same timeout, in seconds, to writes and reads on the
/// connection.
///
/// The write timeout is set first; if that fails the read timeout is left
/// untouched.
///
/// # Errors
///
/// Returns an error if either timeout cannot be set. Note that the standard
/// library rejects a zero duration, so `secs == 0` results in an error.
pub fn set_connection_timeout(&self, secs: u64) -> Result<()> {
    let timeout = Some(Duration::from_secs(secs));

    self.conn
        .set_write_timeout(timeout)
        .map_err(|_| anyhow!("could not set write timeout"))?;
    self.conn
        .set_read_timeout(timeout)
        .map_err(|_| anyhow!("could not set read timeout"))?;

    Ok(())
}
/// Sends our handshake to the peer and validates the handshake it replies
/// with.
///
/// The reply is read in two steps: first the single length byte (see
/// `read_handshake_len`), then the remaining `48 + length` bytes. The reply is
/// accepted only if its info hash equals ours.
///
/// # Errors
///
/// Returns an error if the handshake cannot be serialized, sent, read or
/// decoded, or if the peer answers with a different info hash.
pub fn handshake_with_peer(&mut self) -> Result<()> {
    let handshake = Handshake::new(self.peer_id.clone(), self.info_hash.clone());
    let handshake_encoded: Vec<u8> = handshake.serialize()?;

    // `write_all` keeps writing until the whole handshake is sent; a plain
    // `write` may stop after a partial write and silently truncate it.
    if self.conn.write_all(&handshake_encoded).is_err() {
        return Err(anyhow!("could not send handshake to peer"));
    }

    let handshake_len: usize = self.read_handshake_len()?;
    // After the length byte the reply holds the protocol string plus 48 fixed
    // bytes (per BEP 3: 8 reserved, 20 info hash, 20 peer id).
    let mut handshake_buf: Vec<u8> = vec![0; 48 + handshake_len];
    if self.conn.read_exact(&mut handshake_buf).is_err() {
        return Err(anyhow!("could not read handshake received from peer"));
    }

    let handshake_decoded: Handshake = deserialize_handshake(&handshake_buf, handshake_len)?;
    if handshake_decoded.get_info_hash() != self.info_hash {
        return Err(anyhow!("invalid handshake received from peer"));
    }
    Ok(())
}
/// Reads the single leading length byte of the peer's handshake.
///
/// # Errors
///
/// Returns an error if the byte cannot be read or if it is zero.
fn read_handshake_len(&mut self) -> Result<usize> {
    let mut len_byte = [0u8; 1];
    self.conn
        .read_exact(&mut len_byte)
        .map_err(|_| anyhow!("could not read handshake length received from peer"))?;

    match len_byte[0] {
        0 => Err(anyhow!("invalid handshake length received from peer")),
        len => Ok(usize::from(len)),
    }
}
/// Reads one length-prefixed message from the peer and decodes it.
///
/// A length prefix of zero (a keep-alive) is logged and reported as an error
/// whose message is `"keep-alive"`; no body is read in that case.
///
/// # Errors
///
/// Returns an error on a keep-alive, if the length prefix or the body cannot
/// be read, or if the body cannot be decoded.
pub fn read_message(&mut self) -> Result<Message> {
    let message_len = self.read_message_len()?;
    if message_len == 0 {
        info!("Receive KEEP_ALIVE from peer {:?}", self.peer.get_id());
        return Err(anyhow!("keep-alive"));
    }

    // The body is exactly as long as the prefix announced.
    let mut body: Vec<u8> = vec![0; message_len];
    self.conn
        .read_exact(&mut body)
        .map_err(|_| anyhow!("could not read message received from peer"))?;

    let decoded: Message = deserialize_message(&body, message_len)?;
    Ok(decoded)
}
fn read_message_len(&mut self) -> Result<usize> {
let mut buf = vec![0; 4];
if self.conn.read_exact(&mut buf).is_err() {
return Err(anyhow!("could not read message length received from peer"));
}
let mut cursor = Cursor::new(buf);
let message_len = cursor.read_u32::<BigEndian>()?;
Ok(message_len as usize)
}
/// Records that a MESSAGE_CHOKE was received: logs it and sets the `choked`
/// flag.
///
/// Nothing is read from the connection here.
pub fn read_choke(&mut self) {
info!("Receive MESSAGE_CHOKE from peer {:?}", self.peer.get_id());
self.choked = true
}
/// Sends a MESSAGE_UNCHOKE (no payload) to the peer.
///
/// # Errors
///
/// Returns an error if the message cannot be serialized or fully written to
/// the connection.
pub fn send_unchoke(&mut self) -> Result<()> {
    let message: Message = Message::new(MESSAGE_UNCHOKE);
    let message_encoded = message.serialize()?;
    info!("Send MESSAGE_UNCHOKE to peer {:?}", self.peer.get_id());

    // `write_all` guarantees the whole message is sent; a plain `write` may
    // return after a partial write and leave a truncated message on the wire.
    self.conn
        .write_all(&message_encoded)
        .map_err(|_| anyhow!("could not send MESSAGE_UNCHOKE to peer"))
}
/// Records that a MESSAGE_UNCHOKE was received: logs it and clears the
/// `choked` flag.
///
/// Nothing is read from the connection here.
pub fn read_unchoke(&mut self) {
info!("Receive MESSAGE_UNCHOKE from peer {:?}", self.peer.get_id());
self.choked = false
}
/// Sends a MESSAGE_INTERESTED (no payload) to the peer.
///
/// # Errors
///
/// Returns an error if the message cannot be serialized or fully written to
/// the connection.
pub fn send_interested(&mut self) -> Result<()> {
    let message: Message = Message::new(MESSAGE_INTERESTED);
    let message_encoded = message.serialize()?;
    info!("Send MESSAGE_INTERESTED to peer {:?}", self.peer.get_id());

    // `write_all` guarantees the whole message is sent; a plain `write` may
    // return after a partial write and leave a truncated message on the wire.
    self.conn
        .write_all(&message_encoded)
        .map_err(|_| anyhow!("could not send MESSAGE_INTERESTED to peer"))
}
/// Sends a MESSAGE_HAVE for piece `index` to the peer.
///
/// The payload is `index` encoded as a 4-byte big-endian integer.
///
/// # Errors
///
/// Returns an error if the message cannot be built, serialized or fully
/// written to the connection.
pub fn send_have(&mut self, index: u32) -> Result<()> {
    let mut payload: Vec<u8> = Vec::with_capacity(4);
    payload.write_u32::<BigEndian>(index)?;

    let message: Message = Message::new_with_payload(MESSAGE_HAVE, payload);
    let message_encoded = message.serialize()?;
    info!("Send MESSAGE_HAVE to peer {:?}", self.peer.get_id());

    // `write_all` guarantees the whole message is sent; a plain `write` may
    // return after a partial write and leave a truncated message on the wire.
    self.conn
        .write_all(&message_encoded)
        .map_err(|_| anyhow!("could not send MESSAGE_HAVE to peer"))
}
/// Handles an already-read MESSAGE_HAVE by marking the announced piece in
/// the stored bitfield.
///
/// The payload must be exactly four bytes: the piece index as a big-endian
/// integer. The index is passed to `set_piece`.
///
/// # Errors
///
/// Returns an error if `message` is not a MESSAGE_HAVE or its payload is not
/// four bytes long.
pub fn read_have(&mut self, message: Message) -> Result<()> {
    info!("Receive MESSAGE_HAVE from peer {:?}", self.peer.get_id());

    if message.get_id() != MESSAGE_HAVE {
        return Err(anyhow!("received invalid MESSAGE_HAVE from peer"));
    }
    let payload = message.get_payload();
    if payload.len() != 4 {
        return Err(anyhow!("received invalid MESSAGE_HAVE from peer"));
    }

    let piece_index = Cursor::new(payload).read_u32::<BigEndian>()?;
    self.set_piece(piece_index);
    Ok(())
}
/// Reads the next message from the peer and stores its payload as the
/// bitfield.
///
/// Unlike `read_have`, this method reads the message from the connection
/// itself. The log line is emitted before the message is read.
///
/// # Errors
///
/// Returns an error if no message can be read (including a keep-alive) or if
/// the message read is not a MESSAGE_BITFIELD.
pub fn read_bitfield(&mut self) -> Result<()> {
    info!(
        "Receive MESSAGE_BITFIELD from peer {:?}",
        self.peer.get_id()
    );

    let received: Message = self.read_message()?;
    if received.get_id() == MESSAGE_BITFIELD {
        self.bitfield = received.get_payload();
        Ok(())
    } else {
        Err(anyhow!("received invalid MESSAGE_BITFIELD from peer"))
    }
}
/// Sends a MESSAGE_REQUEST asking the peer for `length` bytes of piece
/// `index`, starting at byte offset `begin`.
///
/// The payload is `index`, `begin` and `length`, each encoded as a 4-byte
/// big-endian integer, in that order.
///
/// # Errors
///
/// Returns an error if the message cannot be built, serialized or fully
/// written to the connection.
pub fn send_request(&mut self, index: u32, begin: u32, length: u32) -> Result<()> {
    let mut payload: Vec<u8> = Vec::with_capacity(12);
    payload.write_u32::<BigEndian>(index)?;
    payload.write_u32::<BigEndian>(begin)?;
    payload.write_u32::<BigEndian>(length)?;

    let message: Message = Message::new_with_payload(MESSAGE_REQUEST, payload);
    let message_encoded = message.serialize()?;
    info!(
        "Send MESSAGE_REQUEST for piece {:?} [{:?}:{:?}] to peer {:?}",
        index,
        begin,
        // Computed in u64 so logging can never overflow (and panic in debug
        // builds) for large `begin`/`length` values.
        u64::from(begin) + u64::from(length),
        self.peer.get_id()
    );

    // `write_all` guarantees the whole message is sent; a plain `write` may
    // return after a partial write and leave a truncated message on the wire.
    self.conn
        .write_all(&message_encoded)
        .map_err(|_| anyhow!("could not send MESSAGE_REQUEST to peer"))
}
pub fn read_piece(&mut self, message: Message, piece_work: &mut PieceWork) -> Result<()> {
info!("Receive MESSAGE_PIECE from peer {:?}", self.peer.get_id());
if message.get_id() != MESSAGE_PIECE || message.get_payload().len() < 8 {
return Err(anyhow!("received invalid MESSAGE_HAVE from peer"));
}
let payload: Vec<u8> = message.get_payload();
let mut payload_cursor = Cursor::new(&payload[0..4]);
let index = payload_cursor.read_u32::<BigEndian>()?;
if index != piece_work.index {
return Err(anyhow!("received invalid piece from peer"));
}
let mut payload_cursor = Cursor::new(&payload[4..8]);
let begin: u32 = payload_cursor.read_u32::<BigEndian>()?;
let block: Vec<u8> = payload[8..].to_vec();
let block_len: u32 = block.len() as u32;
if begin + block_len > piece_work.length as u32 {
return Err(anyhow!(
"received invalid byte offset within piece from peer"
));
}
info!(
"Download piece {:?} [{:?}:{:?}] from peer {:?}",
index,
begin,
begin + block_len,
self.peer.get_id()
);
for i in 0..block_len {
piece_work.data[begin as usize + i as usize] = block[i as usize];
}
piece_work.downloaded += block_len;
piece_work.requests -= 1;
Ok(())
}
}