#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::time::{self, Duration};
/// `num_workers` reports one worker for the current-thread runtime and two
/// for the multi-thread runtime produced by `threaded()`.
#[test]
fn num_workers() {
    let single = current_thread();
    let single_workers = single.metrics().num_workers();
    assert_eq!(1, single_workers);

    let multi = threaded();
    let multi_workers = multi.metrics().num_workers();
    assert_eq!(2, multi_workers);
}
/// `num_blocking_threads` starts at zero and reads one after a single
/// blocking job has been run to completion.
#[test]
fn num_blocking_threads() {
    let rt = current_thread();

    // Nothing has been handed to the blocking pool yet.
    assert_eq!(0, rt.metrics().num_blocking_threads());

    // Submit one no-op blocking job and wait for it to finish.
    let job = rt.spawn_blocking(move || {});
    let _ = rt.block_on(job);

    assert_eq!(1, rt.metrics().num_blocking_threads());
}
/// `num_idle_blocking_threads` starts at zero and reads one once a blocking
/// job has finished and a short pause has elapsed.
#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_idle_blocking_threads());

    // Run a single no-op blocking job to completion.
    let job = rt.spawn_blocking(move || {});
    let _ = rt.block_on(job);

    // Pause briefly before sampling the metric; presumably this gives the
    // blocking thread time to be accounted as idle.
    let pause = Duration::from_millis(5);
    rt.block_on(async {
        time::sleep(pause).await;
    });

    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}
/// `blocking_queue_depth` is zero on a fresh runtime, positive while two
/// blocking jobs compete for a pool capped at one thread, and zero again
/// once both jobs have completed.
#[test]
fn blocking_queue_depth() {
    // Current-thread runtime whose blocking pool is limited to one thread.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all().max_blocking_threads(1);
    let rt = builder.build().unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    // The test holds the mutex so that every blocking job stalls on it.
    let gate = Arc::new(Mutex::new(()));
    let held = gate.lock().unwrap();

    let gate_for_jobs = Arc::clone(&gate);
    let block_on_gate = move || {
        let _unused = gate_for_jobs.lock().unwrap();
    };

    let first = rt.spawn_blocking(block_on_gate.clone());
    let second = rt.spawn_blocking(block_on_gate);

    // Neither job can finish while the gate is held.
    assert!(rt.metrics().blocking_queue_depth() > 0);

    // Release the gate and wait for both jobs.
    drop(held);
    let _ = rt.block_on(first);
    let _ = rt.block_on(second);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}
