mod sdp;
pub use sdp::*;
mod sdp_self;
pub use sdp_self::*;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Mutex;
use std::collections::{VecDeque, HashSet};
use std::task::Waker;
use std::pin::Pin;
use serde::{Serialize, Deserialize};
use bincode;
use rand::Rng;
use super::*;
use crate::crypto::history::*;
use crate::peer::*;
const RECEIVE_BUFFER_SIZE: usize = 1220;
const WINDOW_SIZE: usize = 20;
#[derive(Debug, Clone)]
enum SentStatus {
Awaiting,
Synchronizing
}
#[derive(Debug)]
pub struct Packet {
status: Mutex<SentStatus>,
header: Header,
chat_sync: ChatSynchronizer,
packet_sync: PacketSynchronizer,
payload: Vec<u8>
}
impl Packet {
pub(crate) fn new(
protocol_type: ProtocolType,
packet_type: PacketType,
src_id: &PeerId,
rec_id: &PeerId,
chat_sync: &ChatSynchronizer,
packet_sync: PacketSynchronizer,
payload: Vec<u8>
) -> Packet {
Packet {
status: Mutex::new(SentStatus::Awaiting),
header: Header {
protocol_type,
packet_type,
length: 100 + payload.len() as u16,
src_id: src_id.clone(),
rec_id: rec_id.clone()
},
chat_sync: chat_sync.clone(),
packet_sync: packet_sync.clone(),
payload: payload
}
}
pub(crate) fn update_id(self) -> Packet {
let mut rng = rand::thread_rng();
Packet {
status: self.status,
header: self.header,
chat_sync: self.chat_sync,
packet_sync: PacketSynchronizer {
timestamp: self.packet_sync.timestamp,
n_packets: self.packet_sync.n_packets,
packet_id: rng.gen_range(u64::MIN..(u64::MAX - self.packet_sync.n_packets))
},
payload: self.payload
}
}
pub(crate) fn serialize(&self) -> Vec<u8> {
let mut packet = self.header.serialize();
packet.extend(&self.chat_sync.serialize());
packet.extend(&self.packet_sync.serialize());
packet.extend(&self.payload);
return packet;
}
pub(crate) fn sync(&self) {
*self.status.lock().unwrap() = SentStatus::Synchronizing;
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct PacketSynchronizer {
timestamp: u64,
n_packets: u64,
packet_id: u64
}
impl PacketSynchronizer {
pub fn new(timestamp: u64, n_packets: u64, packet_id: u64) -> PacketSynchronizer {
PacketSynchronizer { timestamp, n_packets, packet_id }
}
pub fn serialize(&self) -> Vec<u8> {
bincode::serialize(&self).unwrap()
}
#[track_caller]
pub fn deserialize(bytes: Vec<u8>) -> PacketSynchronizer {
match bincode::deserialize(&bytes) {
Ok(pack_sync) => pack_sync,
Err(_) => panic!("Wrong size of `PacketSynchronizer`"),
}
}
}
#[derive(Debug)]
pub enum Transaction {
First {
first_packet: Packet,
rest_of_payload: VecDeque<Vec<u8>>
},
Rest {
first_packet_id: u64,
payload: Mutex<VecDeque<Packet>>
}
}
impl Transaction {
fn update_id_strategy(self) -> Result<Transaction, Box<dyn Error>> {
match self {
Transaction::Rest { .. } => {
Err("Updating `Transaction` id strategy is allowed only for `Transaction::First`".into())
},
Transaction::First {
first_packet,
rest_of_payload
} => {
Ok(Transaction::First {
first_packet: first_packet.update_id(),
rest_of_payload: rest_of_payload
})
}
}
}
fn construct_rest(self, packet_id: u64) -> Transaction {
match self {
Transaction::First {
first_packet,
rest_of_payload
} => {
let Header {
protocol_type,
packet_type,
length: _,
src_id,
rec_id
} = first_packet.header;
let chat_sync = first_packet.chat_sync;
let PacketSynchronizer{
timestamp,
n_packets,
packet_id: _
} = first_packet.packet_sync;
let payload = rest_of_payload.into_iter()
.zip((packet_id+1)..(packet_id+n_packets))
.map(
|(p, id)|
Packet::new(
protocol_type,
packet_type,
&src_id,
&rec_id,
&chat_sync,
PacketSynchronizer::new(
timestamp, n_packets, id
),
p.to_vec()
)
).collect();
Transaction::Rest {
first_packet_id: packet_id,
payload: Mutex::new(payload)
}
},
Transaction::Rest{ .. } => {
self
}
}
}
fn ack_packets(&self, packet_ids: &Vec<u64>) {
match self {
Transaction::First { .. } => { },
Transaction::Rest{
first_packet_id: _,
payload
} => {
let mut packets = payload.lock().unwrap();
(*packets).retain(|p| !packet_ids.contains(&p.packet_sync.packet_id));
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConnectionState {
Receiving,
Pending,
Sending
}
#[derive(Debug, Clone)]
struct ConnectionWaker {
state: ConnectionState,
waker: Option<Waker>
}
#[derive(Debug, Clone)]
pub struct MessageHandler {
peer_id: PeerId,
sender_src: SocketAddr,
chat_t: Chat,
timestamp_l: u64,
first_packet_sync: PacketSynchronizer,
data: Vec<(u64, Vec<u8>)>,
acknowledging: Vec<u64>,
acknowledged: HashSet<u64>
}
impl MessageHandler {
fn acknow(&mut self) {
self.acknowledged = self.acknowledged
.union(&HashSet::from_iter(self.acknowledging.clone()))
.map(|id| id.to_owned())
.collect::<HashSet<u64>>();
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PacketWindow {
pub(crate) packet_ids: Vec<u64>
}
impl PacketWindow {
pub(crate) fn serialize(&self) -> Vec<u8> {
bincode::serialize(&self).unwrap()
}
#[track_caller]
pub(crate) fn deserialize(bytes: Vec<u8>) -> Result<PacketWindow, Box<dyn Error>> {
match bincode::deserialize(&bytes) {
Ok(chat_sync) => Ok(chat_sync),
Err(_) => Err("Wrong size of `PacketWindow`".into())
}
}
}
#[derive(Debug, Clone)]
pub enum Acknowledgement {
First(PacketSynchronizer),
Rest(PacketWindow)
}
#[derive(Debug)]
pub enum MessageWrapper {
Receiving {
chat_t: Chat,
chat_sync: ChatSynchronizer,
payload: Vec<u8>
},
Sending {
receivers: Vec<Peer>,
chat_t: Chat,
chat_sync: ChatSynchronizer,
message: Message,
},
Acknowledgement {
chat_t: Chat,
chat_sync: ChatSynchronizer,
packets: Acknowledgement
},
Recover {
ack: bool,
peer_id: PeerId,
histories: ChatSynchronizers
},
Initial {
ack: bool,
peer: Peer,
history: ChatSynchronizer
},
SelfRecover {
ack: bool,
device_id: u16,
sync: SelfSynchronizer
},
SelfInitial {
ack: bool,
device_id: u16,
sync: SelfSynchronizer
}
}