use std::io;
use std::time::{Duration, Instant};
use mio::net::TcpStream;
use mio::{Interest, Registry, Token};
use slab::Slab;
use crate::connector::Target;
use crate::message_stream::{MessageStream, ReadError};
use crate::{Config, Message, PeerId, StreamConfig};
/// Which side initiated a connection, plus the bookkeeping specific to that side.
#[derive(Debug, Clone)]
enum Direction {
/// Connection added through `Manager::add_inbound`; `peer` is assigned there.
Inbound { peer: PeerId },
/// Connection added through `Manager::add_outbound` towards `target`;
/// `state` records whether the connect has completed yet.
Outbound { target: Target, state: State },
}
/// Progress of an outbound connection.
#[derive(Debug, Clone)]
enum State {
/// Connect still in progress. `start` is the instant passed to
/// `Manager::add_outbound`; `Manager::remove_dead` compares it against the
/// configured connect timeout.
Connecting { start: Instant },
/// Connect completed; `peer` is the id assigned by `Manager::try_ready`.
Connected { peer: PeerId },
}
/// A registered TCP connection together with its direction and poll token.
pub struct Connection {
// Message stream wrapping the mio TCP stream.
stream: MessageStream<TcpStream>,
// Inbound/outbound bookkeeping (peer id, target, connect state).
direction: Direction,
// Slab key of this connection expressed as a mio token; rewritten by
// `Manager::compact` when the entry is moved to a new key.
token: Token,
}
impl Connection {
    /// Reads from the underlying message stream using `rx_buf` as scratch space.
    ///
    /// This is a straight delegation to `MessageStream::read`; `on_msg` and the
    /// returned `bool` carry whatever meaning that method gives them.
    pub fn read<M: Message, F: Fn(M, usize)>(
        &mut self,
        rx_buf: &mut [u8],
        on_msg: F,
    ) -> Result<bool, ReadError> {
        self.stream.read(rx_buf, on_msg)
    }

    /// Hands `message` to the stream and, if the stream reports `true`, makes
    /// sure the socket is registered for both readable and writable events.
    ///
    /// Returns the boolean reported by the stream's `queue_message`
    /// (presumably "the message was queued" — confirm against `MessageStream`).
    ///
    /// # Errors
    ///
    /// Fails only if re-registering with `registry` fails.
    pub fn queue_message<M: Message>(
        &mut self,
        message: &M,
        registry: &Registry,
    ) -> io::Result<bool> {
        let queued = self.stream.queue_message(message);
        if !queued {
            return Ok(false);
        }

        let interest = Interest::READABLE | Interest::WRITABLE;
        registry.reregister(self.stream.as_source(), self.token, interest)?;
        Ok(true)
    }

