use std::any;
use std::collections::VecDeque;
use std::fmt;
use std::net;
use std::sync::{Arc, Mutex, RwLock};
use crate::endpoint;
use crate::frame;
use crate::socket;
use crate::timer_queue;
use crate::ErrorKind;
use crate::SendMode;
use super::epoch;
use super::Event;
/// Per-peer state that the endpoint's host callbacks read and mutate.
///
/// This lives in its own struct, apart from `Peer::endpoint`, and callers in this file borrow
/// it mutably (through `EndpointContext`) while also calling methods on the endpoint.
struct PeerCore {
/// Remote address; passed to `socket_tx.send` as the destination of every outgoing frame.
addr: net::SocketAddr,
/// Optional opaque value read and replaced via `PeerHandle::data` / `PeerHandle::set_data`.
data: Option<Arc<dyn any::Any + Send + Sync>>,
/// Time source; `time_now_ms()` is sampled before each call into the endpoint.
epoch: Arc<epoch::Epoch>,
/// Shared sender used by `send_frame` to transmit frame bytes to `addr`.
socket_tx: Arc<socket::SocketTx>,
/// Timer value stored by `set_rto_timer` (named `time_ms` there), or `None` when unset.
/// `Timer::test` clears it and reports `true` once it is strictly less than the current time.
rto_timer: Option<u64>,
/// Shared queue onto which connect, disconnect, receive and timeout events are pushed.
events: Arc<Mutex<VecDeque<Event>>>,
}
/// A remote peer: its bookkeeping state plus the protocol endpoint that drives it.
pub struct Peer {
/// Address, user data, timer and I/O handles exposed to the endpoint via `EndpointContext`.
core: PeerCore,
/// Protocol endpoint created from the `endpoint::Config` passed to `Peer::new`.
endpoint: endpoint::Endpoint,
}
impl Peer {
pub fn new(
addr: net::SocketAddr,
endpoint_config: endpoint::Config,
epoch: Arc<epoch::Epoch>,
socket_tx: Arc<socket::SocketTx>,
events: Arc<Mutex<VecDeque<Event>>>,
) -> Self {
Self {
core: PeerCore {
addr,
data: None,
epoch,
socket_tx,
events,
rto_timer: None,
},
endpoint: endpoint::Endpoint::new(endpoint_config),
}
}
pub fn addr(&self) -> &net::SocketAddr {
&self.core.addr
}
}
/// Short-lived adapter that implements `endpoint::HostContext` on behalf of a single peer.
struct EndpointContext<'a> {
/// Mutable access to the peer's core state (socket sender, timer slot, event queue).
peer: &'a mut PeerCore,
/// The shared reference to the same peer; cloned to build the `PeerHandle` put in each event.
peer_ref: &'a PeerRef,
}
impl<'a> EndpointContext<'a> {
pub fn new(peer: &'a mut PeerCore, peer_ref: &'a PeerRef) -> Self {
Self { peer, peer_ref }
}
}
impl<'a> endpoint::HostContext for EndpointContext<'a> {
fn send_frame(&mut self, frame_bytes: &[u8]) {
self.peer.socket_tx.send(frame_bytes, &self.peer.addr);
}
fn set_rto_timer(&mut self, time_ms: u64) {
self.peer.rto_timer = Some(time_ms);
}
fn unset_rto_timer(&mut self) {
self.peer.rto_timer = None;
}
fn on_connect(&mut self) {
let handle = PeerHandle::new(Arc::clone(self.peer_ref));
let event = Event::Connect(handle);
self.peer.events.lock().unwrap().push_back(event);
}
fn on_disconnect(&mut self) {
let handle = PeerHandle::new(Arc::clone(self.peer_ref));
let event = Event::Disconnect(handle);
self.peer.events.lock().unwrap().push_back(event);
}
fn on_receive(&mut self, packet_bytes: Box<[u8]>) {
let handle = PeerHandle::new(Arc::clone(self.peer_ref));
let event = Event::Receive(handle, packet_bytes);
self.peer.events.lock().unwrap().push_back(event);
}
fn on_timeout(&mut self) {
let handle = PeerHandle::new(Arc::clone(self.peer_ref));
let event = Event::Error(handle, ErrorKind::Timeout);
self.peer.events.lock().unwrap().push_back(event);
}
}
/// Shared, reference-counted `Peer` guarded by a read/write lock.
pub type PeerRef = Arc<RwLock<Peer>>;
impl timer_queue::Timer for RwLock<Peer> {
    /// Reports whether the peer's RTO timer has expired, clearing it if so.
    ///
    /// Returns `true` only when a timer value is set and is strictly less than `time_now`; in
    /// that case the stored value is reset to `None`. Returns `false` when no timer is set, when
    /// it has not yet passed, or when the write lock cannot be acquired because it is poisoned.
    fn test(&self, time_now: u64) -> bool {
        let mut guard = match self.write() {
            Ok(guard) => guard,
            // A poisoned lock is treated as "not expired" rather than propagated.
            Err(_) => return false,
        };

        let expired = guard
            .core
            .rto_timer
            .map_or(false, |deadline| deadline < time_now);
        if expired {
            guard.core.rto_timer = None;
        }
        expired
    }
}
/// Cloneable handle to a peer.
///
/// Cloning copies the inner `Arc`, so every clone refers to the same underlying `Peer`.
#[derive(Clone)]
pub struct PeerHandle {
/// Shared reference to the peer; locked for reading or writing by each handle method.
peer: PeerRef,
}
impl PeerHandle {
pub(super) fn new(peer: PeerRef) -> Self {
Self { peer }
}
pub fn data(&self) -> Option<Arc<dyn any::Any + Send + Sync>> {
self.peer.read().unwrap().core.data.clone()
}
pub fn set_data(&self, data: Option<Arc<dyn any::Any + Send + Sync>>) {
self.peer.write().unwrap().core.data = data;
}
pub fn send(&mut self, packet_bytes: Box<[u8]>, mode: SendMode) {
let peer = &mut (*self.peer.write().unwrap());
let now_ms = peer.core.epoch.time_now_ms();
let ctx = &mut EndpointContext::new(&mut peer.core, &self.peer);
peer.endpoint.enqueue(packet_bytes, mode, now_ms);
peer.endpoint.flush(now_ms, ctx);
}
pub fn enqueue(&mut self, packet_bytes: Box<[u8]>, mode: SendMode) {
let peer = &mut (*self.peer.write().unwrap());
let now_ms = peer.core.epoch.time_now_ms();
peer.endpoint.enqueue(packet_bytes, mode, now_ms);
}
pub fn flush(&mut self) {
let peer = &mut (*self.peer.write().unwrap());
let now_ms = peer.core.epoch.time_now_ms();
let ctx = &mut EndpointContext::new(&mut peer.core, &self.peer);
peer.endpoint.flush(now_ms, ctx);
}
pub fn disconnect(&mut self) {
let peer = &mut (*self.peer.write().unwrap());
let now_ms = peer.core.epoch.time_now_ms();
let ctx = &mut EndpointContext::new(&mut peer.core, &self.peer);
peer.endpoint.disconnect(now_ms, ctx);
}
}
impl std::cmp::PartialEq for PeerHandle {
fn eq(&self, other: &PeerHandle) -> bool {
Arc::ptr_eq(&self.peer, &other.peer)
}
}
// Equality is `Arc` pointer identity, which is reflexive, so the `Eq` marker is sound.
impl std::cmp::Eq for PeerHandle {}
impl fmt::Debug for PeerHandle {
    /// Formats the handle as the bare struct name `PeerHandle` with no fields.
    ///
    /// The peer is not locked or inspected.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("PeerHandle");
        builder.finish()
    }
}
/// Calls `init` on the peer's endpoint with the current time.
///
/// Holds the peer's write lock for the duration of the call; panics if the lock is poisoned.
/// Callbacks the endpoint makes are routed through an `EndpointContext` for this peer.
pub fn init(peer_ref: &PeerRef) {
    let mut guard = peer_ref.write().unwrap();
    let now_ms = guard.core.epoch.time_now_ms();
    // Reborrow through the guard once so `core` and `endpoint` can be borrowed separately.
    let peer = &mut *guard;
    let mut ctx = EndpointContext::new(&mut peer.core, peer_ref);
    peer.endpoint.init(now_ms, &mut ctx);
}
/// Passes a received frame (`frame_type` plus `payload_bytes`) to the peer's endpoint.
///
/// Holds the peer's write lock for the duration of the call; panics if the lock is poisoned.
/// Callbacks the endpoint makes are routed through an `EndpointContext` for this peer.
pub fn handle_frame(peer_ref: &PeerRef, frame_type: frame::FrameType, payload_bytes: &[u8]) {
    let mut guard = peer_ref.write().unwrap();
    let now_ms = guard.core.epoch.time_now_ms();
    // Reborrow through the guard once so `core` and `endpoint` can be borrowed separately.
    let peer = &mut *guard;
    let mut ctx = EndpointContext::new(&mut peer.core, peer_ref);
    peer.endpoint
        .handle_frame(frame_type, payload_bytes, now_ms, &mut ctx);
}
/// Notifies the peer's endpoint that its RTO timer fired and returns the endpoint's verdict.
///
/// Holds the peer's write lock for the duration of the call; panics if the lock is poisoned.
/// Callbacks the endpoint makes are routed through an `EndpointContext` for this peer.
pub fn handle_rto_timer(peer_ref: &PeerRef) -> endpoint::TimeoutAction {
    let mut guard = peer_ref.write().unwrap();
    let now_ms = guard.core.epoch.time_now_ms();
    // Reborrow through the guard once so `core` and `endpoint` can be borrowed separately.
    let peer = &mut *guard;
    let mut ctx = EndpointContext::new(&mut peer.core, peer_ref);
    let action = peer.endpoint.handle_rto_timer(now_ms, &mut ctx);
    action
}