libp2prs_mplex/connection/
control.rs

1// Copyright 2020 Netwarps Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use futures::{
22    channel::{mpsc, oneshot},
23    SinkExt,
24};
25
26use crate::{
27    connection::{stream::Stream, ControlCommand},
28    error::ConnectionError,
29};
30
/// A handle that issues [`ControlCommand`]s through an mpsc channel.
///
/// The receiving half of the channel is not visible in this file; presumably
/// it is owned by the connection that executes the commands — confirm against
/// the code that constructs `Control`.
pub struct Control {
    /// Sending half of the channel carrying control commands.
    sender: mpsc::Sender<ControlCommand>,
}
34
/// Module-local shorthand for a `Result` whose error type is [`ConnectionError`].
type Result<T> = std::result::Result<T, ConnectionError>;
36
37impl Control {
38    pub fn new(sender: mpsc::Sender<ControlCommand>) -> Self {
39        Control { sender }
40    }
41
42    /// Open a new stream to the remote.
43    pub async fn open_stream(&mut self) -> Result<Stream> {
44        let (tx, rx) = oneshot::channel();
45        self.sender.send(ControlCommand::OpenStream(tx)).await?;
46        rx.await?
47    }
48
49    /// Accept a new stream from the remote.
50    pub async fn accept_stream(&mut self) -> Result<Stream> {
51        let (tx, rx) = oneshot::channel();
52        self.sender.send(ControlCommand::AcceptStream(tx)).await?;
53        rx.await?
54    }
55
56    /// Close the connection.
57    pub async fn close(&mut self) -> Result<()> {
58        let (tx, rx) = oneshot::channel();
59        if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
60            // The receiver is closed which means the connection is already closed.
61            return Ok(());
62        }
63        // A dropped `oneshot::Sender` means the `Connection` is gone,
64        // so we do not treat receive errors differently here.
65        let _ = rx.await;
66        Ok(())
67    }
68}
69
70impl Clone for Control {
71    fn clone(&self) -> Self {
72        Control {
73            sender: self.sender.clone(),
74        }
75    }
76}