tcp_typed 0.1.4

A wrapper around platform TCP socket APIs that leverages the type system to ensure correct usage. It's quite easy to accidentally misuse the Berkeley sockets or similar APIs, resulting in ECONNRESET/EPIPE/etc., data being lost on close, and potential hangs from non-exhaustive collection of edge-triggered events. This library aims to make such misuse impossible in safe (non-`unsafe`) code.
Documentation
use nix;
use std::{cmp, fmt, os, ptr};

/// A fixed-capacity FIFO ring buffer.
///
/// Elements live in `buf`, whose length is fixed at construction and doubles as the
/// capacity. Only the slots in the logical range `tail..head` (indices taken modulo the
/// capacity) hold live values; every other slot is either uninitialised or has already
/// been moved out, and must never be read, cloned or dropped.
///
/// Invariants maintained by the methods of this type:
/// * `tail <= head <= tail + capacity`
/// * `tail < capacity` whenever the capacity is non-zero (`head` may exceed the
///   capacity; it is reduced modulo the capacity when used as an index).
pub struct CircularBuffer<T> {
	/// Logical position one past the newest element (not reduced modulo capacity).
	head: usize,
	/// Index of the oldest live element.
	tail: usize,
	/// Backing storage; its length is the capacity, but only `tail..head` is live.
	buf: Vec<T>,
	/// Running count of elements taken out of the buffer.
	read: usize,
	/// Running count of elements put into the buffer.
	written: usize,
}
impl<T: Clone> Clone for CircularBuffer<T> {
	/// Clones the buffer, duplicating only the live elements.
	///
	/// A derived `Clone` would clone every slot of the backing `Vec`, including slots
	/// that are uninitialised or already moved out, which is undefined behaviour for
	/// any `T` that owns resources. The clone keeps the same capacity, positions and
	/// counters as the original.
	fn clone(&self) -> Self {
		let mut copy = Self::new(self.capacity());
		// Start as an empty buffer positioned at our tail and grow `head` one element
		// at a time, so that if `T::clone` panics the partially built copy only ever
		// drops the elements that were actually written.
		copy.head = self.tail;
		copy.tail = self.tail;
		for i in 0..self.read_available() {
			// The loop only runs when at least one element is stored, which implies a
			// non-zero capacity, so the modulo cannot divide by zero.
			let off = (self.tail + i) % self.capacity();
			// SAFETY: `off < capacity == buf.len()`, and the slot lies in the live
			// range `tail..head`, so it holds an initialised value.
			let value = unsafe { self.buf.get_unchecked(off) }.clone();
			// SAFETY: `off` is in bounds for `copy.buf` (same capacity); the slot is
			// not live in `copy`, so `ptr::write` overwrites it without dropping.
			unsafe { ptr::write(copy.buf.get_unchecked_mut(off), value) };
			copy.head += 1;
		}
		copy.read = self.read;
		copy.written = self.written;
		copy
	}
}
impl<T> CircularBuffer<T> {
	/// Creates an empty buffer with room for exactly `cap` elements.
	pub fn new(cap: usize) -> Self {
		let mut buf = Vec::with_capacity(cap);
		// The length is set to the full capacity up front so that slots can be
		// addressed by index; the slots themselves are not initialised here.
		// NOTE(review): exposing uninitialised slots through a `Vec<T>` is outside the
		// documented contract of `Vec::set_len`. This type compensates by only ever
		// touching slots in `tail..head` and by resetting the length to 0 in `Drop`;
		// a `Vec<MaybeUninit<T>>` would express this soundly but changes the field
		// type that the other impls in this file rely on.
		unsafe { buf.set_len(cap) };
		Self {
			head: 0,
			tail: 0,
			buf,
			read: 0,
			written: 0,
		}
	}
	/// Total number of elements the buffer can hold.
	#[inline(always)]
	pub fn capacity(&self) -> usize {
		self.buf.len()
	}
	/// Number of elements currently stored and available to read.
	#[inline(always)]
	pub fn read_available(&self) -> usize {
		self.head - self.tail
	}
	/// Number of free slots currently available to write into.
	#[inline(always)]
	pub fn write_available(&self) -> usize {
		self.capacity() - self.read_available()
	}
	/// Two-step read of the oldest element.
	///
	/// Returns `None` when the buffer is empty. Otherwise returns a closure; the
	/// element is only removed (and returned) when that closure is called. Dropping
	/// the closure without calling it leaves the buffer untouched.
	#[must_use]
	#[inline(always)]
	pub fn read<'a>(&'a mut self) -> Option<impl FnOnce() -> T + 'a> {
		if self.read_available() > 0 {
			Some(move || {
				let off = self.tail;
				// SAFETY: `tail < capacity` and the slot at `tail` is live because the
				// buffer is non-empty. Advancing `tail` below marks the slot as dead,
				// so the value moved out here is never read or dropped again.
				let ret = unsafe { ptr::read(self.buf.get_unchecked(off)) };
				self.tail += 1;
				// Once `tail` reaches the end of the storage, rebase both positions so
				// that `tail` stays a valid index.
				if self.tail >= self.capacity() {
					self.head -= self.capacity();
					self.tail -= self.capacity();
				}
				self.read += 1;
				ret
			})
		} else {
			None
		}
	}
	/// Two-step write of a new element.
	///
	/// Returns `None` when the buffer is full. Otherwise returns a closure that stores
	/// the value passed to it as the newest element. Dropping the closure without
	/// calling it leaves the buffer untouched.
	#[must_use]
	#[inline(always)]
	pub fn write<'a>(&'a mut self) -> Option<impl FnOnce(T) + 'a> {
		if self.write_available() > 0 {
			Some(move |t| {
				// Free space implies a non-zero capacity, so the modulo is well defined.
				let off = self.head % self.capacity();
				// SAFETY: `off < capacity`, and the slot is outside the live range, so
				// `ptr::write` must be used to avoid dropping whatever bytes are there.
				unsafe { ptr::write(self.buf.get_unchecked_mut(off), t) };
				self.head += 1;
				self.written += 1;
			})
		} else {
			None
		}
	}
}
impl<T> Drop for CircularBuffer<T> {
	/// Drops every element still stored, then releases the storage without touching
	/// the dead slots.
	fn drop(&mut self) {
		// `read()` only hands back a closure; the element is removed (and `tail`
		// advanced) when that closure is *called*. Discarding the closure uncalled
		// would never make progress, so invoke it and drop the value it yields.
		while let Some(take) = self.read() {
			drop(take());
		}
		// All live elements are gone; shrink the Vec to length 0 so its own destructor
		// frees the allocation without dropping any slot.
		unsafe { self.buf.set_len(0) };
	}
}
/// Socket I/O helpers for byte buffers: move data between the ring buffer and a file
/// descriptor using scatter/gather `sendmsg`/`recvmsg`.
impl CircularBuffer<u8> {
	/// Sends buffered bytes to `fd` until the buffer is empty or the socket would block.
	///
	/// Returns the total number of bytes handed to `sendmsg` by this call. `EAGAIN`
	/// ends the loop successfully; any other `sendmsg` error is returned. Once the
	/// buffer is empty, the socket's pending error (`SO_ERROR`) is queried and, if
	/// non-zero, returned as an `Err`. A failure of that query itself is also returned
	/// as an `Err` rather than panicking.
	pub fn read_to_fd(&mut self, fd: os::unix::io::RawFd) -> Result<usize, nix::Error> {
		let mut written = 0;
		loop {
			if self.read_available() > 0 {
				// The live bytes are `buf[a_start..a_end]`, optionally followed by a
				// wrapped-around prefix `buf[0..b_end]`.
				let a_start = self.tail;
				let a_end = cmp::min(self.capacity(), a_start + self.read_available());
				let b_start = 0;
				let b_end = self.read_available() - (a_end - a_start);
				match nix::sys::socket::sendmsg(
					fd,
					// Pass the second slice only when the data actually wraps.
					&[
						nix::sys::uio::IoVec::from_slice(&self.buf[a_start..a_end]),
						nix::sys::uio::IoVec::from_slice(&self.buf[b_start..b_end]),
					][0..if b_start != b_end { 2 } else { 1 }],
					&[],
					nix::sys::socket::MsgFlags::empty(),
					None,
				) {
					Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) => return Ok(written),
					Ok(n) => {
						// Consume the `n` bytes that were sent, rebasing both positions
						// when `tail` runs off the end of the storage.
						self.tail += n;
						if self.tail >= self.capacity() {
							self.head -= self.capacity();
							self.tail -= self.capacity();
						}
						self.read += n;
						written += n;
					}
					Err(err) => return Err(err),
				}
			} else {
				// Nothing left to send: report any error pending on the socket.
				// `?` propagates a failing getsockopt instead of panicking.
				match nix::sys::socket::getsockopt(fd, nix::sys::socket::sockopt::SocketError)? {
					0 => return Ok(written),
					err => return Err(nix::Error::Sys(nix::errno::Errno::from_i32(err))),
				}
			}
		}
	}
	/// Receives bytes from `fd` into the buffer until it is full, the socket would
	/// block, or the peer has closed.
	///
	/// Returns `(bytes_received, closed)`. `closed` is `true` when `recvmsg` reported
	/// zero bytes; it is `false` when the call stopped on `EAGAIN` or because the
	/// buffer is full while more data is waiting. Any other `recvmsg` error is returned.
	pub fn write_from_fd(&mut self, fd: os::unix::io::RawFd) -> Result<(usize, bool), nix::Error> {
		let mut read = 0;
		loop {
			if self.write_available() > 0 {
				// The free space is `buf[a_start..a_end]`, optionally followed by a
				// wrapped-around prefix `buf[0..b_end]`.
				let a_start = self.head % self.capacity();
				let a_end = cmp::min(self.capacity(), a_start + self.write_available());
				let b_start = 0;
				let b_end = self.write_available() - (a_end - a_start);
				// Split at `b_end` to obtain two disjoint mutable slices; indices into
				// `a` are therefore rebased by `b_end`.
				let (b, a) = self.buf.split_at_mut(b_end);
				match nix::sys::socket::recvmsg(
					fd,
					// Pass the second slice only when the free space actually wraps.
					&[
						nix::sys::uio::IoVec::from_mut_slice(
							&mut a[a_start - b_end..a_end - b_end],
						),
						nix::sys::uio::IoVec::from_mut_slice(b),
					][0..if b_start != b_end { 2 } else { 1 }],
					None,
					nix::sys::socket::MsgFlags::empty(),
				)
				.map(|x| x.bytes)
				{
					Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) => return Ok((read, false)),
					Ok(0) => return Ok((read, true)),
					Ok(n) => {
						self.head += n;
						self.written += n;
						read += n;
					}
					Err(err) => return Err(err),
				}
			} else {
				// The buffer is full: peek a single byte (without consuming it) to tell
				// "more data waiting" apart from "peer closed".
				match nix::sys::socket::recvmsg(
					fd,
					&[nix::sys::uio::IoVec::from_mut_slice(&mut [0])],
					None,
					nix::sys::socket::MsgFlags::MSG_PEEK,
				)
				.map(|x| x.bytes)
				{
					Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) | Ok(1) => {
						return Ok((read, false))
					}
					Ok(0) => return Ok((read, true)),
					Err(err) => return Err(err),
					// The peek buffer is one byte long, so no other count is possible.
					Ok(_) => unreachable!(),
				}
			}
		}
	}
}
/// Debug output shows the running `written`/`read` counters and a `contents` string of
/// the form `<stored>/<capacity> "<live elements, oldest first>"`.
impl<T> fmt::Debug for CircularBuffer<T>
where
	for<'a> &'a T: fmt::Debug,
{
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		let stored = self.read_available();
		let cap = self.capacity();
		// Collect references to the live elements in FIFO order, wrapping around the
		// end of the backing storage as needed.
		let live: Vec<&T> = (0..stored)
			.map(|offset| {
				let slot = (self.tail + offset) % cap;
				// SAFETY: `slot < cap == buf.len()` thanks to the modulo, and every
				// slot within `stored` elements of `tail` holds a live value.
				unsafe { self.buf.get_unchecked(slot) }
			})
			.collect();
		let contents = format!("{}/{} \"{:?}\"", stored, cap, live);
		let mut out = fmt.debug_struct("CircularBuffer");
		out.field("written", &self.written);
		out.field("read", &self.read);
		out.field("contents", &contents);
		out.finish()
	}
}