#[macro_use]
extern crate lazy_static;
extern crate hyper_client_pool;
extern crate ipnet;
extern crate regex;
extern crate tracing_subscriber;
use futures::{channel::mpsc, prelude::*};
use std::net::IpAddr;
use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::{Duration, Instant};
use hyper::{Body, Request};
use hyper_client_pool::*;
use ipnet::{Contains, IpNet};
use regex::Regex;
lazy_static! {
static ref TEST_LOCK: RwLock<()> = RwLock::new(());
}
#[derive(Debug)]
struct MspcDeliverable(mpsc::UnboundedSender<DeliveryResult>);
impl Deliverable for MspcDeliverable {
fn complete(self, result: DeliveryResult) {
let _ = self.0.unbounded_send(result);
}
}
fn default_config() -> Config {
Config {
keep_alive_timeout: Duration::from_secs(3),
transaction_timeout: Duration::from_secs(2),
dns_threads_per_worker: 1,
max_transactions_per_worker: 1_000,
workers: 2,
}
}
fn onesignal_transaction<D: Deliverable>(deliverable: D) -> Transaction<D> {
Transaction::new(
deliverable,
Request::get("https://onesignal.com/")
.body(Body::empty())
.unwrap(),
false,
)
}
fn httpbin_transaction<D: Deliverable>(deliverable: D) -> Transaction<D> {
Transaction::new(
deliverable,
Request::get("http://httpbin:80/ip")
.body(Body::empty())
.unwrap(),
false,
)
}
fn check_successful_result(result: DeliveryResult) -> (bool, DeliveryResult) {
let successful = match result {
DeliveryResult::Response { ref response, .. } => response.status().is_success(),
_ => false,
};
(successful, result)
}
fn assert_successful_result(result: DeliveryResult) {
let (successful, result) = check_successful_result(result);
assert_eq!(true, successful, "Not successful result: {:?}!", result);
}
#[tokio::test]
async fn some_gets_single_worker() {
let _read = TEST_LOCK.read().unwrap_or_else(|e| e.into_inner());
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.workers = 1;
let mut pool = Pool::builder(config).build().unwrap();
let (tx, mut rx) = mpsc::unbounded();
for _ in 0..5 {
pool.request(onesignal_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
}
for _ in 0..5 {
assert_successful_result(rx.next().await.unwrap());
}
pool.shutdown().await;
}
#[tokio::test]
async fn ton_of_gets() {
const REQUEST_AMOUNT: usize = 600;
let _read = TEST_LOCK.read().unwrap_or_else(|e| e.into_inner());
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.dns_threads_per_worker = 4;
config.workers = 4;
config.max_transactions_per_worker = 1_000;
config.transaction_timeout = Duration::from_secs(60);
let mut pool = Pool::builder(config).build().unwrap();
let (tx, mut rx) = mpsc::unbounded();
for _ in 0..REQUEST_AMOUNT {
pool.request(httpbin_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
}
let mut successes = 0i32;
let mut not_successes = 0i32;
for _ in 0..REQUEST_AMOUNT {
let (successful, _result) = check_successful_result(rx.next().await.unwrap());
if successful {
successes += 1;
} else {
not_successes += 1;
}
}
let expected_successes = ((REQUEST_AMOUNT as f32) * 0.9) as _;
assert!(
successes > expected_successes,
"expected at least {} successes, found {}",
expected_successes,
successes,
);
let expected_failures = ((REQUEST_AMOUNT as f32) * 0.1) as _;
assert!(
not_successes < expected_failures,
"expected at most {} failures, found {}",
expected_failures,
not_successes
);
pool.shutdown().await;
println!(
"Successes: {} | Not Successes: {}",
successes, not_successes
);
}
#[derive(Debug, Clone)]
struct SuccessfulCompletionCounter {
count: Arc<AtomicUsize>,
}
impl SuccessfulCompletionCounter {
fn new() -> SuccessfulCompletionCounter {
SuccessfulCompletionCounter {
count: Arc::new(AtomicUsize::new(0)),
}
}
fn count(&self) -> usize {
self.count.load(Ordering::Acquire)
}
}
impl Deliverable for SuccessfulCompletionCounter {
fn complete(self, result: DeliveryResult) {
assert_successful_result(result);
self.count.fetch_add(1, Ordering::AcqRel);
}
}
#[tokio::test]
async fn graceful_shutdown() {
let _read = TEST_LOCK.read().unwrap_or_else(|e| e.into_inner());
let _ = tracing_subscriber::fmt::try_init();
let txn = 20;
let counter = SuccessfulCompletionCounter::new();
let mut config = default_config();
config.workers = 2;
let mut pool = Pool::builder(config).build().unwrap();
for _ in 0..txn {
pool.request(onesignal_transaction(counter.clone()))
.expect("request ok");
}
pool.shutdown().await;
assert_eq!(counter.count(), txn);
}
#[tokio::test]
async fn full_error() {
let _read = TEST_LOCK.read().unwrap_or_else(|e| e.into_inner());
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.workers = 3;
config.max_transactions_per_worker = 1;
let mut pool = Pool::builder(config).build().unwrap();
let (tx, mut rx) = mpsc::unbounded();
for _ in 0..3 {
pool.request(onesignal_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
}
match pool.request(onesignal_transaction(MspcDeliverable(tx.clone()))) {
Err(err) => assert_eq!(err.kind, ErrorKind::PoolFull),
_ => panic!("Expected Error, got success request!"),
}
for _ in 0..3 {
assert_successful_result(rx.next().await.unwrap());
}
pool.shutdown().await;
}
static CLOUDFLARE_NETS: &[&str] = &[
"103.21.244.0/22",
"103.22.200.0/22",
"103.31.4.0/22",
"104.16.0.0/12",
"108.162.192.0/18",
"131.0.72.0/22",
"141.101.64.0/18",
"162.158.0.0/15",
"172.64.0.0/13",
"173.245.48.0/20",
"188.114.96.0/20",
"190.93.240.0/20",
"197.234.240.0/22",
"198.41.128.0/17",
"2400:cb00::/32",
"2405:8100::/32",
"2405:b500::/32",
"2606:4700::/32",
"2803:f800::/32",
"2c0f:f248::/32",
"2a06:98c0::/29",
];
lazy_static! {
static ref CLOUDFLARE_PARSED_NETS: Vec<IpNet> = {
CLOUDFLARE_NETS
.iter()
.map(|net| net.parse::<IpNet>())
.collect::<Result<Vec<IpNet>, _>>()
.unwrap()
};
static ref LSOF_PARSE_IP_REGEX: Regex = Regex::new(r"->\[?([^\]]*)\]?:https").unwrap();
}
fn matches_cloudflare_ip(input: &str) -> bool {
if let Some(captures) = LSOF_PARSE_IP_REGEX.captures(input) {
match captures[1].parse::<IpAddr>() {
Ok(addr) => CLOUDFLARE_PARSED_NETS.iter().any(|net| net.contains(&addr)),
Err(_err) => false,
}
} else {
false
}
}
#[test]
fn matches_cloudflare_ip_works_as_expected() {
let input1 = "hyper_cli 29606 deploy 9u IPv6 74567336 0t0 TCP onepush-test-darren:46286->[2400:cb00:2048:1::6810:cea5]:https (ESTABLISHED)";
assert_eq!(matches_cloudflare_ip(input1), true);
let input2 = "lib-f13ca 83600 darrentsung 12u IPv4 0x2aad2644e2239ff9 0t0 TCP 192.168.2.240:54285->104.16.207.165:https (ESTABLISHED)";
assert_eq!(matches_cloudflare_ip(input2), true);
}
fn onesignal_connection_count() -> (usize, String) {
let output = Command::new("lsof")
.args(&["-i"])
.output()
.expect("command works");
let stdout = String::from_utf8(output.stdout).unwrap();
let matching_lines = stdout
.split("\n")
.filter(|line| matches_cloudflare_ip(line))
.count();
(matching_lines, stdout)
}
macro_rules! assert_onesignal_connection_open_count_eq {
($expected_open_count:expr) => {
let (open_count, stdout) = onesignal_connection_count();
assert_eq!($expected_open_count, open_count, "Output:\n{}", stdout);
};
}
#[tokio::test]
async fn keep_alive_works_as_expected() {
let _write = TEST_LOCK.write().unwrap_or_else(|e| e.into_inner());
let count = onesignal_connection_count().0;
while count > 0 {
eprintln!("keep_alive_works_as_expected blocking - open connections to cloudflare: {} must be 0 before test can run", count);
tokio::time::sleep(Duration::from_secs(1)).await;
}
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.keep_alive_timeout = Duration::from_secs(3);
let mut pool = Pool::builder(config).build().unwrap();
let (tx, mut rx) = mpsc::unbounded();
pool.request(onesignal_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
assert_successful_result(rx.next().await.unwrap());
let start = Instant::now();
loop {
let seconds_elapsed = start.elapsed().as_secs();
if seconds_elapsed < 2 {
assert_onesignal_connection_open_count_eq!(1);
} else if seconds_elapsed > 3 {
assert_onesignal_connection_open_count_eq!(0);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
pool.shutdown().await;
}
#[tokio::test]
async fn connection_reuse_works_as_expected() {
let _write = TEST_LOCK.write().unwrap_or_else(|e| e.into_inner());
let count = onesignal_connection_count().0;
while count > 0 {
eprintln!("connection_reuse_works_as_expected blocking - open connections to cloudflare: {} must be 0 before test can run", count);
tokio::time::sleep(Duration::from_secs(1)).await;
}
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.workers = 1;
config.keep_alive_timeout = Duration::from_secs(10);
let mut pool = Pool::builder(config).build().unwrap();
let (tx, mut rx) = mpsc::unbounded();
pool.request(onesignal_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
assert_successful_result(rx.next().await.unwrap());
assert_onesignal_connection_open_count_eq!(1);
tokio::time::sleep(Duration::from_secs(3)).await;
assert_onesignal_connection_open_count_eq!(1);
pool.request(onesignal_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
assert_successful_result(rx.next().await.unwrap());
assert_onesignal_connection_open_count_eq!(1);
pool.shutdown().await;
}
#[tokio::test]
async fn timeout_works_as_expected() {
let _read = TEST_LOCK.read().unwrap_or_else(|e| e.into_inner());
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.transaction_timeout = Duration::from_secs(2);
let mut pool = Pool::builder(config).build().unwrap();
let (tx, mut rx) = mpsc::unbounded();
pool.request(
Transaction::new(
MspcDeliverable(tx.clone()),
Request::get("https://httpstat.us/200?sleep=5000")
.body(Body::empty())
.unwrap(),
false,
),
)
.expect("request ok");
match rx.next().await.unwrap() {
DeliveryResult::Timeout { .. } => (), res => panic!("Expected timeout!, got: {:?}", res),
}
pool.shutdown().await;
}
#[tokio::test]
async fn transaction_counting_works() {
let _read = TEST_LOCK.read().unwrap_or_else(|e| e.into_inner());
let _ = tracing_subscriber::fmt::try_init();
let mut config = default_config();
config.workers = 3;
let transaction_counters = Arc::new(RwLock::new(Vec::new()));
let mut pool = Pool::builder(config)
.transaction_counters(transaction_counters.clone())
.build()
.unwrap();
let (tx, mut rx) = mpsc::unbounded();
for _ in 0..3 {
pool.request(onesignal_transaction(MspcDeliverable(tx.clone())))
.expect("request ok");
}
let mut saw_transactions = false;
let mut received = 0i32;
loop {
let counters = transaction_counters.try_read().unwrap();
for counter in counters.iter() {
assert_eq!(counter.is_valid(), true);
let transaction_count = counter.count();
if transaction_count == 1 {
saw_transactions = true;
}
assert!(transaction_count <= 1);
}
if let Ok(Some(recv)) = rx.try_next() {
assert_successful_result(recv);
received += 1;
if received == 3 {
break;
}
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
assert!(saw_transactions);
pool.shutdown().await;
let counters = transaction_counters.try_read().unwrap();
for counter in counters.iter() {
assert_eq!(counter.is_valid(), false);
}
}