fg_uds/
listener.rs

1use std::cell::RefCell;
2use std::io;
3use std::io::ErrorKind::WouldBlock;
4use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
5use std::os::unix::net::{self, SocketAddr};
6use std::path::Path;
7use std::time::Duration;
8
9use futures::{Poll, Stream};
10use futures::Async::{NotReady, Ready};
11use futures::task::Task;
12use futures::task;
13use futures_glib::{IoCondition, MainContext, Source, SourceFuncs, UnixToken};
14use glib_sys;
15use nix;
16use nix::sys::socket::{AddressFamily, SockAddr, SockType, UnixAddr, bind, listen, socket, SOCK_NONBLOCK};
17
18use super::{UnixStream, new_source};
19
20impl AsRawFd for UnixListener {
21    fn as_raw_fd(&self) -> RawFd {
22        self.inner.get_ref().listener.as_raw_fd()
23    }
24}
25
26pub struct UnixListener {
27    context: MainContext,
28    inner: Source<UnixListenerInner>,
29}
30
31impl UnixListener {
32    pub fn bind<P: AsRef<Path>>(path: P, context: &MainContext) -> io::Result<Self> {
33        let listener = net::UnixListener::bind(path)?;
34        listener.set_nonblocking(true)?;
35        Ok(Self::from_unix_listener(listener, context))
36    }
37
38    pub fn bind_abstract(name: &[u8], context: &MainContext) -> nix::Result<Self> {
39        let fd = socket(AddressFamily::Unix, SockType::Stream, SOCK_NONBLOCK, 0)?;
40        let addr = SockAddr::Unix(UnixAddr::new_abstract(name)?);
41        bind(fd, &addr)?;
42        listen(fd, 128)?;
43        Ok(Self::from_fd(fd, context))
44    }
45
46    pub fn from_fd(fd: RawFd, context: &MainContext) -> Self {
47        let listener = unsafe { net::UnixListener::from_raw_fd(fd) };
48        Self::from_unix_listener(listener, context)
49    }
50
51    fn from_unix_listener(listener: net::UnixListener, context: &MainContext) -> Self {
52        let fd = listener.as_raw_fd();
53        let mut active = IoCondition::new();
54        active.input(true);
55        let inner = Source::new(UnixListenerInner {
56            active: RefCell::new(active.clone()),
57            listener,
58            task: RefCell::new(None),
59            token: RefCell::new(None),
60        });
61        inner.attach(context);
62
63        // Add the file descriptor to the source so it knows what we're tracking.
64        let token = inner.unix_add_fd(fd, &active);
65        *inner.get_ref().token.borrow_mut() = Some(token);
66        UnixListener {
67            context: context.clone(),
68            inner,
69        }
70    }
71
72    pub fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
73        let inner = self.inner.get_ref();
74        match inner.listener.accept() {
75            Err(err) => {
76                if err.kind() == WouldBlock {
77                    *inner.task.borrow_mut() = Some(task::current());
78
79                    // Be sure to update the IoCondition that we're interested so we can get
80                    // events related to this condition.
81                    let token = inner.token.borrow();
82                    let token = token.as_ref().unwrap();
83                    let mut active = inner.active.borrow_mut();
84                    active.input(true);
85                    unsafe {
86                        self.inner.unix_modify_fd(token, &active);
87                    }
88                }
89                Err(err)
90            },
91            Ok((socket, addr)) => {
92                socket.set_nonblocking(true)?;
93                let inner = unsafe { new_source(socket.into_raw_fd(), &self.context) };
94                let stream = UnixStream {
95                    inner,
96                };
97                Ok((stream, addr))
98            },
99        }
100    }
101
102    pub fn incoming(self) -> Incoming {
103        Incoming {
104            listener: self,
105        }
106    }
107
108    pub fn local_addr(&self) -> io::Result<SocketAddr> {
109        self.inner.get_ref().listener.local_addr()
110    }
111}
112
113pub struct Incoming {
114    listener: UnixListener,
115}
116
117impl Stream for Incoming {
118    type Item = (UnixStream, SocketAddr);
119    type Error = io::Error;
120
121    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
122        match self.listener.accept() {
123            Err(err) => {
124                if err.kind() == WouldBlock {
125                    // The current task was registered to receive a notification in the accept()
126                    // method.
127                    Ok(NotReady)
128                }
129                else {
130                    Err(err)
131                }
132            },
133            Ok(val) => {
134                Ok(Ready(Some(val)))
135            },
136        }
137    }
138}
139
140struct UnixListenerInner {
141    active: RefCell<IoCondition>,
142    listener: net::UnixListener,
143    task: RefCell<Option<Task>>,
144    token: RefCell<Option<UnixToken>>,
145}
146
147impl SourceFuncs for UnixListenerInner {
148    type CallbackArg = ();
149
150    fn prepare(&self, _source: &Source<Self>) -> (bool, Option<Duration>) {
151        (false, None)
152    }
153
154    fn check(&self, source: &Source<Self>) -> bool {
155        // Test to see whether the events on the fd indicate that we're ready to
156        // do some work.
157        let token = self.token.borrow();
158        let token = token.as_ref().unwrap();
159        let ready = unsafe { source.unix_query_fd(token) };
160
161        ready.is_input() && self.task.borrow().is_some()
162    }
163
164    fn dispatch(&self, _source: &Source<Self>, _f: glib_sys::GSourceFunc, _data: glib_sys::gpointer) -> bool {
165        if let Some(task) = self.task.borrow_mut().take() {
166            task.notify();
167        }
168        true
169    }
170
171    fn g_source_func<F>() -> glib_sys::GSourceFunc
172    where F: FnMut(Self::CallbackArg) -> bool
173    {
174        // we never register a callback on this source, so no need to implement
175        // this
176        panic!()
177    }
178}