interprocess_docfix/nonblocking/
local_socket.rs1use super::imports::*;
8use crate::local_socket::{self as sync, ToLocalSocketName};
9use std::{
10 io,
11 pin::Pin,
12 sync::Arc,
13 task::{Context, Poll},
14};
15
16#[derive(Debug)]
18pub struct LocalSocketListener {
19 inner: Arc<sync::LocalSocketListener>,
20}
21
22impl LocalSocketListener {
23 pub async fn bind<'a>(name: impl ToLocalSocketName<'_> + Send + 'static) -> io::Result<Self> {
25 Ok(Self {
26 inner: Arc::new(unblock(move || sync::LocalSocketListener::bind(name)).await?),
27 })
28 }
29 pub async fn accept(&self) -> io::Result<LocalSocketStream> {
35 let s = self.inner.clone();
36 Ok(LocalSocketStream {
37 inner: Unblock::new(unblock(move || s.accept()).await?),
38 })
39 }
40 pub fn incoming(&self) -> Incoming {
48 Incoming {
49 inner: Unblock::new(SyncArcIncoming {
50 inner: Arc::clone(&self.inner),
51 }),
52 }
53 }
54}
55
56#[derive(Debug)]
63pub struct Incoming {
64 inner: Unblock<SyncArcIncoming>,
65}
#[cfg(feature = "nonblocking")]
impl Stream for Incoming {
    type Item = Result<LocalSocketStream, io::Error>;

    /// Polls the wrapped blocking iterator for the next connection.
    ///
    /// Readiness and errors are forwarded unchanged; a successfully accepted blocking stream is
    /// wrapped in `Unblock` and returned as a [`LocalSocketStream`].
    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let wrapped = Pin::new(&mut self.inner);
        // Poll -> Option -> Result: only the innermost success value needs converting.
        <Unblock<_> as Stream>::poll_next(wrapped, ctx).map(|next| {
            next.map(|accepted| {
                accepted.map(|inner| LocalSocketStream {
                    inner: Unblock::new(inner),
                })
            })
        })
    }
}
#[cfg(feature = "nonblocking")]
impl FusedStream for Incoming {
    /// Always reports the stream as not terminated.
    fn is_terminated(&self) -> bool {
        false
    }
}
91
92#[derive(Debug)]
93struct SyncArcIncoming {
94 inner: Arc<sync::LocalSocketListener>,
95}
96impl Iterator for SyncArcIncoming {
97 type Item = Result<sync::LocalSocketStream, io::Error>;
98 fn next(&mut self) -> Option<Self::Item> {
99 Some(self.inner.accept())
100 }
101}
102
103#[derive(Debug)]
105pub struct LocalSocketStream {
106 inner: Unblock<sync::LocalSocketStream>,
107}
108impl LocalSocketStream {
109 pub async fn connect<'a>(
111 name: impl ToLocalSocketName<'a> + Send + 'static,
112 ) -> io::Result<Self> {
113 Ok(Self {
114 inner: Unblock::new(unblock(move || sync::LocalSocketStream::connect(name)).await?),
115 })
116 }
117}
118
#[cfg(feature = "nonblocking")]
impl AsyncRead for LocalSocketStream {
    /// Delegates the read to the wrapped `Unblock` stream.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        let wrapped = Pin::new(&mut self.inner);
        AsyncRead::poll_read(wrapped, cx, buf)
    }
}
#[cfg(feature = "nonblocking")]
impl AsyncWrite for LocalSocketStream {
    /// Delegates the write to the wrapped `Unblock` stream.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let wrapped = Pin::new(&mut self.inner);
        AsyncWrite::poll_write(wrapped, cx, buf)
    }

    /// Delegates the flush to the wrapped `Unblock` stream.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let wrapped = Pin::new(&mut self.inner);
        AsyncWrite::poll_flush(wrapped, cx)
    }

    /// Delegates the close to the wrapped `Unblock` stream.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let wrapped = Pin::new(&mut self.inner);
        AsyncWrite::poll_close(wrapped, cx)
    }
}