use std::rc::Rc;
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::io::{self, Cursor};
use std::cell::RefCell;
use std::net::IpAddr;
use futures::{self, Future};
use tokio_core::reactor::Handle;
use tokio_core::net::UdpSocket;
use net2::UdpBuilder;
use lru_cache::LruCache;
use config::{Config, ServerConfig, ServerAddr};
use relay::{BoxIoFuture, boxed_future};
use relay::loadbalancing::server::{LoadBalancer, RoundRobin};
use relay::dns_resolver::resolve;
use relay::socks5::{Address, UdpAssociateHeader};
use super::{MAXIMUM_ASSOCIATE_MAP_SIZE, MAXIMUM_UDP_PAYLOAD_SIZE};
use super::crypto_io::{encrypt_payload, decrypt_payload};
// Remote target address -> address of the client that last sent a packet to it.
// Filled when a client packet is relayed and consulted when a server reply arrives.
type AssociateMap = LruCache<Address, SocketAddr>;
// Resolved server socket address -> configuration of the server it was resolved from.
// Used to recognise incoming datagrams that originate from a server.
type ServerCache = LruCache<SocketAddr, Rc<ServerConfig>>;
/// State of the local UDP relay.
///
/// The value is passed by ownership through every step of the receive loop:
/// each handler consumes a `Client` and yields a new one once its send completes.
struct Client {
// Associations from remote target address to client address.
assoc: Rc<RefCell<AssociateMap>>,
// Picks the server that each client packet is relayed through.
server_picker: Rc<RefCell<RoundRobin>>,
// Addresses packets were sent to, mapped to the matching server configuration.
servers: Rc<RefCell<ServerCache>>,
// The one UDP socket used for both client-facing and server-facing datagrams.
socket: UdpSocket,
// Reactor handle, passed on to DNS resolution of server domain names.
handle: Handle,
}
impl Client {
    /// Resolves the address of `svr_cfg` to a concrete `SocketAddr`.
    ///
    /// A server configured with a socket address yields that address immediately.
    /// A server configured with a domain name is looked up through `resolve` on
    /// `handle`, and the configured port is attached to the resulting IP address.
    fn resolve_server_addr(svr_cfg: Rc<ServerConfig>, handle: &Handle) -> BoxIoFuture<SocketAddr> {
        match *svr_cfg.addr() {
            ServerAddr::SocketAddr(ref addr) => boxed_future(futures::finished(*addr)),
            ServerAddr::DomainName(ref dname, port) => {
                let fut = resolve(dname, handle).map(move |sockaddr| {
                    match sockaddr {
                        IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(v4, port)),
                        // Flow info and scope id are not configurable here; both are set to 0.
                        IpAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)),
                    }
                });
                boxed_future(fut)
            }
        }
    }

    /// Handles a datagram that arrived from a server (server -> client direction).
    ///
    /// The first `n` bytes of `buf` are decrypted with the method and key of `svr_cfg`,
    /// the leading `Address` is parsed, and the remaining body is re-wrapped behind a
    /// `UdpAssociateHeader` (fragment 0) and sent to the client associated with that address.
    ///
    /// # Errors
    ///
    /// The future fails if decryption, address parsing, header serialisation or the
    /// send fails, or if no client is associated with the parsed address.
    ///
    /// NOTE(review): on failure the `Client` (and its socket) is not handed back, so
    /// the caller cannot continue the relay after an error from this future.
    fn handle_s2c(self, svr_cfg: Rc<ServerConfig>, buf: Vec<u8>, n: usize) -> BoxIoFuture<Client> {
        let Client { assoc, server_picker, servers, socket, handle } = self;
        let fut = futures::lazy(move || {
                let buf = &buf[..n];
                trace!("Got packet from server {}, length {}",
                       svr_cfg.addr(),
                       buf.len());
                decrypt_payload(svr_cfg.method(), svr_cfg.key(), buf)
            })
            .and_then(move |payload| {
                Address::read_from(Cursor::new(payload))
                    .map_err(From::from)
                    .and_then(move |(r, addr)| {
                        // The cursor position after parsing marks where the body starts.
                        let header_len = r.position() as usize;
                        let payload = r.into_inner();
                        let body = &payload[header_len..];
                        trace!("Got packet from {}, payload length {}", addr, body.len());
                        let buf = Cursor::new(Vec::new());
                        // Propagate a serialisation failure as an error instead of panicking.
                        let mut reply_body =
                            try!(UdpAssociateHeader::new(0, addr.clone()).write_to(buf).wait())
                                .into_inner();
                        reply_body.extend_from_slice(body);
                        let cloned_assoc = assoc.clone();
                        let mut assoc = assoc.borrow_mut();
                        // NOTE(review): the association is removed on the first reply, so a
                        // further packet from the same remote address is rejected as
                        // unassociated until the client sends to it again. Confirm intended.
                        assoc.remove(&addr)
                            .ok_or_else(|| {
                                warn!("Got unassociated packet from server, addr: {:?}", addr);
                                io::Error::new(io::ErrorKind::Other, "unassociated packet")
                            })
                            .map(|client_addr| {
                                info!("UDP ASSOCIATE {} <- {}, payload length {} bytes",
                                      client_addr,
                                      addr,
                                      body.len());
                                (client_addr, cloned_assoc, reply_body)
                            })
                    })
                    .and_then(|(client_addr, assoc, reply_body)| {
                        socket.send_dgram(reply_body, client_addr).map(move |(socket, _)| {
                            // Rebuild the state around the socket returned by the send.
                            Client {
                                assoc: assoc,
                                servers: servers,
                                server_picker: server_picker,
                                socket: socket,
                                handle: handle,
                            }
                        })
                    })
            });
        boxed_future(fut)
    }

    /// Handles a datagram that arrived from a client at `src` (client -> server direction).
    ///
    /// The first `n` bytes of `buf` are parsed as a `UdpAssociateHeader` followed by the
    /// body. The target address is recorded against `src`, a server is picked, the target
    /// address plus body are encrypted for that server, the server address is resolved and
    /// remembered, and the encrypted datagram is sent to it.
    ///
    /// # Errors
    ///
    /// The future fails if the header cannot be parsed, if the header requests
    /// fragmentation (`frag != 0`), or if encryption, resolution or the send fails.
    ///
    /// NOTE(review): on failure the `Client` (and its socket) is not handed back, so
    /// the caller cannot continue the relay after an error from this future.
    fn handle_c2s(self, buf: Vec<u8>, n: usize, src: SocketAddr) -> BoxIoFuture<Client> {
        let Client { assoc, server_picker, servers, socket, handle } = self;
        let fut = futures::lazy(move || {
                // The receive buffer is owned here, so shrink it in place to the received
                // length rather than copying the datagram into a second vector.
                let mut buf = buf;
                buf.truncate(n);
                let reader = Cursor::new(buf);
                let (reader, header) = try!(UdpAssociateHeader::read_from(reader).wait());
                let header_length = reader.position() as usize;
                Ok((reader.into_inner(), header, header_length))
            })
            .and_then(|(payload, header, header_len)| {
                if header.frag != 0x00 {
                    warn!("Does not support UDP fragment, got header {:?}", header);
                    let err = io::Error::new(io::ErrorKind::Other, "Not supported UDP fragment");
                    Err(err)
                } else {
                    Ok((payload, header, header_len))
                }
            })
            .and_then(move |(payload, header, header_len)| {
                let assoc_addr = header.address;
                info!("UDP ASSOCIATE {} -> {}, payload length {} bytes",
                      src,
                      assoc_addr,
                      payload[header_len..].len());
                {
                    // Scoped so the mutable borrow ends before the rest of the chain is built.
                    let mut assoc = assoc.borrow_mut();
                    assoc.insert(assoc_addr.clone(), src);
                }
                let svr_cfg = server_picker.borrow_mut().pick_server();
                let buf = Cursor::new(Vec::with_capacity(payload.len()));
                assoc_addr.write_to(buf)
                    .and_then(move |payload_buf| {
                        // Outgoing plaintext is the target address followed by the body.
                        let mut payload_buf = payload_buf.into_inner();
                        payload_buf.extend_from_slice(&payload[header_len..]);
                        Ok(payload_buf)
                    })
                    .and_then(move |payload| -> io::Result<_> {
                        encrypt_payload(svr_cfg.method(), svr_cfg.key(), &payload).map(move |b| (svr_cfg, b))
                    })
                    .map_err(From::from)
                    .and_then(move |(svr_cfg, payload)| {
                        Client::resolve_server_addr(svr_cfg.clone(), &handle).and_then(move |addr| {
                            {
                                // Remember the resolved address so that datagrams coming
                                // back from it are recognised as server replies.
                                let mut svrs_ref = servers.borrow_mut();
                                svrs_ref.insert(addr, svr_cfg.clone());
                            }
                            socket.send_dgram(payload, addr).map(|(socket, body)| {
                                trace!("Sent body, size: {}", body.len());
                                Client {
                                    assoc: assoc,
                                    server_picker: server_picker,
                                    servers: servers,
                                    socket: socket,
                                    handle: handle,
                                }
                            })
                        })
                    })
            });
        boxed_future(fut)
    }

    /// Receives one datagram and dispatches it by its source address.
    ///
    /// A datagram whose source is present in the server cache is treated as a server
    /// reply (`handle_s2c`); any other datagram is treated as a client request
    /// (`handle_c2s`). The future resolves to the `Client` to use for the next round.
    fn handle_once(self) -> BoxIoFuture<Client> {
        let Client { assoc, server_picker, servers, socket, handle } = self;
        let fut = socket.recv_dgram(vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE]).and_then(move |(socket, buf, n, src)| {
            let c = Client {
                assoc: assoc,
                server_picker: server_picker,
                servers: servers.clone(),
                socket: socket,
                handle: handle,
            };
            // Both handlers start with `futures::lazy`, so they only build a future here;
            // the cache borrow below is released before any of their closures run.
            let mut servers = servers.borrow_mut();
            match servers.get_mut(&src) {
                Some(svr_cfg) => c.handle_s2c(svr_cfg.clone(), buf, n),
                None => c.handle_c2s(buf, n, src),
            }
        });
        boxed_future(fut)
    }
}
fn handle_client(client: Client) -> BoxIoFuture<()> {
let fut = client.handle_once()
.and_then(handle_client);
boxed_future(fut)
}
/// Builds the initial relay state around the bound socket `l` and starts the relay loop.
///
/// The association map is sized by `MAXIMUM_ASSOCIATE_MAP_SIZE`, the server picker is
/// built from `config`, and the server cache holds as many entries as there are
/// configured servers.
fn listen(config: Rc<Config>, l: UdpSocket, handle: Handle) -> BoxIoFuture<()> {
    // Field expressions are evaluated top to bottom, matching the original construction order.
    let client = Client {
        assoc: Rc::new(RefCell::new(AssociateMap::new(MAXIMUM_ASSOCIATE_MAP_SIZE))),
        server_picker: Rc::new(RefCell::new(RoundRobin::new(&*config))),
        servers: Rc::new(RefCell::new(ServerCache::new(config.server.len()))),
        socket: l,
        handle: handle,
    };
    handle_client(client)
}
pub fn run(config: Rc<Config>, handle: Handle) -> BoxIoFuture<()> {
let fut = futures::lazy(move || {
let l = {
let local_addr = config.local.as_ref().unwrap();
let udp_builder = match *local_addr {
SocketAddr::V4(..) => UdpBuilder::new_v4(),
SocketAddr::V6(..) => UdpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create socket, {}", err));
super::reuse_port(&udp_builder)
.and_then(|b| b.reuse_address(true))
.unwrap_or_else(|err| panic!("Failed to set reuse {}, {}", local_addr, err));
info!("ShadowSocks UDP Listening on {}", local_addr);
try!(udp_builder.bind(local_addr).and_then(|s| UdpSocket::from_socket(s, &handle)))
};
Ok((config, l, handle))
})
.and_then(move |(config, l, handle)| listen(config, l, handle));
boxed_future(fut)
}