1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::path::Path;
use tokio::future::poll_fn;
use tokio::io::PollEvented;
use tokio::net::unix::UCred;
use std::task::{Context, Poll};
use ::mio::Ready;
/// Asynchronous Unix domain socket driven by the tokio reactor.
///
/// The socket type comes from `crate::socket_type()`, which is defined elsewhere in the crate
/// (presumably `SOCK_SEQPACKET`, going by the type name -- confirm there).
pub struct UnixSeqpacket {
// The underlying socket, wrapped so tokio can deliver readiness notifications for it.
io: PollEvented<crate::mio::EventedSocket>,
}
impl UnixSeqpacket {
pub(crate) fn new(socket: socket2::Socket) -> std::io::Result<Self> {
let socket = crate::mio::EventedSocket::new(socket);
let io = PollEvented::new(socket)?;
Ok(Self { io })
}
pub async fn connect<P: AsRef<Path>>(address: P) -> std::io::Result<Self> {
let address = socket2::SockAddr::unix(address)?;
let socket = socket2::Socket::new(socket2::Domain::unix(), crate::socket_type(), None)?;
match socket.connect(&address) {
Ok(()) => (),
Err(e) => if e.kind() != std::io::ErrorKind::WouldBlock {
return Err(e);
}
};
let socket = Self::new(socket)?;
poll_fn(|cx| socket.io.poll_write_ready(cx)).await?;
Ok(socket)
}
pub fn pair() -> std::io::Result<(Self, Self)> {
let (a, b) = socket2::Socket::pair(socket2::Domain::unix(), crate::socket_type(), None)?;
let a = Self::new(a)?;
let b = Self::new(b)?;
Ok((a, b))
}
pub fn local_addr(&self) -> std::io::Result<std::os::unix::net::SocketAddr> {
let addr = self.io.get_ref().local_addr()?;
Ok(crate::sockaddr_as_unix(&addr).unwrap())
}
pub fn peer_addr(&self) -> std::io::Result<std::os::unix::net::SocketAddr> {
let addr = self.io.get_ref().peer_addr()?;
Ok(crate::sockaddr_as_unix(&addr).unwrap())
}
pub fn peer_cred(&self) -> std::io::Result<UCred> {
crate::ucred::get_peer_cred(self.io.get_ref())
}
pub fn take_error(&self) -> std::io::Result<Option<std::io::Error>> {
self.io.get_ref().take_error()
}
pub async fn send(&mut self, buf: &[u8]) -> std::io::Result<usize> {
poll_fn(|cx| self.poll_send_priv(cx, buf)).await
}
pub async fn recv(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
}
pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
self.io.get_ref().shutdown(how)
}
pub fn poll_send_priv(&self, cx: &mut Context, buf: &[u8]) -> Poll<std::io::Result<usize>> {
let ready = self.io.poll_write_ready(cx)?;
if ready.is_pending() {
return Poll::Pending;
}
match self.io.get_ref().send(buf) {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Poll::Pending
}
x => Poll::Ready(x),
}
}
pub fn poll_recv_priv(&self, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
let ready = self.io.poll_read_ready(cx, Ready::readable())?;
if ready.is_pending() {
return Poll::Pending;
}
match self.io.get_ref().recv(buf) {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, Ready::readable())?;
Poll::Pending
}
x => Poll::Ready(x),
}
}
}