mod iobuf;
mod serial;
mod tcp;
mod udp;
use super::proto::{self, Packet};
use super::util;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::thread;
use std::time::{Duration, Instant};
/// Errors reported on the receive side of a port.
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
    /// No packet is available right now. The poller thread treats this as
    /// the end of the current read burst, not as a failure.
    #[error("no packets available")]
    NotReady,
    /// The underlying port has gone away. The poller thread stops after
    /// seeing this error.
    #[error("port disconnected")]
    Disconnected,
    /// A protocol-level error, converted from `proto::Error`.
    #[error("protocol error: {0}")]
    Protocol(#[from] proto::Error),
    /// An operating-system I/O error, converted from `io::Error`.
    #[error("I/O error: {0}")]
    IO(#[from] io::Error),
}
/// Errors reported on the transmit side of a port.
#[derive(Debug, thiserror::Error)]
pub enum SendError {
    /// The raw port has pending output that must be drained before more data
    /// can be sent. The poller thread reacts by waiting for the port to
    /// become writable and calling `RawPort::drain`.
    #[error("must drain before sending")]
    MustDrain,
    /// The outgoing queue has no room for the packet.
    #[error("outgoing queue full")]
    Full,
    /// The port (or the channel to its poller thread) is gone.
    #[error("port disconnected")]
    Disconnected,
    /// An operating-system I/O error, converted from `io::Error`.
    #[error("I/O error: {0}")]
    IO(#[from] io::Error),
    /// The packet is too large to be encoded for the wire.
    #[error("packet exceeds protocol size limits")]
    Serialization,
}
/// Errors reported when changing the data rate of a port.
#[derive(Debug, thiserror::Error)]
pub enum RateError {
    /// The port does not support rate changes. This is what the default
    /// `RawPort::set_rate` implementation returns.
    #[error("rate changes not supported on this port")]
    Unsupported,
    /// The requested rate was rejected as invalid.
    #[error("invalid rate")]
    InvalidRate,
    /// The rate change could not be carried out.
    #[error("failed to set rate")]
    Failed,
}
/// Data rates advertised by a port that supports rate changes.
///
/// Returned by `Port::rate_info`. Ports that do not report rate information
/// yield `None` there instead.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RateInfo {
    /// Default rate of the port (presumably in bits per second, going by the
    /// `_bps` suffix -- TODO confirm against the raw port implementations).
    pub default_bps: u32,
    /// Rate the port would like to run at (same unit as `default_bps`).
    pub target_bps: u32,
}
/// Transport-level interface driven by `Port::poller_thread`.
///
/// Only `recv` and `send` are mandatory; every other method has a default
/// that describes a port without buffering, rate control, heartbeats or a
/// startup phase.
trait RawPort {
    /// Returns the next received packet. The poller thread calls this
    /// repeatedly and stops at `RecvError::NotReady`.
    fn recv(&mut self) -> Result<Packet, RecvError>;
    /// Sends one packet. A `SendError::MustDrain` result makes the poller
    /// thread wait for writability and then call `drain`.
    fn send(&mut self, pkt: &Packet) -> Result<(), SendError>;
    /// Flushes pending output. The poller thread keeps waiting for
    /// writability while this returns `SendError::MustDrain`.
    /// Default: nothing to flush.
    fn drain(&mut self) -> Result<(), SendError> {
        Ok(())
    }
    /// Reports whether output is still pending. Default: `false`.
    fn has_data_to_drain(&self) -> bool {
        false
    }
    /// Changes the data rate. Default: `RateError::Unsupported`.
    fn set_rate(&mut self, _rate: u32) -> Result<(), RateError> {
        Err(RateError::Unsupported)
    }
    /// Rate information for this port, if any. Default: `None`.
    fn rate_info(&self) -> Option<RateInfo> {
        None
    }
    /// Longest time the port may stay silent. When `Some`, the poller thread
    /// sends an empty heartbeat once this much time has passed since the
    /// last successful send. Default: `None` (no heartbeats).
    fn max_send_interval(&self) -> Option<Duration> {
        None
    }
    /// `true` while the port is still in its startup phase. During that
    /// phase the poller thread discards received packets and postpones
    /// servicing the outgoing queue. Default: `false`.
    fn startup_holdoff(&self) -> bool {
        false
    }
}
/// Restricts which IP address family `find_addr` may return.
enum AddrFamilyRestrict {
    /// Accept IPv4 addresses only.
    V4,
    /// Accept IPv6 addresses only.
    V6,
    /// Accept whichever address comes first.
    Either,
}

/// Port used when an address is given without an explicit port.
static TIO_DEFAULT_PORT: u16 = 7855;

/// Resolves `addr` to the first socket address allowed by `family`.
///
/// Resolution is attempted three times, stopping at the first success:
/// 1. `addr` exactly as given;
/// 2. `addr:TIO_DEFAULT_PORT`;
/// 3. `[addr]:TIO_DEFAULT_PORT` (the bracketed form, e.g. for an IPv6
///    literal written without a port).
///
/// # Errors
/// If all three attempts fail, the error of the first attempt is returned.
/// If resolution succeeds but no result belongs to the requested family, an
/// `io::ErrorKind::Other` error is returned.
fn find_addr(addr: &str, family: AddrFamilyRestrict) -> Result<SocketAddr, io::Error> {
    let mut resolved = addr.to_socket_addrs().or_else(|first_err| {
        format!("{}:{}", addr, TIO_DEFAULT_PORT)
            .to_socket_addrs()
            .or_else(|_| format!("[{}]:{}", addr, TIO_DEFAULT_PORT).to_socket_addrs())
            // Report the failure of the address as the caller wrote it.
            .map_err(|_| first_err)
    })?;

    resolved
        .find(|candidate| {
            !matches!(
                (candidate, &family),
                (SocketAddr::V4(_), AddrFamilyRestrict::V6)
                    | (SocketAddr::V6(_), AddrFamilyRestrict::V4)
            )
        })
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "address resolution failed"))
}
/// Item carried on the channel from a `Port` handle to its poller thread.
enum PacketOrControl {
    /// A packet to be sent through the raw port.
    Pkt(Packet),
    /// A request to change the raw port's rate to the given value.
    SetRate(u32),
}
/// Outcome of a control request, sent back from the poller thread.
enum ControlResult {
    /// The request was carried out.
    Success,
    /// A `SetRate` request failed with the contained error.
    SetRateError(RateError),
}
/// Handle to a port whose I/O is serviced by a dedicated poller thread.
pub struct Port {
    // Queue of packets and control requests for the poller thread. The
    // methods of `Port` expect this to be `Some`.
    tx: Option<Box<crossbeam::channel::Sender<PacketOrControl>>>,
    // Wakes the poller thread (registered as token 0) after something has
    // been queued on `tx`.
    waker: mio::Waker,
    // Results of control requests, reported by the poller thread.
    ctl_result: crossbeam::channel::Receiver<ControlResult>,
    // Rate information read from the raw port when the `Port` was created.
    rates: Option<RateInfo>,
}
/// Capacity of the receive channel created by `Port::rx_channel`.
pub static DEFAULT_RX_CHANNEL_SIZE: usize = 32768;
/// Default capacity for the queue of outgoing packets.
pub static DEFAULT_TX_CHANNEL_SIZE: usize = 32768;
impl Port {
/// Event loop executed on the I/O thread that backs a `Port`.
///
/// The loop multiplexes two event sources on `poll`:
/// * token 0 -- the waker, signalling that something was queued on `tx`;
/// * token 1 -- `raw_port` itself (readable, plus writable while draining).
///
/// Received packets and receive errors are handed to `rx`. Items taken from
/// `tx` are either sent through `raw_port` or executed as control requests,
/// whose outcome is reported on `ctl_result`.
///
/// The function returns when the raw port reports an unrecoverable error or
/// a disconnect, when `rx` returns an error, or when either channel to the
/// owning `Port` is closed.
///
/// # Panics
/// Panics if registering with or polling `poll` fails, on an unknown token,
/// and (on unix) if the raw port is reported writable while not draining.
fn poller_thread<
    RawPortT: RawPort + mio::event::Source,
    RxCallbackT: Fn(Result<Packet, RecvError>) -> io::Result<()>,
>(
    mut raw_port: RawPortT,
    mut poll: mio::Poll,
    rx: RxCallbackT,
    tx: crossbeam::channel::Receiver<PacketOrControl>,
    ctl_result: crossbeam::channel::Sender<ControlResult>,
) {
    use crossbeam::channel::TryRecvError;
    // Room for a single event: each poll() call reports at most one source.
    let mut events = mio::Events::with_capacity(1);
    // True while the raw port has asked to be drained (SendError::MustDrain).
    let mut needs_draining = false;
    // Remembers a waker event that arrived while the TX queue could not be
    // serviced (draining or startup), so the queue is checked afterwards.
    let mut needs_tx_queue_check = false;
    poll.registry()
        .register(&mut raw_port, mio::Token(1), mio::Interest::READABLE)
        .expect("mio::Poll raw_port registration failure");
    let mut last_sent = Instant::now();
    let mut startup = raw_port.startup_holdoff();
    'ioloop: loop {
        // Work out how long poll() may block. While draining there is no
        // timeout; otherwise the deadline is the next heartbeat, if the port
        // uses heartbeats at all.
        let timeout = if needs_draining {
            None
        } else if let Some(max_interval) = raw_port.max_send_interval() {
            Some({
                let mut until_hb = max_interval.saturating_sub(last_sent.elapsed());
                // A heartbeat is due when the interval has elapsed, and on
                // every iteration during startup. `|` is the non-short-circuit
                // OR; both operands are plain bools, so it behaves like `||`.
                if (until_hb == Duration::ZERO) | startup {
                    match raw_port.send(&util::PacketBuilder::make_empty_heartbeat()) {
                        Err(SendError::MustDrain) => {
                            // Switch to draining mode and restart the
                            // iteration so the timeout is recomputed.
                            needs_draining = true;
                            poll.registry()
                                .reregister(
                                    &mut raw_port,
                                    mio::Token(1),
                                    mio::Interest::READABLE.add(mio::Interest::WRITABLE),
                                )
                                .expect("Writable interest set failed (HB)");
                            continue;
                        }
                        Err(_) => {
                            break 'ioloop;
                        }
                        Ok(_) => {
                            last_sent = Instant::now();
                            until_hb = max_interval;
                        }
                    }
                }
                // One extra millisecond on top of the remaining time --
                // presumably so the interval has fully elapsed when poll()
                // times out (TODO confirm).
                until_hb + Duration::from_millis(1)
            })
        } else {
            None
        };
        poll.poll(&mut events, timeout).expect("Poll failed");
        // Once the startup phase is over it is never re-entered.
        if startup {
            startup = raw_port.startup_holdoff();
        }
        let mut check_tx_channel = false;
        for event in events.iter() {
            match event.token() {
                // Waker: the owning Port queued something on `tx`.
                mio::Token(0) => {
                    if needs_draining || startup {
                        // Cannot send right now; remember to look later.
                        needs_tx_queue_check = true;
                    } else {
                        check_tx_channel = true;
                    }
                }
                // Raw port readiness.
                mio::Token(1) => {
                    if event.is_writable() {
                        if needs_draining {
                            match raw_port.drain() {
                                Ok(_) => {
                                    // Fully drained: stop asking for
                                    // writable events.
                                    needs_draining = false;
                                    poll.registry()
                                        .reregister(
                                            &mut raw_port,
                                            mio::Token(1),
                                            mio::Interest::READABLE,
                                        )
                                        .expect("Readable interest set failed");
                                    last_sent = Instant::now();
                                }
                                Err(SendError::MustDrain) => {
                                    // Still pending; wait for the next
                                    // writable event.
                                }
                                Err(_) => {
                                    break 'ioloop;
                                }
                            }
                        } else {
                            #[cfg(unix)]
                            panic!("Unexpected writable raw port when not draining");
                        }
                    }
                    // Read until the raw port reports NotReady. This runs for
                    // every token-1 event, including writable ones.
                    loop {
                        match raw_port.recv() {
                            Ok(pkt) => {
                                // Packets received during startup are dropped.
                                if startup {
                                } else if let Err(_) = rx(Ok(pkt)) {
                                    break 'ioloop;
                                }
                            }
                            Err(RecvError::NotReady) => {
                                break;
                            }
                            Err(e) => {
                                let disconnect = if let RecvError::Disconnected = e {
                                    true
                                } else {
                                    false
                                };
                                // During startup errors are not forwarded,
                                // except `proto::Error::Text`, which is
                                // always passed on.
                                let ignore =
                                    if let RecvError::Protocol(proto::Error::Text(_)) = e {
                                        false
                                    } else {
                                        startup
                                    };
                                // Stop if the callback fails or the port is
                                // gone; a disconnect ends the loop even when
                                // the error itself was ignored.
                                if (!ignore && rx(Err(e)).is_err()) || disconnect {
                                    break 'ioloop;
                                }
                            }
                        };
                    }
                }
                mio::Token(x) => {
                    panic!("Unexpected token {}", x);
                }
            }
        }
        // Service a postponed waker event as soon as sending is possible.
        if !needs_draining && !startup && needs_tx_queue_check {
            check_tx_channel = true;
            needs_tx_queue_check = false;
        }
        if check_tx_channel {
            // Empty the queue of pending packets and control requests.
            loop {
                match tx.try_recv() {
                    Ok(PacketOrControl::Pkt(pkt)) => {
                        match raw_port.send(&pkt) {
                            Err(SendError::MustDrain) => {
                                // Enter draining mode. The loop keeps pulling
                                // queued items; how the raw port treats sends
                                // while it must drain is not visible here.
                                needs_draining = true;
                                poll.registry()
                                    .reregister(
                                        &mut raw_port,
                                        mio::Token(1),
                                        mio::Interest::READABLE.add(mio::Interest::WRITABLE),
                                    )
                                    .expect("Writable interest set failed (TX)");
                            }
                            Err(SendError::Full) => {
                                // The packet is dropped without notice.
                            }
                            Err(_) => {
                                break 'ioloop;
                            }
                            Ok(_) => {
                                last_sent = Instant::now();
                            }
                        }
                    }
                    Ok(PacketOrControl::SetRate(rate)) => {
                        // Report the outcome to the caller blocked in
                        // Port::set_rate; give up if nobody is listening.
                        if let Err(_) = ctl_result.send(match raw_port.set_rate(rate) {
                            Ok(_) => ControlResult::Success,
                            Err(e) => ControlResult::SetRateError(e),
                        }) {
                            break 'ioloop;
                        }
                    }
                    Err(TryRecvError::Empty) => {
                        break;
                    }
                    Err(TryRecvError::Disconnected) => {
                        // The owning Port dropped its sender.
                        break 'ioloop;
                    }
                }
            }
        }
    }
}
fn from_raw_custom<
RawPortT: RawPort + mio::event::Source + Send + 'static,
RxCallbackT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static,
>(
raw_port: RawPortT,
rx: RxCallbackT,
tx_size: usize,
) -> io::Result<Port> {
let rates = raw_port.rate_info();
let (tx, ttx) = crossbeam::channel::bounded::<PacketOrControl>(std::cmp::max(
DEFAULT_RX_CHANNEL_SIZE,
tx_size,
));
let (ctl_ret_sender, ctl_ret_receiver) = crossbeam::channel::bounded::<ControlResult>(1);
let poll = mio::Poll::new()?;
let waker = mio::Waker::new(poll.registry(), mio::Token(0))?;
thread::spawn(move || {
#[cfg(target_os = "windows")]
let _priority = super::os::windows_helpers::ActivityGuard::latency_critical()
.map_err(|e| eprintln!("port poller: failed to raise thread priority: {e}"))
.ok();
#[cfg(target_os = "macos")]
let _activity =
super::os::macos_helpers::ActivityGuard::latency_critical("Twinleaf I/O poller");
Port::poller_thread(raw_port, poll, rx, ttx, ctl_ret_sender);
});
io::Result::Ok(Port {
tx: Some(Box::new(tx)),
ctl_result: ctl_ret_receiver,
waker: waker,
rates: rates,
})
}
fn from_raw<
RawPortT: RawPort + mio::event::Source + Send + 'static,
RxCallbackT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static,
>(
raw_port: RawPortT,
rx: RxCallbackT,
) -> io::Result<Port> {
Self::from_raw_custom(raw_port, rx, DEFAULT_RX_CHANNEL_SIZE)
}
pub fn new<RXT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static>(
url: &str,
rx: RXT,
) -> io::Result<Port> {
#[cfg(unix)]
if url.starts_with("/dev/") {
return Port::from_raw(serial::Port::new(url)?, rx);
}
#[cfg(target_os = "windows")]
if url.starts_with("COM") {
return Port::from_raw(serial::Port::new(url)?, rx);
}
let split_url: Vec<&str> = url.splitn(2, "://").collect();
match split_url[..] {
["serial", port] => Port::from_raw(serial::Port::new(port)?, rx),
["tcp", addr] => Port::from_raw(
tcp::Port::new(&find_addr(addr, AddrFamilyRestrict::Either)?)?,
rx,
),
["udp", addr] => Port::from_raw(
udp::Port::new(&find_addr(addr, AddrFamilyRestrict::Either)?)?,
rx,
),
["tcp4", addr] => Port::from_raw(
tcp::Port::new(&find_addr(addr, AddrFamilyRestrict::V4)?)?,
rx,
),
["udp4", addr] => Port::from_raw(
udp::Port::new(&find_addr(addr, AddrFamilyRestrict::V4)?)?,
rx,
),
["tcp6", addr] => Port::from_raw(
tcp::Port::new(&find_addr(addr, AddrFamilyRestrict::V6)?)?,
rx,
),
["udp6", addr] => Port::from_raw(
udp::Port::new(&find_addr(addr, AddrFamilyRestrict::V6)?)?,
rx,
),
_ => io::Result::Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid url")),
}
}
pub fn from_mio_stream<
RXT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static,
>(
stream: mio::net::TcpStream,
rx: RXT,
) -> io::Result<Port> {
Port::from_raw(tcp::Port::from_stream(stream)?, rx)
}
pub fn from_tcp_stream<
RXT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static,
>(
stream: std::net::TcpStream,
rx: RXT,
) -> io::Result<Port> {
stream.set_nonblocking(true)?;
Port::from_mio_stream(mio::net::TcpStream::from_std(stream), rx)
}
pub fn from_mio_stream_custom<
RXT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static,
>(
stream: mio::net::TcpStream,
rx: RXT,
tx_size: usize,
) -> io::Result<Port> {
Port::from_raw_custom(tcp::Port::from_stream(stream)?, rx, tx_size)
}
pub fn from_tcp_stream_custom<
RXT: Fn(Result<Packet, RecvError>) -> io::Result<()> + Send + 'static,
>(
stream: std::net::TcpStream,
rx: RXT,
tx_size: usize,
) -> io::Result<Port> {
stream.set_nonblocking(true)?;
Port::from_mio_stream_custom(mio::net::TcpStream::from_std(stream), rx, tx_size)
}
pub fn rx_channel() -> (
crossbeam::channel::Sender<Result<Packet, RecvError>>,
crossbeam::channel::Receiver<Result<Packet, RecvError>>,
) {
Port::rx_channel_custom(DEFAULT_RX_CHANNEL_SIZE)
}
pub fn rx_to_channel(
rx_send: crossbeam::channel::Sender<Result<Packet, RecvError>>,
) -> impl Fn(Result<Packet, RecvError>) -> io::Result<()> {
Port::rx_to_channel_cb(rx_send, |_| {})
}
/// Creates a bounded receive channel holding up to `size` entries.
///
/// Returns the `(sender, receiver)` pair.
pub fn rx_channel_custom(
    size: usize,
) -> (
    crossbeam::channel::Sender<Result<Packet, RecvError>>,
    crossbeam::channel::Receiver<Result<Packet, RecvError>>,
) {
    // The element type is fixed by the return type.
    crossbeam::channel::bounded(size)
}
/// Builds a receive callback that forwards everything to `rx_send` and
/// hands items that do not fit to `full_cb`.
///
/// The returned callback yields:
/// * `Ok(())` when the item was queued, or when the channel was full and
///   the item was passed to `full_cb`;
/// * a `BrokenPipe` error when the item is `RecvError::Disconnected` (which
///   is not forwarded) or when the receiving end of the channel is gone.
pub fn rx_to_channel_cb<FullCBT: Fn(Result<Packet, RecvError>) -> () + Send + 'static>(
    rx_send: crossbeam::channel::Sender<Result<Packet, RecvError>>,
    full_cb: FullCBT,
) -> impl Fn(Result<Packet, RecvError>) -> io::Result<()> {
    move |item| -> io::Result<()> {
        use crossbeam::channel::TrySendError;

        // A disconnect is signalled through the callback's return value only.
        if matches!(item, Err(RecvError::Disconnected)) {
            return Err(io::Error::from(io::ErrorKind::BrokenPipe));
        }

        match rx_send.try_send(item) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(rejected)) => {
                full_cb(rejected);
                Ok(())
            }
            Err(TrySendError::Disconnected(_)) => {
                Err(io::Error::from(io::ErrorKind::BrokenPipe))
            }
        }
    }
}
/// Queues `packet` for transmission, blocking while the outgoing queue is
/// full, and wakes the poller thread.
///
/// # Errors
/// Returns `SendError::Disconnected` if the poller thread is gone.
///
/// # Panics
/// Panics if the poller thread cannot be woken up.
pub fn send(&self, packet: Packet) -> Result<(), SendError> {
    let queue = self.tx.as_ref().expect("Tx channel invalid");
    if queue.send(PacketOrControl::Pkt(packet)).is_err() {
        return Err(SendError::Disconnected);
    }
    if self.waker.wake().is_err() {
        panic!("Wake failed");
    }
    Ok(())
}
/// Queues `packet` for transmission without blocking and wakes the poller
/// thread.
///
/// # Errors
/// Returns `SendError::Full` if the outgoing queue has no room, and
/// `SendError::Disconnected` if the poller thread is gone.
///
/// # Panics
/// Panics if the poller thread cannot be woken up.
pub fn try_send(&self, packet: Packet) -> Result<(), SendError> {
    use crossbeam::channel::TrySendError;

    let queue = self.tx.as_ref().expect("Tx channel invalid");
    match queue.try_send(PacketOrControl::Pkt(packet)) {
        Err(TrySendError::Full(_rejected)) => return Err(SendError::Full),
        Err(_) => return Err(SendError::Disconnected),
        Ok(()) => {}
    }
    if self.waker.wake().is_err() {
        panic!("Wake failed");
    }
    Ok(())
}
/// Returns a copy of the rate information stored in this `Port`, or `None`
/// if there is none.
pub fn rate_info(&self) -> Option<RateInfo> {
    self.rates.clone()
}
/// Asks the poller thread to change the port's rate to `rate` and waits
/// for the outcome.
///
/// # Errors
/// Returns the error reported by the raw port, or `RateError::Failed` if
/// the poller thread is gone -- either before the request could be queued
/// or before it produced an answer.
///
/// # Panics
/// Panics if the poller thread cannot be woken up after the request has
/// been queued.
pub fn set_rate(&self, rate: u32) -> Result<(), RateError> {
    let tx = self.tx.as_ref().expect("Tx channel invalid");
    if tx.send(PacketOrControl::SetRate(rate)).is_err() {
        return Err(RateError::Failed);
    }
    // A failed wake-up stays fatal: the request is already queued, so
    // returning here could leave a stale answer for the next call.
    if self.waker.wake().is_err() {
        panic!("Wake failed");
    }
    match self.ctl_result.recv() {
        Ok(ControlResult::Success) => Ok(()),
        Ok(ControlResult::SetRateError(err)) => Err(err),
        // The poller thread exited (for example on disconnect) after the
        // request was queued. Report a failure instead of panicking.
        Err(_) => Err(RateError::Failed),
    }
}
}