interprocess_docfix/nonblocking/
local_socket.rs

1//! Asynchronous local sockets.
2//!
3//! See the [blocking version of this module] for more on what those are.
4//!
5//! [blocking version of this module]: ../../local_socket/index.html " "
6
7use super::imports::*;
8use crate::local_socket::{self as sync, ToLocalSocketName};
9use std::{
10    io,
11    pin::Pin,
12    sync::Arc,
13    task::{Context, Poll},
14};
15
16/// An asynchronous local socket server, listening for connections.
17#[derive(Debug)]
18pub struct LocalSocketListener {
19    inner: Arc<sync::LocalSocketListener>,
20}
21
22impl LocalSocketListener {
23    /// Creates a socket server with the specified local socket name.
24    pub async fn bind<'a>(name: impl ToLocalSocketName<'_> + Send + 'static) -> io::Result<Self> {
25        Ok(Self {
26            inner: Arc::new(unblock(move || sync::LocalSocketListener::bind(name)).await?),
27        })
28    }
29    /// Listens for incoming connections to the socket, blocking until a client is connected.
30    ///
31    /// See [`incoming`] for a convenient way to create a main loop for a server.
32    ///
33    /// [`incoming`]: #method.incoming " "
34    pub async fn accept(&self) -> io::Result<LocalSocketStream> {
35        let s = self.inner.clone();
36        Ok(LocalSocketStream {
37            inner: Unblock::new(unblock(move || s.accept()).await?),
38        })
39    }
40    /// Creates an infinite asynchronous stream which calls `accept()` with each iteration. Used together with [`for_each`]/[`try_for_each`] stream adaptors to conveniently create a main loop for a socket server.
41    ///
42    /// # Example
43    /// See struct-level documentation for a complete example which already uses this method.
44    ///
45    /// [`for_each`]: https://docs.rs/futures/*/futures/stream/trait.StreamExt.html#method.for_each " "
46    /// [`try_for_each`]: https://docs.rs/futures/*/futures/stream/trait.TryStreamExt.html#method.try_for_each " "
47    pub fn incoming(&self) -> Incoming {
48        Incoming {
49            inner: Unblock::new(SyncArcIncoming {
50                inner: Arc::clone(&self.inner),
51            }),
52        }
53    }
54}
55
56/// An infinite asynchronous stream over incoming client connections of a [`LocalSocketListener`].
57///
58/// This stream is created by the [`incoming`] method on [`LocalSocketListener`] – see its documentation for more.
59///
60/// [`LocalSocketListener`]: struct.LocalSocketListener.html " "
61/// [`incoming`]: struct.LocalSocketListener.html#method.incoming " "
62#[derive(Debug)]
63pub struct Incoming {
64    inner: Unblock<SyncArcIncoming>,
65}
66#[cfg(feature = "nonblocking")]
67impl Stream for Incoming {
68    type Item = Result<LocalSocketStream, io::Error>;
69    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        let poll = <Unblock<_> as Stream>::poll_next(Pin::new(&mut self.inner), ctx);
71        match poll {
72            Poll::Ready(val) => {
73                let val = val.map(|val| match val {
74                    Ok(inner) => Ok(LocalSocketStream {
75                        inner: Unblock::new(inner),
76                    }),
77                    Err(error) => Err(error),
78                });
79                Poll::Ready(val)
80            }
81            Poll::Pending => Poll::Pending,
82        }
83    }
84}
85#[cfg(feature = "nonblocking")]
86impl FusedStream for Incoming {
87    fn is_terminated(&self) -> bool {
88        false
89    }
90}
91
92#[derive(Debug)]
93struct SyncArcIncoming {
94    inner: Arc<sync::LocalSocketListener>,
95}
96impl Iterator for SyncArcIncoming {
97    type Item = Result<sync::LocalSocketStream, io::Error>;
98    fn next(&mut self) -> Option<Self::Item> {
99        Some(self.inner.accept())
100    }
101}
102
103/// An asynchronous local socket byte stream, obtained eiter from [`LocalSocketListener`] or by connecting to an existing local socket.
104#[derive(Debug)]
105pub struct LocalSocketStream {
106    inner: Unblock<sync::LocalSocketStream>,
107}
108impl LocalSocketStream {
109    /// Connects to a remote local socket server.
110    pub async fn connect<'a>(
111        name: impl ToLocalSocketName<'a> + Send + 'static,
112    ) -> io::Result<Self> {
113        Ok(Self {
114            inner: Unblock::new(unblock(move || sync::LocalSocketStream::connect(name)).await?),
115        })
116    }
117}
118
119#[cfg(feature = "nonblocking")]
120impl AsyncRead for LocalSocketStream {
121    fn poll_read(
122        mut self: Pin<&mut Self>,
123        cx: &mut Context<'_>,
124        buf: &mut [u8],
125    ) -> Poll<Result<usize, io::Error>> {
126        AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf)
127    }
128}
129#[cfg(feature = "nonblocking")]
130impl AsyncWrite for LocalSocketStream {
131    fn poll_write(
132        mut self: Pin<&mut Self>,
133        cx: &mut Context<'_>,
134        buf: &[u8],
135    ) -> Poll<Result<usize, io::Error>> {
136        AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf)
137    }
138    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
139        AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx)
140    }
141    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
142        AsyncWrite::poll_close(Pin::new(&mut self.inner), cx)
143    }
144}