use core::{
future::Future,
net::SocketAddr,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
time::Duration,
};
use std::{net::UdpSocket, sync::Arc, thread};
use anyhow::Error;
use super::Config;
use crate::{
shaper::Shaper,
stat::{PerCpuStat, SockWorkerStat, Stat, TxWorkerStat},
OneProduce, Produce,
};
/// Statistics record handed to a single worker thread.
type WorkerStat = PerCpuStat<TxWorkerStat, (), SockWorkerStat, ()>;
/// Engine-wide statistics, constructed from the list of per-worker [`WorkerStat`] records.
type EngineStat = Stat<TxWorkerStat, (), SockWorkerStat, ()>;
/// Owns the configuration, the per-worker limit cells and the shared
/// statistics used to drive the UDP worker threads.
#[derive(Debug)]
pub struct Engine {
/// Configuration the engine was created with.
cfg: Config,
/// One shared limit cell per worker thread; each is handed to that worker's `Shaper`.
limits: Vec<Arc<AtomicU64>>,
/// Statistics built from one `WorkerStat` per worker thread.
stat: Arc<EngineStat>,
}
impl Engine {
    /// Creates an engine with one limit cell and one statistics record per
    /// configured worker thread.
    ///
    /// Every limit cell is initialized to `0`.
    pub fn new(cfg: Config) -> Self {
        let num_threads = cfg.native.threads.get();
        // Both vectors end up with exactly `num_threads` entries, so size them once.
        let mut limits = Vec::with_capacity(num_threads);
        let mut stats = Vec::with_capacity(num_threads);
        for _ in 0..num_threads {
            limits.push(Arc::new(AtomicU64::new(0)));
            stats.push(Arc::new(WorkerStat::default()));
        }
        let stat = Arc::new(EngineStat::new(stats));
        Self { cfg, limits, stat }
    }

    /// Returns handles to the per-worker limit cells, in worker order.
    ///
    /// The returned `Arc`s point at the same atomics the workers' shapers read.
    #[inline]
    pub fn limits(&self) -> Vec<Arc<AtomicU64>> {
        self.limits.clone()
    }

    /// Returns a shared handle to the engine statistics.
    #[inline]
    pub fn stat(&self) -> Arc<EngineStat> {
        Arc::clone(&self.stat)
    }

