use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use bip_handshake::{DiscoveryInfo, InitiateMessage};
use bip_util::bt::{InfoHash};
use bip_util::trans::{TransactionIds, LocallyShuffledIds};
use futures::future::Either;
use futures::sink::Sink;
use umio::external::{Sender};
use announce::{AnnounceResponse, ClientState};
use client::dispatcher::DispatchMessage;
use client::error::ClientResult;
use scrape::ScrapeResponse;
/// Private submodule providing `DispatchMessage` and `create_dispatcher`, both used in this file.
mod dispatcher;
/// Public submodule providing the `ClientResult` type used in this file.
pub mod error;
/// Capacity that `TrackerClient::new` passes on to `TrackerClient::with_capacity`.
const DEFAULT_CAPACITY: usize = 4096;
/// Request kinds accepted by `TrackerClient::request`.
#[derive(Debug)]
pub enum ClientRequest {
/// Announce request carrying an `InfoHash` and a `ClientState`.
Announce(InfoHash, ClientState),
/// Scrape request carrying an `InfoHash`.
Scrape(InfoHash),
}
/// Pairs a `ClientToken` with a `ClientResult<ClientResponse>`.
///
/// NOTE(review): presumably the token is the one returned by
/// `TrackerClient::request` for the request this result belongs to -- confirm
/// against the dispatcher.
#[derive(Debug)]
pub struct ClientMetadata {
// Token supplied at construction; returned by value from `token()`.
token: ClientToken,
// Result supplied at construction; borrowed through `result()`.
result: ClientResult<ClientResponse>,
}
impl ClientMetadata {
    /// Builds a `ClientMetadata` from a token and the result to store alongside it.
    pub fn new(token: ClientToken, result: ClientResult<ClientResponse>) -> ClientMetadata {
        ClientMetadata { token, result }
    }

    /// Returns the stored token by value (`ClientToken` is `Copy`).
    pub fn token(&self) -> ClientToken {
        let stored = self.token;
        stored
    }

    /// Borrows the stored result without consuming the metadata.
    pub fn result(&self) -> &ClientResult<ClientResponse> {
        let stored = &self.result;
        stored
    }
}
/// Response payload stored inside a `ClientMetadata` result.
#[derive(Debug)]
pub enum ClientResponse {
/// Holds an `AnnounceResponse` with a `'static` lifetime parameter.
Announce(AnnounceResponse<'static>),
/// Holds a `ScrapeResponse` with a `'static` lifetime parameter.
Scrape(ScrapeResponse<'static>),
}
impl ClientResponse {
    /// Borrows the inner `AnnounceResponse`, or returns `None` if this is a
    /// `Scrape` response.
    pub fn announce_response(&self) -> Option<&AnnounceResponse<'static>> {
        if let ClientResponse::Announce(ref announce) = *self {
            Some(announce)
        } else {
            None
        }
    }

    /// Borrows the inner `ScrapeResponse`, or returns `None` if this is an
    /// `Announce` response.
    pub fn scrape_response(&self) -> Option<&ScrapeResponse<'static>> {
        if let ClientResponse::Scrape(ref scrape) = *self {
            Some(scrape)
        } else {
            None
        }
    }
}
/// Client handle that forwards requests to a dispatcher over a channel.
///
/// Dropping the client sends `DispatchMessage::Shutdown` over the same channel.
pub struct TrackerClient {
// Channel returned by `dispatcher::create_dispatcher`; carries `DispatchMessage`s.
send: Sender<DispatchMessage>,
// Consulted by `request` before each send; a clone is handed to the dispatcher.
limiter: RequestLimiter,
// Produces the `ClientToken` returned from each accepted `request`.
generator: TokenGenerator,
}
impl TrackerClient {
    /// Creates a client using `DEFAULT_CAPACITY`.
    ///
    /// This simply forwards to `with_capacity`; see that method for the
    /// panic and error conditions.
    pub fn new<H>(bind: SocketAddr, handshaker: H) -> io::Result<TrackerClient>
        where H: Sink + DiscoveryInfo + Send + 'static,
              H::SinkItem: From<Either<InitiateMessage, ClientMetadata>>
    {
        Self::with_capacity(bind, handshaker, DEFAULT_CAPACITY)
    }

    /// Creates a client whose request limiter is sized to `capacity`.
    ///
    /// `bind` and `handshaker` are handed to `dispatcher::create_dispatcher`
    /// together with a clone of the limiter.
    ///
    /// # Panics
    ///
    /// Panics if `capacity + 1` does not fit in a `usize`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `dispatcher::create_dispatcher` reports.
    pub fn with_capacity<H>(bind: SocketAddr,
                            handshaker: H,
                            capacity: usize)
                            -> io::Result<TrackerClient>
        where H: Sink + DiscoveryInfo + Send + 'static,
              H::SinkItem: From<Either<InitiateMessage, ClientMetadata>>
    {
        // The channel gets one slot more than the limiter allows.
        // NOTE(review): presumably the extra slot leaves room for the shutdown
        // message sent on drop -- confirm against the dispatcher.
        let chan_capacity = match capacity.checked_add(1) {
            Some(slots) => slots,
            None => panic!("bip_utracker: Tracker Client Capacity Must Be Less Than Max Size"),
        };

        let limiter = RequestLimiter::new(capacity);
        let send = dispatcher::create_dispatcher(bind, handshaker, chan_capacity, limiter.clone())?;

        Ok(TrackerClient {
            send,
            limiter,
            generator: TokenGenerator::new(),
        })
    }

