pub mod decryptionthread;
pub mod receivethread;
pub mod sendthread;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use crossbeam::channel::unbounded;
use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use crate::acknowledgement::{AcknowledgementCheck, AcknowledgementList};
use crate::config::Config;
use crate::encryption::AetherCipher;
use crate::encryption::KEY_SIZE;
use crate::error::AetherError;
use crate::identity::Id;
use crate::identity::PublicId;
use crate::link::receivethread::ReceiveThread;
use crate::link::sendthread::SendThread;
use crate::packet::PType;
use crate::packet::Packet;
use crate::util::gen_nonce;
use crate::util::xor;
use self::decryptionthread::DecryptionThread;
pub fn needs_ack(packet: &Packet) -> bool {
match packet.flags.p_type {
PType::Data => true,
PType::KeyExchange => true,
PType::AckOnly => false,
_ => false,
}
}
#[derive(Debug)]
pub struct Link {
pub private_id: Id,
pub peer_id: PublicId,
cipher: Option<AetherCipher>,
ack_list: Arc<Mutex<AcknowledgementList>>,
ack_check: Arc<Mutex<AcknowledgementCheck>>,
socket: Arc<UdpSocket>,
peer_addr: SocketAddr,
primary_queue: (Sender<Packet>, Receiver<Packet>),
receive_queue: (Sender<Packet>, Receiver<Packet>),
output_queue: (Sender<Packet>, Receiver<Packet>),
thread_handles: Vec<JoinHandle<()>>,
send_seq: Arc<Mutex<u32>>,
recv_seq: Arc<Mutex<u32>>,
stop_flag: Arc<Mutex<bool>>,
batch_empty: Arc<Mutex<bool>>,
read_timeout: Option<Duration>,
config: Config,
}
impl Link {
pub fn new(
id: Id,
socket: UdpSocket,
peer_addr: SocketAddr,
peer_id: PublicId,
send_seq: u32,
recv_seq: u32,
config: Config,
) -> Result<Link, AetherError> {
let socket = Arc::new(socket);
if socket
.set_read_timeout(Some(Duration::from_secs(1)))
.is_err()
{
return Err(AetherError::SetReadTimeout);
}
let primary_queue = unbounded();
let receive_queue = unbounded();
let output_queue = unbounded();
let stop_flag = Arc::new(Mutex::new(false));
let batch_empty = Arc::new(Mutex::new(false));
Ok(Link {
private_id: id,
ack_list: Arc::new(Mutex::new(AcknowledgementList::new(recv_seq))),
ack_check: Arc::new(Mutex::new(AcknowledgementCheck::new(send_seq))),
peer_addr,
peer_id,
cipher: None,
socket,
primary_queue,
receive_queue,
output_queue,
send_seq: Arc::new(Mutex::new(send_seq)),
recv_seq: Arc::new(Mutex::new(recv_seq)),
thread_handles: Vec::new(),
stop_flag,
batch_empty,
read_timeout: None,
config,
})
}
pub fn start(&mut self) {
let mut send_thread_data = SendThread::new(
self.socket.clone(),
self.peer_addr,
self.primary_queue.1.clone(),
self.stop_flag.clone(),
self.ack_check.clone(),
self.ack_list.clone(),
self.send_seq.clone(),
self.batch_empty.clone(),
self.config,
);
let send_thread = thread::spawn(move || {
send_thread_data.start();
});
let mut recv_thread_data = ReceiveThread::new(
self.socket.clone(),
self.peer_addr,
self.receive_queue.0.clone(),
self.stop_flag.clone(),
self.ack_check.clone(),
self.ack_list.clone(),
self.recv_seq.clone(),
self.config,
);
let recv_thread = thread::spawn(move || {
recv_thread_data.start();
});
self.thread_handles.push(send_thread);
self.thread_handles.push(recv_thread);
}
pub fn enable_encryption(&mut self) -> Result<(), AetherError> {
let own_secret = gen_nonce(KEY_SIZE);
let encrypted_secret = self.peer_id.public_encrypt(&own_secret)?;
let mut packet = Packet::new(PType::KeyExchange, 0);
packet.append_payload(encrypted_secret);
self.send_packet(packet)?;
let other_encrypted = self.recv()?;
let other_secret = self.private_id.private_decrypt(&other_encrypted)?;
let shared_secret = xor(own_secret, other_secret);
let cipher = AetherCipher::new(shared_secret);
let decryption_thread_data = DecryptionThread::new(
cipher.clone(),
self.receive_queue.1.clone(),
self.output_queue.0.clone(),
self.stop_flag.clone(),
self.config,
);
let decryption_thread = thread::spawn(move || {
decryption_thread_data.start().unwrap();
});
self.thread_handles.push(decryption_thread);
self.cipher = Some(cipher);
Ok(())
}
pub fn is_encrypted(&self) -> bool {
self.cipher.is_some()
}
pub fn stop(&mut self) -> Result<(), AetherError> {
match self.stop_flag.lock() {
Ok(mut flag_lock) => {
*flag_lock = true;
drop(flag_lock);
while match self.thread_handles.pop() {
Some(handle) => {
handle.join().expect("Thread failed to join");
true
}
None => false,
} {}
Ok(())
}
Err(_) => Err(AetherError::MutexLock("stop flag")),
}
}
pub fn get_addr(&self) -> SocketAddr {
self.peer_addr
}
pub fn send(&self, buf: Vec<u8>) -> Result<(), AetherError> {
let mut packet = Packet::new(PType::Data, 0);
let data: Vec<u8> = match self.cipher {
Some(ref cipher) => {
packet.set_enc(true);
cipher.encrypt_bytes(buf)?.into()
}
None => buf,
};
packet.append_payload(data);
self.send_packet(packet)
}
pub fn send_packet(&self, mut packet: Packet) -> Result<(), AetherError> {
match self.send_seq.lock() {
Ok(mut seq_lock) => {
(*seq_lock) += 1;
let seq: u32 = *seq_lock;
drop(seq_lock);
packet.sequence = seq;
self.primary_queue.0.send(packet)?;
Ok(())
}
Err(_) => Err(AetherError::MutexLock("send queue")),
}
}
pub fn set_read_timout(&mut self, timeout: Duration) {
self.read_timeout = Some(timeout);
}
pub fn recv_timeout(&self, timeout: Duration) -> Result<Vec<u8>, AetherError> {
let receiver = self.get_receiver()?;
let packet = receiver.recv_timeout(timeout)?;
Ok(packet.payload)
}
pub fn recv(&self) -> Result<Vec<u8>, AetherError> {
let receiver = self.get_receiver()?;
let packet = if let Some(time) = self.read_timeout {
receiver.recv_timeout(time)?
} else {
receiver.recv()?
};
Ok(packet.payload)
}
pub fn get_receiver(&self) -> Result<Receiver<Packet>, AetherError> {
match self.stop_flag.lock() {
Ok(flag_lock) => {
let stop = *flag_lock;
drop(flag_lock);
if stop {
Err(AetherError::LinkStopped("get receiver"))
} else {
if self.is_encrypted() {
Ok(self.output_queue.1.clone())
} else {
Ok(self.receive_queue.1.clone())
}
}
}
Err(_) => Err(AetherError::MutexLock("stop flag")),
}
}
pub fn is_empty(&self) -> Result<bool, AetherError> {
if self.primary_queue.0.is_empty() {
match self.batch_empty.lock() {
Ok(batch_lock) => Ok(*batch_lock),
Err(_) => Err(AetherError::MutexLock("batch empty flag")),
}
} else {
Ok(false)
}
}
pub fn wait_empty(&self) -> Result<(), AetherError> {
loop {
match self.is_empty() {
Ok(empty) => {
if empty {
thread::sleep(Duration::from_millis(self.config.link.ack_wait_time));
break Ok(());
} else {
thread::sleep(Duration::from_micros(self.config.link.poll_time_us));
}
}
Err(aether_error) => {
break Err(aether_error);
}
}
}
}
}
impl Drop for Link {
fn drop(&mut self) {
match self.stop() {
Ok(_) => {}
Err(aether_error) => {
log::error!("{}", aether_error)
}
}
}
}