1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use super::connection::{Connection, ConnectionState};
use tokio_io::{AsyncRead, AsyncWrite};
use super::Handler;
use {ChannelId, HandlerError, Error, Status, AtomicPoll};
use futures::{Poll, Async, Future};
use tcp::Tcp;
impl<R, H> Connection<R, H>
where
    R: AsyncRead + AsyncWrite,
    H: Handler,
{
    /// Consumes the connection and builds a [`Data`] future that sends
    /// `data` on `channel`.
    ///
    /// * `channel` - identifier of the channel the bytes are written to.
    /// * `extended` - forwarded unchanged to the encrypted session's `data`
    ///   call; presumably the SSH "extended data" type code, with `None`
    ///   meaning ordinary channel data (TODO confirm against the session
    ///   implementation).
    /// * `data` - the payload; anything viewable as a byte slice.
    ///
    /// The returned future starts at offset 0 of the payload and, once
    /// complete, hands back both the connection and the payload.
    pub fn data<T>(self, channel: ChannelId, extended: Option<u32>, data: T) -> Data<R, H, T>
    where
        T: AsRef<[u8]>,
    {
        debug!("data: {:?}", data.as_ref().len());
        Data {
            connection: Some(self),
            data: Some(data),
            extended,
            channel,
            position: 0,
            first_round: true,
        }
    }
}
/// Future returned by `Connection::data`: sends a byte payload over a
/// channel and resolves to the connection together with the payload.
///
/// The two `Option` fields are emptied while `poll` is running and refilled
/// only when `poll` returns `NotReady`.
pub struct Data<R, H, T>
where
    R: AsyncRead + AsyncWrite,
    H: Handler,
    T: AsRef<[u8]>,
{
    /// The connection being driven; `None` while a poll is in progress and
    /// after the future has completed or failed.
    connection: Option<Connection<R, H>>,
    /// The payload being sent; `None` under the same conditions as
    /// `connection` (except that it is still present if the very first
    /// `abort_read` fails).
    data: Option<T>,
    /// Passed as-is to the encrypted session's `data` call on every round.
    /// Presumably the SSH extended-data type code (TODO confirm).
    extended: Option<u32>,
    /// Channel the payload is written to.
    channel: ChannelId,
    /// Offset into the payload of the first byte not yet accepted by the
    /// encrypted session.
    position: usize,
    /// `true` until the first poll, which calls `abort_read` on the
    /// connection before doing anything else.
    first_round: bool,
}
impl<R: AsyncRead + AsyncWrite + Tcp, H: Handler, T: AsRef<[u8]>> Future for Data<R, H, T> {
type Item = (Connection<R, H>, T);
type Error = HandlerError<H::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut connection = self.connection.take().unwrap();
if self.first_round {
connection.abort_read()?;
self.first_round = false
}
let data = self.data.take().unwrap();
loop {
debug!("Data loop");
let status = connection.atomic_poll()?;
let mut not_ready = false;
match status {
Async::Ready(Status::Disconnect) => return Err(From::from(Error::Disconnect)),
Async::Ready(Status::Ok) if connection.is_reading() => {}
Async::Ready(Status::Ok) => continue,
Async::NotReady if connection.is_reading() => not_ready = true,
Async::NotReady => {
self.connection = Some(connection);
self.data = Some(data);
return Ok(Async::NotReady);
}
}
let mut session = connection.session.take().unwrap();
{
let data_ = data.as_ref();
let enc = session.0.encrypted.as_mut().unwrap();
self.position += enc.data(self.channel, self.extended, &data_[self.position..]);
}
session.flush()?;
if !session.0.write_buffer.buffer.is_empty() {
if let Some(ConnectionState::Read(mut read)) = connection.state {
if let Some((stream, read_buffer)) = read.try_abort() {
connection.read_buffer = Some(read_buffer);
connection.state = Some(ConnectionState::Write(
session.0.write_buffer.write_all(stream),
));
connection.session = Some(session);
} else {
connection.state = Some(ConnectionState::Read(read));
connection.session = Some(session);
}
} else {
connection.session = Some(session);
}
} else if self.position < data.as_ref().len() {
connection.session = Some(session);
if not_ready {
self.connection = Some(connection);
self.data = Some(data);
return Ok(Async::NotReady);
}
} else {
connection.session = Some(session);
return Ok(Async::Ready((connection, data)));
}
}
}
}