use std::{rand};
use std::num::{Int};
use std::thread::{Thread};
use std::time::duration::{Duration};
use std::sync::mpsc::{self};
use std::old_io::net::ip::{SocketAddr, Ipv4Addr, IpAddr, ToSocketAddr, Port};
use std::old_io::{IoResult, IoError, BufWriter, BufReader, ConnectionFailed, OtherIoError};
use self::connect::{ConnectID};
use self::stream::{UdpStream};
use util;
use types::{PeerID, InfoHash, Timepoint};
use tracker::{AnnounceInfo, ScrapeInfo, Tracker, TransferStatus};
mod connect;
mod stream;
pub type EventID = i32;
pub type TransID = i32;
const CONNECT_ACTION: i32 = 0;
const ANNOUNCE_ACTION: i32 = 1;
const SCRAPE_ACTION: i32 = 2;
const ERROR_ACTION: i32 = 3;
const NONE_ID: i32 = 0;
const COMPLETED_ID: i32 = 1;
const STARTED_ID: i32 = 2;
const STOPPED_ID: i32 = 3;
const TRACKER_BASE_PORT: Port = 6969;
const TRACKER_PORT_RANGE: u32 = 10;
const BYTES_PER_PEER: usize = 4 + 2;
const ANNOUNCE_REQUEST_LEN: usize = 100;
const ANNOUNCE_MIN_RESPONSE_LEN: usize = 20;
const ANNOUNCE_VAR_RESPONSE_LEN: usize = BYTES_PER_PEER * 50; const SCRAPE_REQUEST_LEN: usize = 36;
const SCRAPE_RESPONSE_LEN: usize = 20;
const ERROR_VAR_RESPONSE_LEN: usize = 100;
pub const TRACKER_MAX_ATTEMPTS: u32 = 8;
const ASYNC_TIMEOUT_MILLI: u64 = 1;
pub struct UdpTracker {
conn: UdpStream,
conn_id: ConnectID,
info_hash: InfoHash,
local_peer: PeerID,
port_forward: Port
}
impl UdpTracker {
pub fn connect<T: ToSocketAddr>(dst: T, info: InfoHash, peer: PeerID, forward: Port)
-> IoResult<UdpTracker> {
let remote_addr = try!(dst.to_socket_addr());
let (stream, id) = try!(find_local_interface(remote_addr, TRACKER_BASE_PORT,
TRACKER_PORT_RANGE
));
Ok(UdpTracker{ conn: stream, conn_id: id, info_hash: info,
local_peer: peer, port_forward: forward }
)
}
fn check_connect_id(&mut self) -> IoResult<i64> {
match self.conn_id.connect_id() {
Some(id) => Ok(id),
None => {
self.conn_id = try!(ConnectID::request(&mut self.conn));
self.conn_id.connect_id().ok_or(
util::simple_ioerror(OtherIoError, "Connection ID Expired Too Fast")
)
}
}
}
fn scrape(&mut self) -> IoResult<ScrapeInfo> {
let mut send_buffer = [0u8; SCRAPE_REQUEST_LEN];
let
let sent_id = try!(self.write_scrape(&mut send_buffer));
try!(make_request(&mut self.conn, &send_buffer[], sent_id, SCRAPE_ACTION));
recv_scrape(&mut self.conn)
}
fn write_scrape(&mut self, buf: &mut [u8]) -> IoResult<TransID> {
let connect_id = try!(self.check_connect_id());
let trans_id = rand::random::<TransID>();
let mut buf_writer = BufWriter::new(buf);
try!(buf_writer.write_be_i64(connect_id));
try!(buf_writer.write_be_i32(SCRAPE_ACTION));
try!(buf_writer.write_be_i32(trans_id));
try!(buf_writer.write_all(self.info_hash.as_slice()));
Ok(trans_id)
}
fn announce(&mut self, status: TransferStatus, event: EventID) -> IoResult<AnnounceInfo> {
let mut send_buffer = [0u8; ANNOUNCE_REQUEST_LEN];
let sent_id = try!(self.write_announce(&mut send_buffer, status, event));
try!(make_request(&mut self.conn, &send_buffer[], sent_id, ANNOUNCE_ACTION));
recv_announce(&mut self.conn)
}
fn write_announce(&mut self, buf: &mut [u8], status: TransferStatus, event: EventID) -> IoResult<TransID> {
let connect_id = try!(self.check_connect_id());
let trans_id = rand::random::<TransID>();
let key = rand::random::<u32>();
let mut buf_writer = BufWriter::new(buf);
try!(buf_writer.write_be_i64(connect_id)); try!(buf_writer.write_be_i32(ANNOUNCE_ACTION)); try!(buf_writer.write_be_i32(trans_id)); try!(buf_writer.write_all(self.info_hash.as_slice())); try!(buf_writer.write_str(self.local_peer.as_slice())); try!(buf_writer.write_be_i64(status.downloaded)); try!(buf_writer.write_be_i64(status.remaining)); try!(buf_writer.write_be_i64(status.uploaded)); try!(buf_writer.write_be_i32(event)); try!(buf_writer.write_be_u32(0)); try!(buf_writer.write_be_u32(key)); try!(buf_writer.write_be_i32(-1)); try!(buf_writer.write_be_u16(self.port_forward)); try!(buf_writer.write_be_u16(0));
Ok(trans_id)
}
}
fn make_request(stream: &mut UdpStream, msg: &[u8], sent_id: TransID, sent_action: i32) -> IoResult<()> {
let mut verify_buffer = [0u8; VERIFY_RESPONSE_LEN];
let bytes_read = try!(stream.request(msg, &mut verify_buffer, TRACKER_MAX_ATTEMPTS, udp_tracker_wait));
if bytes_read != verify_buffer.len() {
return Err(util::simple_ioerror(OtherIoError, "UDP Tracker Sent An Incomplete Response"))
}
let mut buf_reader = BufReader::new(&verify_buffer[]);
let recv_action = try!(buf_reader.read_be_i32());
let recv_trans_id = try!(buf_reader.read_be_i32());
if recv_trans_id != sent_id {
return Err(util::simple_ioerror(OtherIoError, "Tracker Responded With A Different Transaction ID"))
} else if recv_action == ERROR_ACTION {
return Err(recv_error_message(stream))
} else if recv_action != sent_action {
return Err(wrong_event_received(sent_action, recv_action))
}
Ok(())
}
fn recv_scrape(stream: &mut UdpStream) -> IoResult<ScrapeInfo> {
let mut recv_buffer = [0u8; SCRAPE_PAYLOAD_LEN];
let bytes_read = try!(stream.recv(&mut recv_buffer[], Some(ASYNC_TIMEOUT_MILLI)));
if bytes_read != recv_buffer.len() {
return Err(util::simple_ioerror(OtherIoError, "UDP Tracker Sent An Incomplete Scrape Response"))
}
let mut buf_reader = BufReader::new(&recv_buffer[]);
let seeders = try!(buf_reader.read_be_i32());
let downloads = try!(buf_reader.read_be_i32());
let leechers = try!(buf_reader.read_be_i32());
Ok(ScrapeInfo{ leechers: leechers, seeders: seeders, downloads: downloads })
}
fn recv_announce(stream: &mut UdpStream) -> IoResult<AnnounceInfo> {
let mut recv_buffer = [0u8; ANNOUNCE_MIN_PAYLOAD_LEN];
let bytes_read = try!(stream.recv(&mut recv_buffer[], Some(ASYNC_TIMEOUT_MILLI)));
if bytes_read != recv_buffer.len() {
return Err(util::simple_ioerror(OtherIoError, "UDP Tracker Sent An Incomplete Announce Response"))
}
let mut buf_reader = BufReader::new(&recv_buffer[]);
let interval = try!(buf_reader.read_be_i32());
let interval_tp = try!(Timepoint::new(Duration::seconds(interval as i64)));
let leechers = try!(buf_reader.read_be_i32());
let seeders = try!(buf_reader.read_be_i32());
let peers = try!(recv_peers(stream));
Ok(AnnounceInfo{ leechers: leechers, seeders: seeders, peers: peers, interval: interval_tp })
}
fn recv_peers(stream: &mut UdpStream) -> IoResult<Vec<SocketAddr>> {
let mut peers_buffer = [0u8; ANNOUNCE_VAR_PAYLOAD_LEN];
let mut peer_list = Vec::new();
while let Ok(bytes_read) = stream.recv(&mut peers_buffer, Some(ASYNC_TIMEOUT_MILLI)) {
let mut buf_reader = BufReader::new(&peers_buffer[]);
for _ in (0..bytes_read / BYTES_PER_PEER) {
let ip = Ipv4Addr(try!(buf_reader.read_u8()),
try!(buf_reader.read_u8()),
try!(buf_reader.read_u8()),
try!(buf_reader.read_u8())
);
let port = try!(buf_reader.read_be_u16());
peer_list.push(try!((ip, port).to_socket_addr()));
}
}
Ok(peer_list)
}
fn recv_error_message(stream: &mut UdpStream) -> IoError {
let mut msg_buf = Vec::with_capacity(ERROR_VAR_RESPONSE_LEN);
let mut pos = 0;
while let Ok(bytes_read) = stream.recv(&mut msg_buf[pos..], Some(ASYNC_TIMEOUT_MILLI)) {
if bytes_read == msg_buf.len() {
let new_len = msg_buf.len() * 2;
msg_buf.reserve(new_len);
}
pos += bytes_read;
}
match (String::from_utf8(msg_buf), pos != 0) {
(Ok(msg), true) => IoError{ kind: OtherIoError,
desc: "Received Error Response For Scrape Request (See detail)",
detail: Some(msg) },
_ => IoError{ kind: OtherIoError,
desc: "Received Error Response For Scrape Request (No detail)",
detail: None }
}
}
pub fn udp_tracker_wait(attempt: u64) -> u64 {
15 * 2.pow(attempt as usize) * 1000
}
fn wrong_event_received(expected: EventID, received: EventID) -> IoError {
match received {
CONNECT_ACTION => IoError{
kind: OtherIoError,
desc: "UDP Tracker Sent Unexpected Connect Response (Check Detail)",
detail: Some(format!("Expected EventID {} Received EventID {}", expected, received))
},
ANNOUNCE_ACTION => IoError{
kind: OtherIoError,
desc: "UDP Tracker Sent Unexpected Announce Response (Check Detail)",
detail: Some(format!("Expected EventID {} Received EventID {}", expected, received))
},
SCRAPE_ACTION => IoError{
kind: OtherIoError,
desc: "UDP Tracker Sent Unexpected Scrape Response (Check Detail)",
detail: Some(format!("Expected EventID {} Received EventID {}", expected, received))
},
_ => IoError{
kind: OtherIoError,
desc: "UDP Tracker Sent Unexpected Unknown Response (Check Detail)",
detail: Some(format!("Expected EventID {} Received EventID {}", expected, received))
}
}
}
fn find_local_interface(remote_addr: SocketAddr, local_port: Port, port_range: u32) -> IoResult<(UdpStream, ConnectID)> {
let local_ip_addrs = try!(util::find_net_addrs());
let (tx, rx) = mpsc::channel();
{ let tx = tx;
for ip in local_ip_addrs.into_iter() {
let tx = tx.clone();
Thread::spawn(move || {
let udp_socket = util::open_udp(SocketAddr{ ip: ip, port: local_port }, port_range).unwrap();
let mut udp_stream = UdpStream::new(udp_socket, remote_addr).unwrap();
match ConnectID::request(&mut udp_stream) {
Ok(connect_id) => return tx.send((udp_stream, connect_id)).unwrap(),
Err(e) => println!("{:?}", e)
};
});
}
}
let (udp_stream, connect_id) = try!(rx.recv().map_err( |_|
util::simple_ioerror(ConnectionFailed, "Could Not Communicate On Any IPv4 Interfaces")
));
Ok((udp_stream, connect_id))
}
impl Tracker for UdpTracker {
fn local_ip(&mut self) -> IpAddr {
self.conn.local_sock().ip
}
fn send_scrape(&mut self) -> IoResult<ScrapeInfo> {
self.scrape()
}
fn start_announce(&mut self, remaining: i64) -> IoResult<AnnounceInfo> {
let status = TransferStatus{ downloaded: 0, remaining: remaining, uploaded: 0 };
self.announce(status, STARTED_ID)
}
fn update_announce(&mut self, status: TransferStatus) -> IoResult<AnnounceInfo> {
self.announce(status, NONE_ID)
}
fn stop_announce(&mut self, status: TransferStatus) -> IoResult<()> {
let _ = try!(self.announce(status, STOPPED_ID));
Ok(())
}
fn complete_announce(&mut self, status: TransferStatus) -> IoResult<AnnounceInfo> {
let status = try!(self.announce(status, COMPLETED_ID));
Ok(status)
}
}