use super::endpoint::{Endpoint};
use super::resource_id::{ResourceId, ResourceType};
use super::poll::{Poll};
use super::registry::{ResourceRegistry};
use super::remote_addr::{RemoteAddr};
use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus};
use std::net::{SocketAddr};
use std::sync::{Arc};
use std::io::{self};
#[cfg(doctest)]
use super::transport::{Transport};
pub enum NetEvent<'a> {
Connected(Endpoint, ResourceId),
Message(Endpoint, &'a [u8]),
Disconnected(Endpoint),
}
impl std::fmt::Debug for NetEvent<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let string = match self {
Self::Connected(endpoint, id) => format!("Connected({}, {})", endpoint, id),
Self::Message(endpoint, data) => format!("Message({}, {})", endpoint, data.len()),
Self::Disconnected(endpoint) => format!("Disconnected({})", endpoint),
};
write!(f, "NetEvent::{}", string)
}
}
pub trait ActionController: Send + Sync {
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)>;
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>;
fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
fn remove(&self, id: ResourceId) -> bool;
}
pub trait EventProcessor: Send + Sync {
fn process(&self, resource_id: ResourceId, event_callback: &mut dyn FnMut(NetEvent<'_>));
}
pub struct Driver<R: Remote, L: Local> {
remote_registry: Arc<ResourceRegistry<R>>,
local_registry: Arc<ResourceRegistry<L>>,
}
impl<R: Remote, L: Local> Driver<R, L> {
pub fn new(
_: impl Adapter<Remote = R, Local = L>,
adapter_id: u8,
poll: &mut Poll,
) -> Driver<R, L> {
let remote_poll_registry = poll.create_registry(adapter_id, ResourceType::Remote);
let local_poll_registry = poll.create_registry(adapter_id, ResourceType::Local);
Driver {
remote_registry: Arc::new(ResourceRegistry::<R>::new(remote_poll_registry)),
local_registry: Arc::new(ResourceRegistry::<L>::new(local_poll_registry)),
}
}
}
impl<R: Remote, L: Local> Clone for Driver<R, L> {
fn clone(&self) -> Driver<R, L> {
Driver {
remote_registry: self.remote_registry.clone(),
local_registry: self.local_registry.clone(),
}
}
}
impl<R: Remote, L: Local> ActionController for Driver<R, L> {
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> {
R::connect(addr).map(|info| {
(
Endpoint::new(
self.remote_registry.add(info.remote, info.peer_addr),
info.peer_addr,
),
info.local_addr,
)
})
}
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> {
L::listen(addr)
.map(|info| (self.local_registry.add(info.local, info.local_addr), info.local_addr))
}
fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
match endpoint.resource_id().resource_type() {
ResourceType::Remote => match self.remote_registry.get(endpoint.resource_id()) {
Some(remote) => remote.resource.send(data),
None => SendStatus::ResourceNotFound,
},
ResourceType::Local => match self.local_registry.get(endpoint.resource_id()) {
Some(remote) => remote.resource.send_to(endpoint.addr(), data),
None => SendStatus::ResourceNotFound,
},
}
}
fn remove(&self, id: ResourceId) -> bool {
match id.resource_type() {
ResourceType::Remote => self.remote_registry.remove(id),
ResourceType::Local => self.local_registry.remove(id),
}
}
}
impl<R: Remote, L: Local<Remote = R>> EventProcessor for Driver<R, L> {
fn process(&self, id: ResourceId, event_callback: &mut dyn FnMut(NetEvent<'_>)) {
match id.resource_type() {
ResourceType::Remote => self.process_remote(id, event_callback),
ResourceType::Local => self.process_local(id, event_callback),
}
}
}
impl<R: Remote, L: Local<Remote = R>> Driver<R, L> {
fn process_remote(&self, id: ResourceId, mut event_callback: impl FnMut(NetEvent<'_>)) {
if let Some(remote) = self.remote_registry.get(id) {
let endpoint = Endpoint::new(id, remote.addr);
log::trace!("Processed remote for {}", endpoint);
let status = remote.resource.receive(|data| {
event_callback(NetEvent::Message(endpoint, data));
});
log::trace!("Processed remote receive status {}", status);
if let ReadStatus::Disconnected = status {
if self.remote_registry.remove(id) {
event_callback(NetEvent::Disconnected(endpoint));
}
}
}
}
fn process_local(&self, id: ResourceId, mut event_callback: impl FnMut(NetEvent<'_>)) {
if let Some(local) = self.local_registry.get(id) {
log::trace!("Processed local for {}", id);
local.resource.accept(|accepted| {
log::trace!("Processed local accepted type {}", accepted);
match accepted {
AcceptedType::Remote(addr, remote) => {
let remote_id = self.remote_registry.add(remote, addr);
let endpoint = Endpoint::new(remote_id, addr);
event_callback(NetEvent::Connected(endpoint, id));
}
AcceptedType::Data(addr, data) => {
let endpoint = Endpoint::new(id, addr);
event_callback(NetEvent::Message(endpoint, data));
}
}
});
}
}
}
impl std::fmt::Display for ReadStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let string = match self {
ReadStatus::Disconnected => "Disconnected",
ReadStatus::WaitNextEvent => "WaitNextEvent",
};
write!(f, "ReadStatus::{}", string)
}
}
impl<R> std::fmt::Display for AcceptedType<'_, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let string = match self {
AcceptedType::Remote(addr, _) => format!("Remote({})", addr),
AcceptedType::Data(addr, _) => format!("Data({})", addr),
};
write!(f, "AcceptedType::{}", string)
}
}