1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::path::Path;
use tokio::future::poll_fn;
use tokio::io::PollEvented;
use tokio::net::unix::UCred;
use std::task::{Context, Poll};
use ::mio::Ready;

/// Asynchronous Unix seqpacket socket.
///
/// Instances are obtained through [`UnixSeqpacket::connect`] or
/// [`UnixSeqpacket::pair`]. Data is exchanged with the asynchronous `send`
/// and `recv` methods.
pub struct UnixSeqpacket {
	// The socket, wrapped in tokio's `PollEvented` so that read and write
	// readiness can be polled from a task context.
	io: PollEvented<crate::mio::EventedSocket>,
}

impl UnixSeqpacket {
	/// Wrap an existing `socket2` socket so it can be used asynchronously.
	///
	/// The socket is wrapped in an `EventedSocket` and handed to
	/// `PollEvented::new`.
	///
	/// NOTE(review): nothing here switches the socket to non-blocking mode, so
	/// callers presumably have to pass a socket that already is non-blocking.
	///
	/// # Errors
	/// Returns the error reported by `PollEvented::new`.
	pub(crate) fn new(socket: socket2::Socket) -> std::io::Result<Self> {
		let socket = crate::mio::EventedSocket::new(socket);
		let io = PollEvented::new(socket)?;
		Ok(Self { io })
	}

	/// Connect a new seqpacket socket to the given address.
	///
	/// # Errors
	/// Returns an error if the address can not be converted, if the socket can
	/// not be created, if the connect call fails with anything other than
	/// `WouldBlock`, if waiting for write readiness fails, or if the socket
	/// reports a pending error (`SO_ERROR`) once it has become writable.
	pub async fn connect<P: AsRef<Path>>(address: P) -> std::io::Result<Self> {
		let address = socket2::SockAddr::unix(address)?;
		let socket = socket2::Socket::new(socket2::Domain::unix(), crate::socket_type(), None)?;

		// `WouldBlock` is tolerated: we wait for the socket to become writable
		// below. Every other error is final.
		if let Err(e) = socket.connect(&address) {
			if e.kind() != std::io::ErrorKind::WouldBlock {
				return Err(e);
			}
		}

		let socket = Self::new(socket)?;
		poll_fn(|cx| socket.io.poll_write_ready(cx)).await?;

		// Becoming writable does not prove that the connection succeeded:
		// a failed attempt is reported through `SO_ERROR`, so check it before
		// handing the socket to the caller.
		if let Some(e) = socket.take_error()? {
			return Err(e);
		}

		Ok(socket)
	}

	/// Create a pair of connected seqpacket sockets.
	///
	/// # Errors
	/// Returns an error if the socket pair can not be created or if either
	/// socket can not be wrapped by [`UnixSeqpacket::new`].
	pub fn pair() -> std::io::Result<(Self, Self)> {
		let (a, b) = socket2::Socket::pair(socket2::Domain::unix(), crate::socket_type(), None)?;
		let a = Self::new(a)?;
		let b = Self::new(b)?;
		Ok((a, b))
	}

	/// Get the socket address of the local half of this connection.
	///
	/// # Panics
	/// Panics if `crate::sockaddr_as_unix` fails to convert the address
	/// returned by the socket.
	/// NOTE(review): confirm this can not happen for unnamed sockets, such as
	/// those created by `pair`.
	pub fn local_addr(&self) -> std::io::Result<std::os::unix::net::SocketAddr> {
		let addr = self.io.get_ref().local_addr()?;
		Ok(crate::sockaddr_as_unix(&addr).expect("failed to convert local address to a Unix socket address"))
	}

	/// Get the socket address of the remote half of this connection.
	///
	/// # Panics
	/// Panics if `crate::sockaddr_as_unix` fails to convert the address
	/// returned by the socket.
	/// NOTE(review): confirm this can not happen for unnamed sockets, such as
	/// those created by `pair`.
	pub fn peer_addr(&self) -> std::io::Result<std::os::unix::net::SocketAddr> {
		let addr = self.io.get_ref().peer_addr()?;
		Ok(crate::sockaddr_as_unix(&addr).expect("failed to convert peer address to a Unix socket address"))
	}

	/// Get the effective credentials of the process which called `connect` or `pair`.
	pub fn peer_cred(&self) -> std::io::Result<UCred> {
		crate::ucred::get_peer_cred(self.io.get_ref())
	}

	/// Get the value of the `SO_ERROR` option.
	pub fn take_error(&self) -> std::io::Result<Option<std::io::Error>> {
		self.io.get_ref().take_error()
	}

	/// Send data on the socket to the connected peer.
	///
	/// Resolves to the value returned by the underlying `send` call once the
	/// socket accepted the data or reported an error other than `WouldBlock`.
	pub async fn send(&mut self, buf: &[u8]) -> std::io::Result<usize> {
		poll_fn(|cx| self.poll_send_priv(cx, buf)).await
	}

	/// Receive data on the socket from the connected peer.
	///
	/// Resolves to the value returned by the underlying `recv` call once the
	/// socket produced data or reported an error other than `WouldBlock`.
	pub async fn recv(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
		poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
	}

	/// Shuts down the read, write, or both halves of this connection.
	///
	/// This function will cause all pending and future I/O calls on the
	/// specified portions to immediately return with an appropriate value
	/// (see the documentation of `Shutdown`).
	pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
		self.io.get_ref().shutdown(how)
	}

	/// Poll-based implementation of [`UnixSeqpacket::send`].
	///
	/// Returns `Poll::Pending` while the socket is not ready for writing, or
	/// when the send attempt itself fails with `WouldBlock`. In the latter case
	/// the write readiness is cleared first. Any other outcome of the send
	/// attempt is returned as `Poll::Ready`.
	pub fn poll_send_priv(&self, cx: &mut Context, buf: &[u8]) -> Poll<std::io::Result<usize>> {
		// `?` forwards a readiness error as `Poll::Ready(Err(_))`.
		if self.io.poll_write_ready(cx)?.is_pending() {
			return Poll::Pending;
		}

		match self.io.get_ref().send(buf) {
			Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
				// The readiness information was stale: reset it so the next
				// poll waits for a fresh event.
				self.io.clear_write_ready(cx)?;
				Poll::Pending
			}
			result => Poll::Ready(result),
		}
	}

	/// Poll-based implementation of [`UnixSeqpacket::recv`].
	///
	/// Returns `Poll::Pending` while the socket is not ready for reading, or
	/// when the receive attempt itself fails with `WouldBlock`. In the latter
	/// case the read readiness is cleared first. Any other outcome of the
	/// receive attempt is returned as `Poll::Ready`.
	pub fn poll_recv_priv(&self, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
		// `?` forwards a readiness error as `Poll::Ready(Err(_))`.
		if self.io.poll_read_ready(cx, Ready::readable())?.is_pending() {
			return Poll::Pending;
		}

		match self.io.get_ref().recv(buf) {
			Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
				// The readiness information was stale: reset it so the next
				// poll waits for a fresh event.
				self.io.clear_read_ready(cx, Ready::readable())?;
				Poll::Pending
			}
			result => Poll::Ready(result),
		}
	}
}