toku_connection/
connection.rs

1use crate::event_handler::EventHandler;
2use crate::framed_io::ReaderWriter;
3use crate::handler::{Handler, Ready};
4use crate::select_break::StreamExt as SelectBreakStreamExt;
5use crate::sender::Sender;
6use crate::timeout_at;
7use crate::TokuError;
8use failure::Error;
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::channel::oneshot;
11use futures::{Future, StreamExt};
12use toku_protocol::frames::{TokuFrame, Response};
13use std::net::SocketAddr;
14use tokio::net::TcpStream;
15use tokio::task::spawn;
16use tokio::time::interval;
17use tokio::time::Instant;
18
19#[derive(Debug)]
20pub struct Connection<H: Handler> {
21    self_sender: Sender<H::InternalEvent>,
22}
23
24impl<H: Handler> Connection<H> {
25    /// Spawn a new `Connection` that runs in a separate task. Returns a handle for sending to
26    /// the `Connection`.
27    ///
28    /// # Arguments
29    ///
30    /// * `tcp_stream` - the tcp socket
31    /// * `handler` - implements client or server specific logic
32    /// * `handshake_deadline` - how long until we fail due to handshake not completing
33    /// * `ready_tx` - a sender used to notify that the connection is ready for requests
34    pub fn spawn_from_address(
35        address: SocketAddr,
36        handler: H,
37        handshake_deadline: Instant,
38        ready_tx: Option<oneshot::Sender<&'static str>>,
39    ) -> Self {
40        let (self_sender, self_rx) = Sender::new();
41        let connection = Self {
42            self_sender: self_sender.clone(),
43        };
44        spawn(async move {
45            match timeout_at(handshake_deadline, TcpStream::connect(&address)).await {
46                Ok(tcp_stream) => {
47                    info!("Connected to {}", address);
48                    let result = run(
49                        tcp_stream,
50                        self_sender,
51                        self_rx,
52                        handler,
53                        handshake_deadline,
54                        ready_tx,
55                    )
56                    .await;
57                    if let Err(e) = result {
58                        warn!("Connection closed. ip={:?} error={:?}", address, e)
59                    }
60                }
61                Err(e) => error!("Connect failed. error={:?}", e),
62            };
63        });
64        connection
65    }
66
67    /// Spawn a new `Connection` that runs in a separate task. Returns a handle for sending to
68    /// the `Connection`.
69    ///
70    /// # Arguments
71    ///
72    /// * `tcp_stream` - the tcp socket
73    /// * `handler` - implements client or server specific logic
74    /// * `handshake_deadline` - how long until we fail due to handshake not completing
75    /// * `ready_tx` - a sender used to notify that the connection is ready for requests
76    pub fn spawn(
77        tcp_stream: TcpStream,
78        handler: H,
79        handshake_deadline: Instant,
80        ready_tx: Option<oneshot::Sender<&'static str>>,
81    ) -> Self {
82        let (self_sender, self_rx) = Sender::new();
83        let connection = Self {
84            self_sender: self_sender.clone(),
85        };
86        spawn(async move {
87            let ip = tcp_stream.peer_addr();
88            let result = run(
89                tcp_stream,
90                self_sender,
91                self_rx,
92                handler,
93                handshake_deadline,
94                ready_tx,
95            )
96            .await;
97            if let Err(e) = result {
98                warn!("Connection closed. ip={:?} error={:?}", ip, e)
99            }
100        });
101        connection
102    }
103
104    pub fn send(&self, event: H::InternalEvent) -> Result<(), Error> {
105        self.self_sender.internal(event)
106    }
107
108    pub fn close(&self) -> Result<(), Error> {
109        self.self_sender.close()
110    }
111
112    pub fn is_closed(&self) -> bool {
113        self.self_sender.is_closed()
114    }
115}
116
117/// The events that can be received by the core connection loop once it begins running.
118#[derive(Debug)]
119pub enum Event<InternalEvent: Send + 'static> {
120    /// A full frame was received on the socket.
121    SocketReceive(TokuFrame),
122    /// A ping should be sent.
123    Ping,
124    /// Generic event that will be delegated to the connection handler.
125    InternalEvent(InternalEvent),
126    /// A response for a request was computed and should be sent back over the socket.
127    ResponseComplete(Result<Response, (Error, u32)>),
128    /// Close the connection gracefully.
129    Close,
130}
131
132/// The core run loop for a connection.
133/// Negotiates the connection then handles events until the socket dies or there is an error.
134///
135/// # Arguments
136///
137/// * `tcp_stream` - the tcp socket
138/// * `self_sender` - a sender that is used to for the connection to enqueue an event to itself.
139///                   This is used when a response for a request is computed asynchronously in a task.
140/// * `self_rx` - a receiver that InternalEvents will be sent over
141/// * `handler` - implements logic for the client or server specific things
142/// * `handshake_deadline` - how long until we fail due to handshake not completing
143/// * `ready_tx` - a sender used to notify that the connection is ready for requests
144async fn run<H: Handler>(
145    tcp_stream: TcpStream,
146    self_sender: Sender<H::InternalEvent>,
147    self_rx: UnboundedReceiver<Event<H::InternalEvent>>,
148    handler: H,
149    handshake_deadline: Instant,
150    ready_tx: Option<oneshot::Sender<&'static str>>,
151) -> Result<(), Error> {
152    let (ready, reader_writer, handler) =
153        timeout_at(handshake_deadline, negotiate(tcp_stream, handler, ready_tx)).await?;
154    debug!("Ready. {:?}", ready);
155    let (reader, mut writer) = reader_writer.split();
156
157    let Ready {
158        ping_interval,
159        encoding,
160    } = ready;
161    // Convert each stream into a Result<Event, Error> stream.
162    let ping_stream = interval(ping_interval).map(|_| Ok(Event::Ping));
163    let framed_reader = reader.map(|result| result.map(Event::SocketReceive));
164    let self_rx = self_rx.map(|event| Ok(event));
165
166    let mut stream = framed_reader
167        .select_break(self_rx)
168        .select_break(ping_stream);
169
170    let mut event_handler = EventHandler::new(self_sender, handler, encoding);
171    while let Some(event) = stream.next().await {
172        let event = event?;
173
174        match event_handler.handle_event(event) {
175            Ok(Some(frame)) => writer = writer.write(frame).await?,
176            Ok(None) => {}
177            Err(error) => {
178                writer.close(Some(&error), None).await;
179                return Ok(());
180            }
181        }
182    }
183
184    Err(TokuError::ConnectionClosed.into())
185}
186
187/// Negotiates the connection.
188///
189/// # Arguments
190///
191/// * `tcp_stream` - the tcp socket
192/// * `handler` - implements logic for the client or server specific things
193/// * `ready_tx` - a sender used to notify that the connection is ready for requests
194fn negotiate<H: Handler>(
195    tcp_stream: TcpStream,
196    mut handler: H,
197    ready_tx: Option<oneshot::Sender<&'static str>>,
198) -> impl Future<Output = Result<(Ready, ReaderWriter, H), Error>> {
199    async move {
200        let tcp_stream = handler.upgrade(tcp_stream).await?;
201        let max_payload_size = handler.max_payload_size();
202        let reader_writer = ReaderWriter::new(tcp_stream, max_payload_size, H::SEND_GO_AWAY);
203
204        match handler.handshake(reader_writer).await {
205            Ok((ready, reader_writer)) => {
206                if let Some(ready_tx) = ready_tx {
207                    ready_tx
208                        .send(ready.encoding)
209                        .map_err(|_| Error::from(TokuError::ReadySendFailed))?;
210                }
211                Ok((ready, reader_writer, handler))
212            }
213            Err((error, reader_writer)) => {
214                debug!("Not ready. e={:?}", error);
215                if let Some(reader_writer) = reader_writer {
216                    reader_writer.close(Some(&error)).await;
217                }
218                Err(error)
219            }
220        }
221    }
222}