bluez_async_ots/
l2cap.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5use ots_core::l2cap::{self, L2capSockAddr};
6use std::{
7    io::Result,
8    os::fd::{AsRawFd, RawFd},
9};
10
11pub struct L2capSocket {
12    inner: l2cap::L2capSocket,
13}
14
15impl core::ops::Deref for L2capSocket {
16    type Target = l2cap::L2capSocket;
17
18    fn deref(&self) -> &Self::Target {
19        &self.inner
20    }
21}
22
23impl core::ops::DerefMut for L2capSocket {
24    fn deref_mut(&mut self) -> &mut Self::Target {
25        &mut self.inner
26    }
27}
28
29impl L2capSocket {
30    pub fn new(type_: l2cap::SocketType) -> Result<Self> {
31        l2cap::L2capSocket::new(type_).map(|inner| Self { inner })
32    }
33
34    pub async fn connect(self, sockaddr: &L2capSockAddr) -> Result<L2capStream> {
35        self.inner.connect(sockaddr)?;
36        self.inner.set_nonblocking(true)?;
37        let inner = tokio::io::unix::AsyncFd::new(self)?;
38
39        // Once we've connected, wait for the stream to be writable as
40        // that's when the actual connection has been initiated. Once we're
41        // writable we check for `take_socket_error` to see if the connect
42        // actually hit an error or not.
43        //
44        // If all that succeeded then we ship everything on up.
45        let _ = core::future::poll_fn(|cx| inner.poll_write_ready(cx)).await?;
46
47        if let Some(e) = inner.get_ref().inner.take_error()? {
48            return Err(e);
49        }
50
51        Ok(L2capStream { inner })
52    }
53}
54
55impl AsRawFd for L2capSocket {
56    fn as_raw_fd(&self) -> RawFd {
57        self.inner.as_raw_fd()
58    }
59}
60
61pub struct L2capStream {
62    inner: tokio::io::unix::AsyncFd<L2capSocket>,
63}
64
65impl core::ops::Deref for L2capStream {
66    type Target = L2capSocket;
67    fn deref(&self) -> &Self::Target {
68        self.inner.get_ref()
69    }
70}
71
72impl tokio::io::AsyncRead for L2capStream {
73    fn poll_read(
74        mut self: Pin<&mut Self>,
75        cx: &mut Context<'_>,
76        buf: &mut tokio::io::ReadBuf<'_>,
77    ) -> Poll<std::io::Result<()>> {
78        use std::io::Read;
79
80        loop {
81            let mut guard = futures::ready!(self.inner.poll_read_ready_mut(cx)?);
82
83            let unfilled = buf.initialize_unfilled();
84            match guard.try_io(|inner| {
85                inner
86                    .get_mut()
87                    .inner
88                    .read(unsafe { &mut *(unfilled as *mut _ as *mut _) })
89            }) {
90                Ok(Ok(len)) => {
91                    buf.advance(len);
92                    return Poll::Ready(Ok(()));
93                }
94                Ok(Err(err)) => return Poll::Ready(Err(err)),
95                Err(_would_block) => continue,
96            }
97        }
98    }
99}
100
101impl tokio::io::AsyncWrite for L2capStream {
102    fn poll_write(
103        self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105        buf: &[u8],
106    ) -> Poll<std::io::Result<usize>> {
107        //use std::io::Write;
108
109        loop {
110            let mut guard = futures::ready!(self.inner.poll_write_ready(cx))?;
111
112            match guard.try_io(|inner| inner.get_ref().inner.send(buf)) {
113                Ok(result) => return Poll::Ready(result),
114                Err(_would_block) => continue,
115            }
116        }
117    }
118
119    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
120        // tcp flush is a no-op
121        Poll::Ready(Ok(()))
122    }
123
124    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
125        self.inner
126            .get_ref()
127            .inner
128            .shutdown(std::net::Shutdown::Write)?;
129        Poll::Ready(Ok(()))
130    }
131}