    /// Drives a write on the stream and adjusts the poll registration.
    ///
    /// The outer `Result` reports a registry failure; the inner `Result`
    /// carries the outcome of the stream write itself.
    ///
    /// When the stream write returns `Ok(false)` the socket is re-registered
    /// under `token` for readable events only; on `Ok(true)` the existing
    /// registration is left as is.
    pub fn write(
        &mut self,
        now: Instant,
        registry: &Registry,
        token: Token,
    ) -> io::Result<io::Result<()>> {
        let keep_interest = match self.stream.write(now) {
            Ok(flag) => flag,
            // A stream-level failure is not a registry failure: report it in
            // the inner result.
            Err(err) => return Ok(Err(err)),
        };

        if !keep_interest {
            registry.reregister(self.stream.as_source(), token, Interest::READABLE)?;
        }
        Ok(Ok(()))
    }
}
/// Outcome of `Manager::try_ready` for a given token.
pub enum Connectedness<'a> {
/// No connection is stored under the token.
Nonexistent,
/// An outbound connect is still in progress.
NotReady,
/// The connection already has a peer id (inbound, or outbound that
/// connected earlier) and can be used.
Ready {
peer: PeerId,
connection: &'a mut Connection,
},
/// An outbound connect has just completed; `peer` is its newly assigned id.
New {
peer: PeerId,
target: Target,
},
/// An outbound connect failed with `error`; the connection has been removed.
Errored {
target: Target,
error: io::Error,
},
}
/// Reason a connection was dropped by `Manager::remove_dead`.
pub enum Dead {
/// An outbound connect to the target exceeded the configured connect timeout.
OutboundTimeout(Target),
/// The stream of this peer reported itself as write-stale.
WriteStale(PeerId),
}
/// Owns all connections and maps peer ids to their poll tokens.
pub struct Manager {
// Connections keyed by slab index; the index doubles as the mio token value.
connections: Slab<Connection>,
// Peer id -> token, for connections that have a peer id (inbound ones, and
// outbound ones once `try_ready` has seen them connect).
token_map: hashbrown::HashMap<PeerId, Token>,
// Next id to hand out; advanced via `PeerId::next` after every assignment.
next_peer_id: PeerId,
// Scratch list filled by `remove_dead` and drained into its return value.
dead: Vec<Dead>,
// Last time `compact` shrank the stream buffers.
last_buffer_resize: Instant,
// Last time `remove_dead` performed a sweep.
last_cleanup: Instant,
}
impl Manager {
/// Creates an empty manager.
///
/// The connection slab and the peer/token map start with room for 16
/// entries, the dead list with room for 8; both timers start at "now".
pub fn new() -> Self {
    let connections = Slab::with_capacity(16);
    let token_map = hashbrown::HashMap::with_capacity(16);
    let dead = Vec::with_capacity(8);
    let last_buffer_resize = Instant::now();
    let last_cleanup = Instant::now();

    Self {
        connections,
        token_map,
        next_peer_id: PeerId(0),
        dead,
        last_buffer_resize,
        last_cleanup,
    }
}
/// Looks up the connection that belongs to `peer`.
///
/// Returns `None` when the peer id is not in the peer/token map.
///
/// # Panics
///
/// Panics if the map holds a token with no matching slab entry, which would
/// mean the two tables have gone out of sync.
pub fn get_by_peer_id(&mut self, peer: &PeerId) -> Option<&mut Connection> {
    let token = *self.token_map.get(peer)?;
    let connection = self
        .connections
        .get_mut(token.0)
        .expect("get: token -> connection must exist");
    Some(connection)
}
pub fn try_ready<'a>(
&'a mut self,
token: &Token,
registry: &Registry,
) -> io::Result<Connectedness<'a>> {
let connect_err = {
let Some(connection) = self.connections.get(token.0) else {
return Ok(Connectedness::Nonexistent);
};
match &connection.direction {
Direction::Outbound {
target,
state: State::Connecting { .. },
} => connection
.stream
.take_error()
.map(|err| (err, target.clone())),
_ => None,
}
};
if let Some((err, target)) = connect_err {
let mut connection = self.connections.remove(token.0);
let _ = registry.deregister(connection.stream.as_source());
Ok(Connectedness::Errored { target, error: err })
} else {
let connection = self
.connections
.get_mut(token.0)
.expect("try_ready: token -> connection must exist");
match &connection.direction {
Direction::Outbound {
target,
state: State::Connecting { .. },
} => {
if !connection.stream.is_ready() {
log::debug!("stream not ready: {:?}", connection.direction);
Ok(Connectedness::NotReady)
} else {
log::debug!("stream ready: {:?}", connection.direction);
let peer = self.next_peer_id;
let prev_mapping = self.token_map.insert(peer, *token);
assert!(prev_mapping.is_none());
self.next_peer_id = self.next_peer_id.next();
let target = target.clone();
connection.direction = Direction::Outbound {
target: target.clone(),
state: State::Connected { peer },
};
registry.reregister(
connection.stream.as_source(),
*token,
Interest::READABLE,
)?;
Ok(Connectedness::New { peer, target })
}
}
Direction::Outbound {
state: State::Connected { peer },
..
}
| Direction::Inbound { peer, .. } => Ok(Connectedness::Ready {
peer: *peer,
connection,
}),
}
}
}
/// Stores a not-yet-connected outbound `stream` towards `target`.
///
/// The socket is registered for writable events only, under a token equal to
/// the slab key it is stored at. No peer id is assigned here; that happens in
/// `try_ready`. `start` is kept so that `remove_dead` can apply the connect
/// timeout.
///
/// # Errors
///
/// Fails if registering the socket fails; nothing is stored in that case.
pub fn add_outbound(
    &mut self,
    registry: &Registry,
    mut stream: TcpStream,
    stream_cfg: StreamConfig,
    start: Instant,
    target: Target,
) -> std::io::Result<()> {
    let slot = self.connections.vacant_entry();
    let token = Token(slot.key());
    registry.register(&mut stream, token, Interest::WRITABLE)?;

    let stream = MessageStream::new(stream, stream_cfg);
    let direction = Direction::Outbound {
        target,
        state: State::Connecting { start },
    };
    slot.insert(Connection {
        stream,
        direction,
        token,
    });
    Ok(())
}
/// Stores an accepted inbound `stream` and assigns it a fresh peer id.
///
/// The socket is registered for readable events under a token equal to the
/// slab key it is stored at, and the peer id is recorded in the peer/token
/// map.
///
/// # Errors
///
/// Fails if registering the socket fails; no peer id is consumed in that
/// case.
///
/// # Panics
///
/// Panics if the freshly allocated peer id is already present in the
/// peer/token map.
pub fn add_inbound(
    &mut self,
    registry: &Registry,
    mut stream: TcpStream,
    stream_cfg: StreamConfig,
) -> std::io::Result<PeerId> {
    let slot = self.connections.vacant_entry();
    let token = Token(slot.key());
    registry.register(&mut stream, token, Interest::READABLE)?;

    // Allocate the id only after registration succeeded.
    let peer = self.next_peer_id;
    let prev_mapping = self.token_map.insert(peer, token);
    assert!(prev_mapping.is_none());
    self.next_peer_id = self.next_peer_id.next();

    let stream = MessageStream::new(stream, stream_cfg);
    slot.insert(Connection {
        stream,
        direction: Direction::Inbound { peer },
        token,
    });
    Ok(peer)
}
/// Removes the connection of `peer`, tries to flush it and shuts it down.
///
/// Returns `Ok(false)` if the peer id is unknown, `Ok(true)` if the
/// connection was removed.
///
/// The final write and the socket shutdown are best effort and their
/// outcomes are ignored. They are attempted even when deregistration fails:
/// by then the connection has already been taken out of both tables and is
/// about to be dropped, so skipping them would only lose queued data.
///
/// # Errors
///
/// Returns the error from deregistering the socket, if any. The connection
/// is removed regardless.
///
/// # Panics
///
/// Panics if the peer/token map points at a slab entry that does not exist.
pub fn disconnect(
    &mut self,
    peer: &PeerId,
    registry: &Registry,
    now: Instant,
) -> io::Result<bool> {
    let Some(token) = self.token_map.remove(peer) else {
        return Ok(false);
    };

    let mut connection = self
        .connections
        .try_remove(token.0)
        .expect("disconnect: token -> connection must exist");

    // Remember the deregistration outcome but do not bail out yet.
    let deregistered = registry.deregister(connection.stream.as_source());
    let _ = connection.stream.write(now);
    let _ = connection.stream.shutdown();

    deregistered.map(|()| true)
}
pub fn compact<F: FnMut(Token, Token)>(
&mut self,
now: Instant,
registry: &Registry,
mut rekey: F,
) -> io::Result<()> {
if (now - self.last_buffer_resize).as_secs() > 30 {
for (_, connection) in &mut self.connections {
connection.stream.shrink_buffers();
}
self.last_buffer_resize = now;
}
let mut result = Ok(());
let capacity = self.connections.capacity();
if capacity > 32 && capacity >= self.connections.len() * 2 {
self.connections.compact(|c, from, to| {
let (_, entry) = self
.token_map
.iter_mut()
.find(|(_, token)| token.0 == from)
.unwrap();
*entry = Token(to);
c.token = Token(to);
rekey(Token(from), Token(to));
let interest = if c.stream.has_queued_data() {
Interest::READABLE | Interest::WRITABLE
} else {
Interest::READABLE
};
if let Err(err) = registry.reregister(c.stream.as_source(), Token(to), interest) {
result = Err(err);
false
} else {
true
}
});
}
result
}
/// Sweeps out dead connections, at most once per second.
///
/// If less than 1000 ms have passed since the previous sweep, nothing is
/// examined and an empty iterator is returned. Otherwise every connection is
/// checked and dropped when either
/// * it is an outbound connect in progress that started longer ago than
///   `config.stream_config.connect_timeout`, or
/// * it has a peer id and its stream reports itself write-stale at `now`
///   (its peer/token map entry is removed as well).
///
/// Dropped connections are deregistered from `registry` and reported through
/// the returned iterator.
///
/// # Errors
///
/// If deregistering any dropped connection fails, the last such error is
/// returned instead of the iterator. The connections are removed regardless.
pub fn remove_dead(
    &mut self,
    now: Instant,
    config: &Config,
    registry: &Registry,
) -> io::Result<impl Iterator<Item = Dead>> {
    let sweep_due = (now - self.last_cleanup).as_millis() >= 1000;
    if !sweep_due {
        self.dead.clear();
        return Ok(self.dead.drain(..));
    }

    self.last_cleanup = now;
    self.dead.clear();
    self.dead.shrink_to(8);

    let dead = &mut self.dead;
    let token_map = &mut self.token_map;
    let mut last_deregister_err = None;

    self.connections.retain(|_, conn| {
        // Decide first, act afterwards: `reason` owns its data, so the
        // borrow of `conn.direction` is over before the stream is touched.
        let reason = match &conn.direction {
            Direction::Outbound {
                target,
                state: State::Connecting { start },
            } if (now - *start) > config.stream_config.connect_timeout => {
                log::debug!("connect timeout: {:?}", &conn.direction);
                Some(Dead::OutboundTimeout(target.clone()))
            }
            Direction::Inbound { peer, .. }
            | Direction::Outbound {
                state: State::Connected { peer },
                ..
            } if conn.stream.is_write_stale(now) => {
                token_map.remove(peer);
                Some(Dead::WriteStale(*peer))
            }
            _ => None,
        };

        let Some(reason) = reason else {
            return true;
        };
        dead.push(reason);
        if let Err(err) = registry.deregister(conn.stream.as_source()) {
            last_deregister_err = Some(err);
        }
        false
    });

    match last_deregister_err {
        Some(err) => Err(err),
        None => Ok(self.dead.drain(..)),
    }
}
/// Consumes the manager, attempting one last write on every connection and
/// then shutting its stream down.
///
/// Write results are ignored; shutdown results are only logged. A debug line
/// is emitted for a connection whose stream reports itself write-stale when
/// asked about a point one hour after `now`.
///
/// NOTE(review): the one-hour look-ahead is presumably a way to detect data
/// that is still queued — confirm against `MessageStream::is_write_stale`.
pub fn shutdown(self, now: Instant) {
    self.connections
        .into_iter()
        .for_each(|(token, mut connection)| {
            let _ = connection.stream.write(now);

            let look_ahead = now + Duration::from_secs(3600);
            if connection.stream.is_write_stale(look_ahead) {
                log::debug!("shutdown: connection had unsent data");
            }

            let shutdown_result = connection.stream.shutdown();
            log::debug!("shutdown: stream {}: {:?}", token, shutdown_result);
        });
}
/// Reports whether the slab's next vacant key is still strictly below
/// `usize::MAX - n_listeners`.
///
/// NOTE(review): presumably the highest `n_listeners` token values are
/// reserved for listener sockets — confirm with the caller.
#[inline(always)]
pub fn has_slot(&self, n_listeners: usize) -> bool {
    let next_key = self.connections.vacant_key();
    let ceiling = usize::MAX - n_listeners;
    next_key < ceiling
}
}