1use futures::Future;
2use futures::lock::Mutex;
3use futures::prelude::{AsyncRead, AsyncWrite};
4use futures::stream::Stream;
5use futures::task::Context;
6use std::clone::Clone;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::Poll;
10use crate::State;
11
12#[derive(Debug)]
14pub struct Socket {
15 pub state: Arc<Mutex<State>>,
17}
18
19impl Socket {
20 pub fn with_chunk_size(csize: usize) -> Self {
22 Self {
23 state: Arc::new(Mutex::new(State::with_chunk_size(csize))),
24 }
25 }
26}
27
28impl Default for Socket {
29 fn default() -> Self {
30 Self {
31 state: Arc::new(Mutex::new(State::default())),
32 }
33 }
34}
35
36impl Clone for Socket {
37 fn clone(&self) -> Self {
38 Self {
39 state: self.state.clone(),
40 }
41 }
42}
43
44impl AsyncRead for Socket {
45 fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
46 match Pin::new(&mut self.state.lock()).poll(cx) {
47 Poll::Ready(mut state) => {
48 state.waker = Some(cx.waker().clone());
49
50 let dsize = state.buf.len();
51 let bsize = buf.len();
52 if dsize < bsize {
53 return Poll::Pending;
54 }
55
56 let data = state.buf.drain(0..bsize).as_slice().to_vec();
57 buf[..bsize].copy_from_slice(&data);
58 Poll::Ready(Ok(bsize))
59 },
60 Poll::Pending => {
61 Poll::Pending
62 },
63 }
64 }
65}
66
67impl AsyncWrite for Socket {
68 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<std::io::Result<usize>> {
69 match Pin::new(&mut self.state.lock()).poll(cx) {
70 Poll::Ready(mut state) => {
71 state.buf.append(&mut data.to_vec());
72 state.wake();
73 Poll::Ready(Ok(state.buf.len()))
74 },
75 Poll::Pending => {
76 Poll::Pending
77 },
78 }
79 }
80
81 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
82 Poll::Ready(Ok(()))
83 }
84
85 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
86 Poll::Ready(Ok(()))
87 }
88}
89
90impl Stream for Socket {
91 type Item = Vec<u8>;
92
93 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
94 match Pin::new(&mut self.state.lock()).poll(cx) {
95 Poll::Ready(mut state) => {
96 state.waker = Some(cx.waker().clone());
97
98 let max = std::cmp::min(state.chunk_size, state.buf.len());
99 let data = state.buf.drain(0..max).as_slice().to_vec();
100 if data.is_empty() {
101 Poll::Pending
102 } else {
103 Poll::Ready(Some(data))
104 }
105 },
106 _ => {
107 Poll::Pending
108 },
109 }
110 }
111}