async_socket/
socket.rs

1use futures::Future;
2use futures::lock::Mutex;
3use futures::prelude::{AsyncRead, AsyncWrite};
4use futures::stream::Stream;
5use futures::task::Context;
6use std::clone::Clone;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::Poll;
10use crate::State;
11
12/// The asynchronous socket that mimics the network stream.
13#[derive(Debug)]
14pub struct Socket {
15    /// A central socket state which is shared among all the cloned instances.
16    pub state: Arc<Mutex<State>>,
17}
18
19impl Socket {
20    /// Returns a new instance with a specific chunk size.
21    pub fn with_chunk_size(csize: usize) -> Self {
22        Self {
23            state: Arc::new(Mutex::new(State::with_chunk_size(csize))),
24        }
25    }
26}
27
28impl Default for Socket {
29    fn default() -> Self {
30        Self {
31            state: Arc::new(Mutex::new(State::default())),
32        }
33    }
34}
35
36impl Clone for Socket {
37    fn clone(&self) -> Self {
38        Self {
39            state: self.state.clone(),
40        }
41    }
42}
43
44impl AsyncRead for Socket {
45    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
46        match Pin::new(&mut self.state.lock()).poll(cx) {
47            Poll::Ready(mut state) => {
48                state.waker = Some(cx.waker().clone());
49                
50                let dsize = state.buf.len();
51                let bsize = buf.len();
52                if dsize < bsize {
53                    return Poll::Pending;
54                }
55
56                let data = state.buf.drain(0..bsize).as_slice().to_vec();
57                buf[..bsize].copy_from_slice(&data);
58                Poll::Ready(Ok(bsize))
59            },
60            Poll::Pending => {
61                Poll::Pending
62            },
63        }
64    }
65}
66
67impl AsyncWrite for Socket {
68    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<std::io::Result<usize>> {
69        match Pin::new(&mut self.state.lock()).poll(cx) {
70            Poll::Ready(mut state) => {
71                state.buf.append(&mut data.to_vec());
72                state.wake();
73                Poll::Ready(Ok(state.buf.len()))
74            },
75            Poll::Pending => {
76                Poll::Pending
77            },
78        }
79    }
80
81    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
82        Poll::Ready(Ok(()))
83    }
84
85    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
86        Poll::Ready(Ok(()))
87    }
88}
89
90impl Stream for Socket {
91    type Item = Vec<u8>;
92
93    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
94        match Pin::new(&mut self.state.lock()).poll(cx) {
95            Poll::Ready(mut state) => {
96                state.waker = Some(cx.waker().clone());
97                
98                let max = std::cmp::min(state.chunk_size, state.buf.len());
99                let data = state.buf.drain(0..max).as_slice().to_vec();
100                if data.is_empty() {
101                    Poll::Pending
102                } else {
103                    Poll::Ready(Some(data))
104                }
105            },
106            _ => {
107                Poll::Pending
108            },
109        }
110    }
111}