#[cfg(feature = "local-dns-relay")]
use std::net::IpAddr;
#[cfg(feature = "local-dns-relay")]
use std::time::Duration;
use std::{
io,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use bloomfilter::Bloom;
use log::{log_enabled, warn};
#[cfg(feature = "local-dns-relay")]
use lru_time_cache::LruCache;
use spin::Mutex as SpinMutex;
#[cfg(feature = "local-dns-relay")]
use tokio::sync::Mutex as AsyncMutex;
#[cfg(feature = "trust-dns")]
use trust_dns_resolver::TokioAsyncResolver;
#[cfg(any(feature = "sodium", feature = "rc4"))]
use crate::crypto::CipherType;
#[cfg(feature = "trust-dns")]
use crate::relay::dns_resolver::create_resolver;
#[cfg(not(feature = "local-dns-relay"))]
use crate::relay::dns_resolver::resolve;
#[cfg(feature = "local-dns-relay")]
use crate::relay::dnsrelay::upstream::LocalUpstream;
#[cfg(feature = "local-flow-stat")]
use crate::relay::flow::ServerFlowStatistic;
use crate::{
acl::AccessControl,
config::{Config, ConfigType, ServerConfig},
relay::socks5::Address,
};
const BF_NUM_ENTRIES_FOR_SERVER: usize = 1_000_000;
const BF_NUM_ENTRIES_FOR_CLIENT: usize = 10_000;
const BF_ERROR_RATE_FOR_SERVER: f64 = 1e-6;
const BF_ERROR_RATE_FOR_CLIENT: f64 = 1e-15;
struct PingPongBloom {
blooms: [Bloom<[u8]>; 2],
bloom_count: [usize; 2],
item_count: usize,
current: usize,
}
impl PingPongBloom {
fn new(ty: ConfigType) -> PingPongBloom {
let (mut item_count, fp_p) = if ty.is_local() {
(BF_NUM_ENTRIES_FOR_CLIENT, BF_ERROR_RATE_FOR_CLIENT)
} else {
(BF_NUM_ENTRIES_FOR_SERVER, BF_ERROR_RATE_FOR_SERVER)
};
item_count /= 2;
PingPongBloom {
blooms: [
Bloom::new_for_fp_rate(item_count, fp_p),
Bloom::new_for_fp_rate(item_count, fp_p),
],
bloom_count: [0, 0],
item_count,
current: 0,
}
}
fn check_and_set(&mut self, buf: &[u8]) -> bool {
for bloom in &self.blooms {
if bloom.check(buf) {
return true;
}
}
if self.bloom_count[self.current] >= self.item_count {
self.current = (self.current + 1) % 2;
self.bloom_count[self.current] = 0;
self.blooms[self.current].clear();
}
self.blooms[self.current].set(buf);
self.bloom_count[self.current] += 1;
false
}
}
pub struct ServerState {
#[cfg(feature = "trust-dns")]
dns_resolver: Option<TokioAsyncResolver>,
}
#[cfg(feature = "trust-dns")]
impl ServerState {
pub async fn new_shared(config: &Config) -> SharedServerState {
let state = ServerState {
dns_resolver: match create_resolver(config.get_dns_config(), config.ipv6_first).await {
Ok(resolver) => Some(resolver),
Err(..) => None,
},
};
Arc::new(state)
}
pub fn dns_resolver(&self) -> Option<&TokioAsyncResolver> {
self.dns_resolver.as_ref()
}
}
#[cfg(not(feature = "trust-dns"))]
impl ServerState {
pub async fn new_shared(_config: &Config) -> SharedServerState {
Arc::new(ServerState {})
}
}
pub type SharedServerState = Arc<ServerState>;
pub struct Context {
config: Config,
server_state: SharedServerState,
server_running: AtomicBool,
nonce_ppbloom: SpinMutex<PingPongBloom>,
#[cfg(feature = "local-flow-stat")]
local_flow_statistic: ServerFlowStatistic,
#[cfg(feature = "local-dns-relay")]
reverse_lookup_cache: AsyncMutex<LruCache<IpAddr, bool>>,
#[cfg(feature = "local-dns-relay")]
local_dns: LocalUpstream,
}
pub type SharedContext = Arc<Context>;
impl Context {
async fn new(config: Config) -> Context {
let state = ServerState::new_shared(&config).await;
Context::new_with_state(config, state)
}
fn new_with_state(config: Config, server_state: SharedServerState) -> Context {
for server in &config.server {
let t = server.method();
let deprecated = match t {
#[cfg(feature = "sodium")]
CipherType::ChaCha20 | CipherType::Salsa20 => true,
#[cfg(feature = "rc4")]
CipherType::Rc4Md5 => true,
_ => false,
};
if deprecated {
warn!(
"stream cipher {} (for server {}) have inherent weaknesses \
(see discussion at https://github.com/shadowsocks/shadowsocks-org/issues/36). \
DO NOT USE. It will be removed in the future.",
t,
server.addr(),
);
}
}
let nonce_ppbloom = SpinMutex::new(PingPongBloom::new(config.config_type));
#[cfg(feature = "local-dns-relay")]
let local_dns = LocalUpstream::new(&config);
Context {
config,
server_state,
server_running: AtomicBool::new(true),
nonce_ppbloom,
#[cfg(feature = "local-flow-stat")]
local_flow_statistic: ServerFlowStatistic::new(),
#[cfg(feature = "local-dns-relay")]
reverse_lookup_cache: AsyncMutex::new(LruCache::with_expiry_duration(Duration::from_secs(
3 * 24 * 60 * 60,
))),
#[cfg(feature = "local-dns-relay")]
local_dns,
}
}
pub async fn new_shared(config: Config) -> SharedContext {
SharedContext::new(Context::new(config).await)
}
pub fn new_with_state_shared(config: Config, server_state: SharedServerState) -> SharedContext {
SharedContext::new(Context::new_with_state(config, server_state))
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn server_state(&self) -> &SharedServerState {
&self.server_state
}
pub fn config_mut(&mut self) -> &mut Config {
&mut self.config
}
pub fn server_config(&self, idx: usize) -> &ServerConfig {
&self.config.server[idx]
}
pub fn server_config_mut(&mut self, idx: usize) -> &mut ServerConfig {
&mut self.config.server[idx]
}
#[cfg(feature = "trust-dns")]
pub fn dns_resolver(&self) -> Option<&TokioAsyncResolver> {
self.server_state.dns_resolver()
}
pub async fn dns_resolve(&self, host: &str, port: u16) -> io::Result<Vec<SocketAddr>> {
if log_enabled!(log::Level::Debug) {
use log::debug;
use std::time::Instant;
let start = Instant::now();
let result = self.dns_resolve_impl(host, port).await;
let elapsed = Instant::now() - start;
debug!(
"DNS resolved {}:{} elapsed: {}.{:03}s, {:?}",
host,
port,
elapsed.as_secs(),
elapsed.subsec_millis(),
result
);
result
} else {
self.dns_resolve_impl(host, port).await
}
}
#[cfg(feature = "local-dns-relay")]
#[inline(always)]
async fn dns_resolve_impl(&self, host: &str, port: u16) -> io::Result<Vec<SocketAddr>> {
self.local_dns().lookup_ip(host, port).await
}
#[cfg(not(feature = "local-dns-relay"))]
#[inline(always)]
async fn dns_resolve_impl(&self, host: &str, port: u16) -> io::Result<Vec<SocketAddr>> {
resolve(self, host, port).await
}
pub fn server_running(&self) -> bool {
self.server_running.load(Ordering::Acquire)
}
pub fn set_server_stopped(&self) {
self.server_running.store(false, Ordering::Release)
}
pub fn check_nonce_and_set(&self, nonce: &[u8]) -> bool {
if nonce.is_empty() {
return false;
}
let mut ppbloom = self.nonce_ppbloom.lock();
ppbloom.check_and_set(nonce)
}
pub async fn check_client_blocked(&self, addr: &SocketAddr) -> bool {
match self.acl() {
None => false,
Some(a) => a.check_client_blocked(addr),
}
}
pub async fn check_outbound_blocked(&self, addr: &Address) -> bool {
match self.acl() {
None => false,
Some(a) => a.check_outbound_blocked(self, addr).await,
}
}
#[cfg(feature = "local-dns-relay")]
pub async fn add_to_reverse_lookup_cache(&self, addr: &IpAddr, forward: bool) {
let is_exception = forward
!= match self.acl() {
None => true,
Some(a) => a.check_ip_in_proxy_list(addr),
};
let mut reverse_lookup_cache = self.reverse_lookup_cache.lock().await;
match reverse_lookup_cache.get_mut(addr) {
Some(value) => {
if is_exception {
*value = forward;
} else {
reverse_lookup_cache.remove(addr);
}
}
None => {
if is_exception {
reverse_lookup_cache.insert(addr.clone(), forward);
}
}
}
}
pub fn acl(&self) -> Option<&AccessControl> {
self.config.acl.as_ref()
}
#[cfg(feature = "local-dns-relay")]
pub fn local_dns(&self) -> &LocalUpstream {
&self.local_dns
}
pub async fn check_target_bypassed(&self, target: &Address) -> bool {
match self.acl() {
None => false,
Some(a) => {
#[cfg(feature = "local-dns-relay")]
{
if let Address::SocketAddress(ref saddr) = target {
let mut reverse_lookup_cache = self.reverse_lookup_cache.lock().await;
if let Some(forward) = reverse_lookup_cache.get(&saddr.ip()) {
return !*forward;
}
}
}
self.check_target_bypassed_with_acl(a, target).await
}
}
}
#[inline(always)]
async fn check_target_bypassed_with_acl(&self, a: &AccessControl, target: &Address) -> bool {
a.check_target_bypassed(self, target).await
}
#[cfg(feature = "local-flow-stat")]
pub fn local_flow_statistic(&self) -> &ServerFlowStatistic {
&self.local_flow_statistic
}
}