/// Spawning a task from a thread created with `std::thread::spawn` is
/// expected to bump `remote_schedule_count` to one, on both runtime flavors.
#[test]
fn remote_schedule_count() {
    use std::thread;

    // Spawns one empty task onto `rt` from a freshly created OS thread,
    // drives the task to completion, then checks the counter.
    let spawn_remotely_and_check = |rt: &Runtime| {
        let handle = rt.handle().clone();
        let spawner = thread::spawn(move || handle.spawn(async {}));
        let task = spawner.join().unwrap();
        rt.block_on(task).unwrap();
        assert_eq!(1, rt.metrics().remote_schedule_count());
    };

    let single = current_thread();
    spawn_remotely_and_check(&single);

    let multi = threaded();
    spawn_remotely_and_check(&multi);
}
/// After a 1 ms sleep and runtime shutdown, the current-thread worker must
/// have parked at least twice and each of the two multi-thread workers at
/// least once.
#[test]
fn worker_park_count() {
    let nap = Duration::from_millis(1);

    // Current-thread runtime: a single worker at index 0.
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        time::sleep(nap).await;
    });
    // The metrics are read only after the runtime has been dropped.
    drop(single);
    assert!(single_metrics.worker_park_count(0) >= 2);

    // Multi-thread runtime: workers at indices 0 and 1.
    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        time::sleep(nap).await;
    });
    drop(multi);
    for worker in 0..2 {
        assert!(multi_metrics.worker_park_count(worker) >= 1);
    }
}
/// After a 1 ms sleep and runtime shutdown, the current-thread worker must
/// report a no-op count of at least two and each of the two multi-thread
/// workers a count of at least one.
#[test]
fn worker_noop_count() {
    let nap = Duration::from_millis(1);

    // Current-thread runtime: a single worker at index 0.
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        time::sleep(nap).await;
    });
    // The metrics are read only after the runtime has been dropped.
    drop(single);
    assert!(single_metrics.worker_noop_count(0) >= 2);

    // Multi-thread runtime: workers at indices 0 and 1.
    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        time::sleep(nap).await;
    });
    drop(multi);
    for worker in 0..2 {
        assert!(multi_metrics.worker_noop_count(worker) >= 1);
    }
}
/// Expects exactly one steal, summed over all workers of the multi-thread
/// runtime, for the scenario below.
#[test]
fn worker_steal_count() {
    use std::sync::mpsc::channel;

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        let (tx, rx) = channel();

        // Run the scenario inside a spawned task rather than directly in
        // the `block_on` future.
        let outer = tokio::spawn(async move {
            // Task that signals over the std channel.
            tokio::spawn(async move {
                tx.send(()).unwrap();
            });

            // A second, empty task spawned right after the sender.
            // NOTE(review): presumably this displaces the sender from a
            // "run next" slot so it becomes stealable - confirm.
            tokio::spawn(async {});

            // Blocking (non-async) receive: this thread stalls until the
            // sender task has run.
            rx.recv().unwrap();
        });
        outer.await.unwrap();
    });

    drop(rt);

    let mut steals: u64 = 0;
    for worker in 0..metrics.num_workers() {
        steals += metrics.worker_steal_count(worker);
    }

    assert_eq!(1, steals);
}
/// Spawning and awaiting `N` empty tasks one at a time must yield a total
/// poll count of exactly `N`, on both runtime flavors.
#[test]
fn worker_poll_count() {
    const N: u64 = 5;

    // Current-thread runtime: all polls are attributed to worker 0.
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {});
            task.await.unwrap();
        }
    });
    drop(single);
    assert_eq!(N, single_metrics.worker_poll_count(0));

    // Multi-thread runtime: polls may land on any worker, so add them up.
    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {});
            task.await.unwrap();
        }
    });
    drop(multi);

    let total_polls = (0..multi_metrics.num_workers())
        .fold(0u64, |acc, worker| acc + multi_metrics.worker_poll_count(worker));
    assert_eq!(N, total_polls);
}
/// After running `N` tasks that each yield once, every worker must report a
/// strictly positive total busy duration.
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    let zero = Duration::from_millis(0);

    // Current-thread runtime: single worker at index 0.
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {
                tokio::task::yield_now().await;
            });
            task.await.unwrap();
        }
    });
    drop(single);
    assert!(single_metrics.worker_total_busy_duration(0) > zero);

    // Multi-thread runtime: same workload, then check every worker.
    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        for _ in 0..N {
            let task = tokio::spawn(async {
                tokio::task::yield_now().await;
            });
            task.await.unwrap();
        }
    });
    drop(multi);

    let worker_count = multi_metrics.num_workers();
    for worker in 0..worker_count {
        assert!(multi_metrics.worker_total_busy_duration(worker) > zero);
    }
}
/// Checks the split between local and remote schedule counts for a task
/// spawned from inside each runtime flavor.
#[test]
fn worker_local_schedule_count() {
    // Current-thread runtime: one spawn from within `block_on` is expected
    // to count as one local schedule and no remote schedules.
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        let task = tokio::spawn(async {});
        task.await.unwrap();
    });
    drop(single);

    assert_eq!(1, single_metrics.worker_local_schedule_count(0));
    assert_eq!(0, single_metrics.remote_schedule_count());

    // Multi-thread runtime: an outer task is spawned from `block_on`, and
    // it in turn spawns and awaits an inner task.
    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        let outer = tokio::spawn(async {
            let inner = tokio::spawn(async {});
            inner.await.unwrap();
        });
        outer.await.unwrap();
    });
    drop(multi);

    let mut local_schedules: u64 = 0;
    for worker in 0..multi_metrics.num_workers() {
        local_schedules += multi_metrics.worker_local_schedule_count(worker);
    }

    assert_eq!(2, local_schedules);
    assert_eq!(1, multi_metrics.remote_schedule_count());
}
/// Expects exactly one overflow, summed over all workers, after a single
/// task spawns 300 empty tasks while a second task is held blocked.
#[test]
fn worker_overflow_count() {
    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        // Run the scenario inside a spawned task rather than directly in
        // the `block_on` future.
        let outer = tokio::spawn(async {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // This task signals that it has started, then blocks its thread
            // on a std channel until released at the very end.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // An extra empty task spawned right after the blocker.
            // NOTE(review): presumably this displaces the blocker from a
            // "run next" slot so the other worker can pick it up - confirm.
            tokio::spawn(async {});

            // Wait (blocking) until the blocker task is running.
            rx1.recv().unwrap();

            // Spawn a large batch of empty tasks from this task.
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            // Release the blocker task.
            tx2.send(()).unwrap();
        });
        outer.await.unwrap();
    });
    drop(rt);

    let mut overflows: u64 = 0;
    for worker in 0..metrics.num_workers() {
        overflows += metrics.worker_overflow_count(worker);
    }

    assert_eq!(1, overflows);
}
/// Checks `injection_queue_depth` after a task is spawned from an external
/// thread: exactly one on an idle current-thread runtime, and between one
/// and three on a multi-thread runtime with two tasks blocked on channels.
#[test]
fn injection_queue_depth() {
    use std::thread;

    // Current-thread runtime: nothing drives the runtime here, and the
    // depth is expected to be exactly one after the external spawn.
    let single = current_thread();
    let single_handle = single.handle().clone();
    let single_metrics = single.metrics();

    let spawner = thread::spawn(move || {
        single_handle.spawn(async {});
    });
    spawner.join().unwrap();

    assert_eq!(1, single_metrics.injection_queue_depth());

    // Multi-thread runtime.
    let multi = threaded();
    let multi_handle = multi.handle().clone();
    let multi_metrics = multi.metrics();

    // Two tasks that each block on a std channel until released below.
    let (tx1, rx1) = std::sync::mpsc::channel();
    let (tx2, rx2) = std::sync::mpsc::channel();
    multi.spawn(async move { rx1.recv().unwrap() });
    multi.spawn(async move { rx2.recv().unwrap() });

    let spawner = thread::spawn(move || {
        multi_handle.spawn(async {});
    });
    spawner.join().unwrap();

    // The depth may be anywhere from one to three at this point.
    let n = multi_metrics.injection_queue_depth();
    assert!((1..=3).contains(&n), "{}", n);

    // Release the two blocked tasks.
    tx1.send(()).unwrap();
    tx2.send(()).unwrap();
}
/// After spawning `N` empty tasks from a single task, the local queue
/// depth (summed over workers on the multi-thread runtime) must equal `N`.
#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    // Current-thread runtime: spawn N tasks without yielding, then sample
    // the depth of worker 0 from inside the same future.
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    // Multi-thread runtime.
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Run the scenario inside a spawned task rather than directly in
        // the `block_on` future.
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // This task signals that it has started, then blocks its thread
            // on a std channel until released at the very end.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // An extra empty task spawned right after the blocker.
            // NOTE(review): presumably this displaces the blocker from a
            // "run next" slot so the other worker can pick it up - confirm.
            tokio::spawn(async {});

            // Wait (blocking) until the blocker task is running.
            rx1.recv().unwrap();

            // Spawn exactly N tasks so the loop stays in sync with the
            // assertion below (previously a hard-coded 100).
            for _ in 0..N {
                tokio::spawn(async {});
            }

            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(N, n);

            // Release the blocker task.
            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}
