libp2prs_mplex/connection/control.rs
1// Copyright 2020 Netwarps Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use futures::{
22 channel::{mpsc, oneshot},
23 SinkExt,
24};
25
26use crate::{
27 connection::{stream::Stream, ControlCommand},
28 error::ConnectionError,
29};
30
31pub struct Control {
32 sender: mpsc::Sender<ControlCommand>,
33}
34
35type Result<T> = std::result::Result<T, ConnectionError>;
36
37impl Control {
38 pub fn new(sender: mpsc::Sender<ControlCommand>) -> Self {
39 Control { sender }
40 }
41
42 /// Open a new stream to the remote.
43 pub async fn open_stream(&mut self) -> Result<Stream> {
44 let (tx, rx) = oneshot::channel();
45 self.sender.send(ControlCommand::OpenStream(tx)).await?;
46 rx.await?
47 }
48
49 /// Accept a new stream from the remote.
50 pub async fn accept_stream(&mut self) -> Result<Stream> {
51 let (tx, rx) = oneshot::channel();
52 self.sender.send(ControlCommand::AcceptStream(tx)).await?;
53 rx.await?
54 }
55
56 /// Close the connection.
57 pub async fn close(&mut self) -> Result<()> {
58 let (tx, rx) = oneshot::channel();
59 if self.sender.send(ControlCommand::CloseConnection(tx)).await.is_err() {
60 // The receiver is closed which means the connection is already closed.
61 return Ok(());
62 }
63 // A dropped `oneshot::Sender` means the `Connection` is gone,
64 // so we do not treat receive errors differently here.
65 let _ = rx.await;
66 Ok(())
67 }
68}
69
70impl Clone for Control {
71 fn clone(&self) -> Self {
72 Control {
73 sender: self.sender.clone(),
74 }
75 }
76}