toku_connection/
connection.rs1use crate::event_handler::EventHandler;
2use crate::framed_io::ReaderWriter;
3use crate::handler::{Handler, Ready};
4use crate::select_break::StreamExt as SelectBreakStreamExt;
5use crate::sender::Sender;
6use crate::timeout_at;
7use crate::TokuError;
8use failure::Error;
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::channel::oneshot;
11use futures::{Future, StreamExt};
12use toku_protocol::frames::{TokuFrame, Response};
13use std::net::SocketAddr;
14use tokio::net::TcpStream;
15use tokio::task::spawn;
16use tokio::time::interval;
17use tokio::time::Instant;
18
19#[derive(Debug)]
20pub struct Connection<H: Handler> {
21 self_sender: Sender<H::InternalEvent>,
22}
23
24impl<H: Handler> Connection<H> {
25 pub fn spawn_from_address(
35 address: SocketAddr,
36 handler: H,
37 handshake_deadline: Instant,
38 ready_tx: Option<oneshot::Sender<&'static str>>,
39 ) -> Self {
40 let (self_sender, self_rx) = Sender::new();
41 let connection = Self {
42 self_sender: self_sender.clone(),
43 };
44 spawn(async move {
45 match timeout_at(handshake_deadline, TcpStream::connect(&address)).await {
46 Ok(tcp_stream) => {
47 info!("Connected to {}", address);
48 let result = run(
49 tcp_stream,
50 self_sender,
51 self_rx,
52 handler,
53 handshake_deadline,
54 ready_tx,
55 )
56 .await;
57 if let Err(e) = result {
58 warn!("Connection closed. ip={:?} error={:?}", address, e)
59 }
60 }
61 Err(e) => error!("Connect failed. error={:?}", e),
62 };
63 });
64 connection
65 }
66
67 pub fn spawn(
77 tcp_stream: TcpStream,
78 handler: H,
79 handshake_deadline: Instant,
80 ready_tx: Option<oneshot::Sender<&'static str>>,
81 ) -> Self {
82 let (self_sender, self_rx) = Sender::new();
83 let connection = Self {
84 self_sender: self_sender.clone(),
85 };
86 spawn(async move {
87 let ip = tcp_stream.peer_addr();
88 let result = run(
89 tcp_stream,
90 self_sender,
91 self_rx,
92 handler,
93 handshake_deadline,
94 ready_tx,
95 )
96 .await;
97 if let Err(e) = result {
98 warn!("Connection closed. ip={:?} error={:?}", ip, e)
99 }
100 });
101 connection
102 }
103
104 pub fn send(&self, event: H::InternalEvent) -> Result<(), Error> {
105 self.self_sender.internal(event)
106 }
107
108 pub fn close(&self) -> Result<(), Error> {
109 self.self_sender.close()
110 }
111
112 pub fn is_closed(&self) -> bool {
113 self.self_sender.is_closed()
114 }
115}
116
117#[derive(Debug)]
119pub enum Event<InternalEvent: Send + 'static> {
120 SocketReceive(TokuFrame),
122 Ping,
124 InternalEvent(InternalEvent),
126 ResponseComplete(Result<Response, (Error, u32)>),
128 Close,
130}
131
132async fn run<H: Handler>(
145 tcp_stream: TcpStream,
146 self_sender: Sender<H::InternalEvent>,
147 self_rx: UnboundedReceiver<Event<H::InternalEvent>>,
148 handler: H,
149 handshake_deadline: Instant,
150 ready_tx: Option<oneshot::Sender<&'static str>>,
151) -> Result<(), Error> {
152 let (ready, reader_writer, handler) =
153 timeout_at(handshake_deadline, negotiate(tcp_stream, handler, ready_tx)).await?;
154 debug!("Ready. {:?}", ready);
155 let (reader, mut writer) = reader_writer.split();
156
157 let Ready {
158 ping_interval,
159 encoding,
160 } = ready;
161 let ping_stream = interval(ping_interval).map(|_| Ok(Event::Ping));
163 let framed_reader = reader.map(|result| result.map(Event::SocketReceive));
164 let self_rx = self_rx.map(|event| Ok(event));
165
166 let mut stream = framed_reader
167 .select_break(self_rx)
168 .select_break(ping_stream);
169
170 let mut event_handler = EventHandler::new(self_sender, handler, encoding);
171 while let Some(event) = stream.next().await {
172 let event = event?;
173
174 match event_handler.handle_event(event) {
175 Ok(Some(frame)) => writer = writer.write(frame).await?,
176 Ok(None) => {}
177 Err(error) => {
178 writer.close(Some(&error), None).await;
179 return Ok(());
180 }
181 }
182 }
183
184 Err(TokuError::ConnectionClosed.into())
185}
186
187fn negotiate<H: Handler>(
195 tcp_stream: TcpStream,
196 mut handler: H,
197 ready_tx: Option<oneshot::Sender<&'static str>>,
198) -> impl Future<Output = Result<(Ready, ReaderWriter, H), Error>> {
199 async move {
200 let tcp_stream = handler.upgrade(tcp_stream).await?;
201 let max_payload_size = handler.max_payload_size();
202 let reader_writer = ReaderWriter::new(tcp_stream, max_payload_size, H::SEND_GO_AWAY);
203
204 match handler.handshake(reader_writer).await {
205 Ok((ready, reader_writer)) => {
206 if let Some(ready_tx) = ready_tx {
207 ready_tx
208 .send(ready.encoding)
209 .map_err(|_| Error::from(TokuError::ReadySendFailed))?;
210 }
211 Ok((ready, reader_writer, handler))
212 }
213 Err((error, reader_writer)) => {
214 debug!("Not ready. e={:?}", error);
215 if let Some(reader_writer) = reader_writer {
216 reader_writer.close(Some(&error)).await;
217 }
218 Err(error)
219 }
220 }
221 }
222}