/// Tracks the I/O driver's registered/deregistered fd counters across the
/// lifetime of one TCP stream.
///
/// NOTE: connects to `google.com:80`, so this test needs outbound network
/// access.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Nothing registered on a fresh runtime.
    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    let connecting = tokio::net::TcpStream::connect("google.com:80");
    let stream = rt.block_on(async move { connecting.await.unwrap() });

    // One fd registered, none deregistered while the stream is alive.
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    // Dropping the stream bumps the deregistered counter; the registered
    // counter keeps its value rather than going back down.
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}
/// Establishing one TCP connection is expected to produce exactly one
/// readiness event in the I/O driver.
///
/// NOTE: connects to `google.com:80`, so this test needs outbound network
/// access.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    let connecting = tokio::net::TcpStream::connect("google.com:80");
    // Keep the stream alive until the end of the test.
    let _stream = rt.block_on(async move { connecting.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}
/// Builds a current-thread runtime with all drivers enabled.
///
/// Panics if the runtime cannot be built.
fn current_thread() -> Runtime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    builder.build().unwrap()
}
/// Builds a multi-thread runtime with exactly two worker threads and all
/// drivers enabled.
///
/// Panics if the runtime cannot be built.
fn threaded() -> Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(2);
    builder.enable_all();
    builder.build().unwrap()
}