1use std::cell::RefCell;
2use std::io;
3use std::io::ErrorKind::WouldBlock;
4use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
5use std::os::unix::net::{self, SocketAddr};
6use std::path::Path;
7use std::time::Duration;
8
9use futures::{Poll, Stream};
10use futures::Async::{NotReady, Ready};
11use futures::task::Task;
12use futures::task;
13use futures_glib::{IoCondition, MainContext, Source, SourceFuncs, UnixToken};
14use glib_sys;
15use nix;
16use nix::sys::socket::{AddressFamily, SockAddr, SockType, UnixAddr, bind, listen, socket, SOCK_NONBLOCK};
17
18use super::{UnixStream, new_source};
19
20impl AsRawFd for UnixListener {
21 fn as_raw_fd(&self) -> RawFd {
22 self.inner.get_ref().listener.as_raw_fd()
23 }
24}
25
26pub struct UnixListener {
27 context: MainContext,
28 inner: Source<UnixListenerInner>,
29}
30
31impl UnixListener {
32 pub fn bind<P: AsRef<Path>>(path: P, context: &MainContext) -> io::Result<Self> {
33 let listener = net::UnixListener::bind(path)?;
34 listener.set_nonblocking(true)?;
35 Ok(Self::from_unix_listener(listener, context))
36 }
37
38 pub fn bind_abstract(name: &[u8], context: &MainContext) -> nix::Result<Self> {
39 let fd = socket(AddressFamily::Unix, SockType::Stream, SOCK_NONBLOCK, 0)?;
40 let addr = SockAddr::Unix(UnixAddr::new_abstract(name)?);
41 bind(fd, &addr)?;
42 listen(fd, 128)?;
43 Ok(Self::from_fd(fd, context))
44 }
45
46 pub fn from_fd(fd: RawFd, context: &MainContext) -> Self {
47 let listener = unsafe { net::UnixListener::from_raw_fd(fd) };
48 Self::from_unix_listener(listener, context)
49 }
50
51 fn from_unix_listener(listener: net::UnixListener, context: &MainContext) -> Self {
52 let fd = listener.as_raw_fd();
53 let mut active = IoCondition::new();
54 active.input(true);
55 let inner = Source::new(UnixListenerInner {
56 active: RefCell::new(active.clone()),
57 listener,
58 task: RefCell::new(None),
59 token: RefCell::new(None),
60 });
61 inner.attach(context);
62
63 let token = inner.unix_add_fd(fd, &active);
65 *inner.get_ref().token.borrow_mut() = Some(token);
66 UnixListener {
67 context: context.clone(),
68 inner,
69 }
70 }
71
72 pub fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
73 let inner = self.inner.get_ref();
74 match inner.listener.accept() {
75 Err(err) => {
76 if err.kind() == WouldBlock {
77 *inner.task.borrow_mut() = Some(task::current());
78
79 let token = inner.token.borrow();
82 let token = token.as_ref().unwrap();
83 let mut active = inner.active.borrow_mut();
84 active.input(true);
85 unsafe {
86 self.inner.unix_modify_fd(token, &active);
87 }
88 }
89 Err(err)
90 },
91 Ok((socket, addr)) => {
92 socket.set_nonblocking(true)?;
93 let inner = unsafe { new_source(socket.into_raw_fd(), &self.context) };
94 let stream = UnixStream {
95 inner,
96 };
97 Ok((stream, addr))
98 },
99 }
100 }
101
102 pub fn incoming(self) -> Incoming {
103 Incoming {
104 listener: self,
105 }
106 }
107
108 pub fn local_addr(&self) -> io::Result<SocketAddr> {
109 self.inner.get_ref().listener.local_addr()
110 }
111}
112
113pub struct Incoming {
114 listener: UnixListener,
115}
116
117impl Stream for Incoming {
118 type Item = (UnixStream, SocketAddr);
119 type Error = io::Error;
120
121 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
122 match self.listener.accept() {
123 Err(err) => {
124 if err.kind() == WouldBlock {
125 Ok(NotReady)
128 }
129 else {
130 Err(err)
131 }
132 },
133 Ok(val) => {
134 Ok(Ready(Some(val)))
135 },
136 }
137 }
138}
139
140struct UnixListenerInner {
141 active: RefCell<IoCondition>,
142 listener: net::UnixListener,
143 task: RefCell<Option<Task>>,
144 token: RefCell<Option<UnixToken>>,
145}
146
147impl SourceFuncs for UnixListenerInner {
148 type CallbackArg = ();
149
150 fn prepare(&self, _source: &Source<Self>) -> (bool, Option<Duration>) {
151 (false, None)
152 }
153
154 fn check(&self, source: &Source<Self>) -> bool {
155 let token = self.token.borrow();
158 let token = token.as_ref().unwrap();
159 let ready = unsafe { source.unix_query_fd(token) };
160
161 ready.is_input() && self.task.borrow().is_some()
162 }
163
164 fn dispatch(&self, _source: &Source<Self>, _f: glib_sys::GSourceFunc, _data: glib_sys::gpointer) -> bool {
165 if let Some(task) = self.task.borrow_mut().take() {
166 task.notify();
167 }
168 true
169 }
170
171 fn g_source_func<F>() -> glib_sys::GSourceFunc
172 where F: FnMut(Self::CallbackArg) -> bool
173 {
174 panic!()
177 }
178}