    /// Spawns one worker thread per limit cell and blocks until every worker
    /// has returned.
    ///
    /// `_stop` is accepted but not used by this function; the workers poll
    /// `is_running` and leave their loop once it reads `false`.
    ///
    /// # Errors
    ///
    /// Returns an error if a worker thread cannot be spawned.
    ///
    /// # Panics
    ///
    /// If a worker thread panicked, that panic is re-raised on the calling
    /// thread with its original payload.
    pub fn run<F>(self, _stop: F, is_running: Arc<AtomicBool>) -> Result<(), Error>
    where
        F: Future<Output = ()> + 'static,
    {
        let num_threads = self.cfg.native.threads.into();
        let mut threads = Vec::with_capacity(num_threads);
        let bind = self.cfg.native.bind_endpoints.clone();
        let data = Arc::new(OneProduce::new(b"GET / HTTP/1.1\r\n\r\n".to_vec()));

        // Iterate by reference: only the individual `Arc`s need cloning, not the vector.
        for (idx, thread_limits) in self.limits.iter().enumerate() {
            let mut worker = Worker::new(
                self.cfg.addr,
                bind.clone(),
                Arc::clone(&data),
                self.cfg.native.requests_per_socket(),
                Arc::clone(thread_limits),
                Arc::clone(&is_running),
                self.stat.stats[idx].clone(),
            );
            let thread = thread::Builder::new()
                .name("dwd:w".into())
                .spawn(move || worker.run())?;
            threads.push(thread);
        }

        for thread in threads {
            // `join` fails only when the worker panicked; forward that panic
            // unchanged instead of hiding it behind an unrelated message.
            if let Err(payload) = thread.join() {
                std::panic::resume_unwind(payload);
            }
        }
        Ok(())
    }
}
/// State of one sending thread: where to send, what to send, the socket
/// currently in use and the bookkeeping around it.
#[derive(Debug)]
pub struct Worker<B, D> {
/// Remote address every newly created socket is connected to.
addr: SocketAddr,
/// Source of local addresses to bind newly created sockets to.
bind: B,
/// Source of payloads to send.
data: D,
/// Socket kept for reuse between sends; `None` means the next send creates a new one.
sock: Option<UdpSocket>,
/// A socket is kept for reuse only while fewer than this many sends have succeeded on it.
requests_per_sock: u64,
/// Number of successful sends on the current socket; reset when a new socket is created.
requests_per_sock_done: u64,
/// Polled by the run loop; the loop ends once this reads `false`.
is_running: Arc<AtomicBool>,
/// Consulted on every loop iteration for how many sends to perform.
shaper: Shaper,
/// Counters updated on socket creation, socket errors, requests and sent bytes.
stat: Arc<WorkerStat>,
}
impl<B, D> Worker<B, D> {
pub fn new(
addr: SocketAddr,
bind: B,
data: D,
requests_per_sock: u64,
limit: Arc<AtomicU64>,
is_running: Arc<AtomicBool>,
stat: Arc<WorkerStat>,
) -> Self {
let shaper = Shaper::new(0, limit);
Self {
addr,
bind,
data,
sock: None,
requests_per_sock,
requests_per_sock_done: 0,
is_running,
shaper,
stat,
}
}
}
impl<B, D> Worker<B, D>
where
    B: Produce<Item = SocketAddr>,
    D: Produce<Item = Vec<u8>>,
{
    /// Drives the send loop until `is_running` reads `false`.
    ///
    /// Each iteration asks the shaper for a batch size. A batch of zero
    /// results in a short sleep; otherwise that many sends are attempted and
    /// the whole batch is reported back to the shaper as consumed.
    pub fn run(&mut self) {
        while self.is_running.load(Ordering::Relaxed) {
            let budget = self.shaper.tick();
            if budget == 0 {
                Self::wait();
                continue;
            }
            for _ in 0..budget {
                self.execute();
            }
            self.shaper.consume(budget);
        }
    }

    /// Attempts a single send of the next payload.
    ///
    /// A failure to obtain a socket or to send is recorded as a socket error;
    /// in the send-failure case the socket is not stored back, so the next
    /// call creates a fresh one. After a successful send the socket is kept
    /// for reuse only while the per-socket counter is below
    /// `requests_per_sock`, and one request plus the payload length are
    /// recorded in the statistics.
    #[inline(always)]
    fn execute(&mut self) {
        let Ok(sock) = self.curr_sock() else {
            self.stat.on_sock_err();
            return;
        };

        let payload = self.data.next();
        if sock.send(payload).is_err() {
            self.stat.on_sock_err();
            return;
        }

        self.requests_per_sock_done += 1;
        if self.requests_per_sock_done < self.requests_per_sock {
            self.sock = Some(sock);
        }
        self.stat.on_requests(1);
        self.stat.on_send(payload.len() as u64);
    }

    /// Takes the stored socket out of `self.sock`, or creates a new one.
    ///
    /// A new socket is bound to the next address from `bind` and connected to
    /// `addr`; on success the per-socket counter is reset and the creation is
    /// recorded in the statistics.
    ///
    /// # Errors
    ///
    /// Returns the underlying error if binding or connecting fails.
    #[inline]
    fn curr_sock(&mut self) -> Result<UdpSocket, Error> {
        if let Some(existing) = self.sock.take() {
            return Ok(existing);
        }

        let local = self.bind.next();
        let fresh = UdpSocket::bind(local)?;
        fresh.connect(self.addr)?;
        self.requests_per_sock_done = 0;
        self.stat.on_sock_created();
        Ok(fresh)
    }

    /// Sleeps for one microsecond; used when the shaper grants no sends.
    #[inline]
    fn wait() {
        let pause = Duration::from_micros(1);
        thread::sleep(pause);
    }
}