use crate::config::config::Config;
use super::{utils::{bind_udp_sockets, }, request::Request};
use crate::dns;
use log::*;
use crossbeam_channel::Sender as XBeamSender;
use srvzio;
use std::{net::{Ipv4Addr, Ipv6Addr, UdpSocket}, thread, time::Duration, io::ErrorKind};
const SERVER_SERVICE_NAME: &'static str = "Server";
#[derive(Debug)]
pub struct Server {
ip4s: Vec<Ipv4Addr>,
ip6s: Vec<Ipv6Addr>,
port: u16,
threads: Vec<thread::JoinHandle<()>>,
sender: XBeamSender<Request>,
status: srvzio::ServiceStatusFlag,
}
impl srvzio::Service for Server {
fn name(&self) -> &'static str {
SERVER_SERVICE_NAME
}
fn start(&mut self) {
self.status.starting();
let udp_sockets = bind_udp_sockets(&self.ip4s, &self.ip6s, &self.port);
let threads = self.start_udp_threads(udp_sockets);
self.threads.extend(threads);
}
fn await_started(&mut self) {
while !self.status.is_started() {}
}
fn stop(&mut self) {
trace!("Server should now stop...");
self.status.stopping();
}
fn await_stopped(&mut self) {
while let Some(t) = self.threads.pop() {
t.join()
.expect("A Server's thread panicked upon termination");
}
self.status.stopped();
}
}
impl Server {
pub fn new(config: &Config, sender: XBeamSender<Request>) -> Server {
Server {
ip4s: config.ipv4(),
ip6s: config.ipv6(),
port: config.port(),
threads: Vec::with_capacity(config.ipv4().len() + config.ipv6().len()),
sender,
status: srvzio::ServiceStatusFlag::default(),
}
}
fn start_udp_threads(&mut self, udp_sockets: Vec<UdpSocket>) -> Vec<thread::JoinHandle<()>> {
udp_sockets.iter().enumerate().map(|(idx, udp_sock)| {
let thread_udp_sock = udp_sock.try_clone().unwrap();
let thread_udp_sender = self.sender.clone();
let status = self.status.clone();
thread::Builder::new().name(format!("udp_socket_thread_{}", idx)).spawn(move || {
let mut buf: [u8; 512] = [0; 512];
thread_udp_sock
.set_read_timeout(Some(Duration::from_secs(10))) .expect("Unable to set read timeout on UDP socket");
status.started();
trace!("Waiting for UDP datagram...");
loop {
match thread_udp_sock.recv_from(&mut buf) {
Ok((amount, src)) => {
debug!("Received {} bytes via UDP datagram from '{}'", amount, src);
match dns::protocol::dns_message_from_bytes(&buf) {
Ok(dns_message) => {
if dns_message.message_type() == dns::protocol::DnsMessageType::Query {
let u_sock = thread_udp_sock.try_clone().unwrap();
let dns_request = Request::from_udp(src, dns_message, u_sock);
thread_udp_sender.send(dns_request).expect("Unable to pass on DNS Request for processing");
} else {
warn!("Received unexpected DNS message of type {:?}: ignoring", dns_message.message_type());
}
},
Err(e) => error!("Unable to parse DNS message: {}", e)
};
}
Err(e) => match e.kind() {
ErrorKind::WouldBlock | ErrorKind::TimedOut => {
if status.is_stopping() {
trace!("Server is done running: stop listening for requests");
break;
}
},
_ => {
error!("Error receiving: {:?} {}", e.kind(), e);
}
}
}
}
}).expect("Unable to spawn thread for UDP bound socket")
}).collect()
}
}