    /// Sends `request` for `addr` to the dispatcher.
    ///
    /// Returns the token generated for the request, or `None` when the
    /// limiter refuses to admit another request.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be sent to the dispatcher.
    pub fn request(&mut self, addr: SocketAddr, request: ClientRequest) -> Option<ClientToken> {
        // A successful check also reserves the slot on the limiter.
        if !self.limiter.can_initiate() {
            return None;
        }

        let token = self.generator.generate();
        let message = DispatchMessage::Request(addr, token, request);
        self.send
            .send(message)
            .expect("bip_utracker: Failed To Send Client Request Message...");

        Some(token)
    }
}
impl Drop for TrackerClient {
    /// Sends `DispatchMessage::Shutdown` to the dispatcher when the client is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the shutdown message cannot be sent, unless the current
    /// thread is already unwinding from another panic.
    fn drop(&mut self) {
        let outcome = self.send.send(DispatchMessage::Shutdown);

        // `request` panics when the same channel fails, and this destructor
        // then runs during that unwind. A second panic while unwinding aborts
        // the whole process, so the failure is only raised when no panic is
        // already in flight.
        if !::std::thread::panicking() {
            outcome.expect("bip_utracker: Failed To Send Client Shutdown Message...");
        }
    }
}
/// Token wrapping a `u32`, returned by `TrackerClient::request` and stored in `ClientMetadata`.
///
/// The inner value is private to this module.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ClientToken(u32);
/// Produces `ClientToken`s from a `LocallyShuffledIds<u32>` id source.
struct TokenGenerator {
// Source of the `u32` values wrapped into tokens by `generate()`.
generator: LocallyShuffledIds<u32>
}
impl TokenGenerator {
    /// Creates a generator backed by a fresh `LocallyShuffledIds<u32>`.
    pub fn new() -> TokenGenerator {
        let generator = LocallyShuffledIds::<u32>::new();
        TokenGenerator { generator }
    }

    /// Takes the next id from the underlying generator and wraps it in a `ClientToken`.
    pub fn generate(&mut self) -> ClientToken {
        let id = self.generator.generate();
        ClientToken(id)
    }
}
/// Shared counter that caps how many requests may be outstanding at once.
///
/// Clones share the same counter (it lives behind an `Arc`), so one clone can
/// reserve slots with `can_initiate` while another releases them with
/// `acknowledge`.
#[derive(Clone)]
pub struct RequestLimiter {
    // Number of slots currently reserved; shared between all clones.
    active: Arc<AtomicUsize>,
    // Upper bound on `active`.
    capacity: usize,
}

impl RequestLimiter {
    /// Creates a limiter with no reserved slots that admits at most
    /// `capacity` outstanding requests.
    pub fn new(capacity: usize) -> RequestLimiter {
        RequestLimiter {
            active: Arc::new(AtomicUsize::new(0)),
            capacity: capacity,
        }
    }

    /// Releases one previously reserved slot.
    ///
    /// Calling this when no slot is reserved is a no-op. A plain `fetch_sub`
    /// would wrap the counter around to `usize::MAX`, after which every
    /// `can_initiate` call would be refused.
    pub fn acknowledge(&self) {
        let mut current = self.active.load(Ordering::Acquire);

        while current != 0 {
            match self.active.compare_exchange_weak(current,
                                                    current - 1,
                                                    Ordering::AcqRel,
                                                    Ordering::Acquire) {
                Ok(_) => return,
                // Lost a race (or failed spuriously); retry with the fresh value.
                Err(observed) => current = observed,
            }
        }
    }

    /// Tries to reserve a slot, returning `true` on success.
    ///
    /// Returns `false` without touching the counter when `capacity` slots are
    /// already reserved. The counter is only ever advanced from a value below
    /// `capacity`, so it never exceeds the limit, not even transiently, and a
    /// rejected caller cannot cause another caller to be rejected.
    pub fn can_initiate(&self) -> bool {
        let mut current = self.active.load(Ordering::Acquire);

        while current < self.capacity {
            match self.active.compare_exchange_weak(current,
                                                    current + 1,
                                                    Ordering::AcqRel,
                                                    Ordering::Acquire) {
                Ok(_) => return true,
                // Lost a race (or failed spuriously); retry with the fresh value.
                Err(observed) => current = observed,
            }
        }

        false
    }
}