1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use futures::{
channel::{mpsc, oneshot},
sink::SinkExt,
};
use crate::{error::Error, stream::StreamHandle};
/// Requests that can be pushed through the command channel wrapped by
/// [`Control`].
///
/// Every variant carries the sending half of a oneshot channel, which the
/// party handling the command can use to reply to the requester.
pub(crate) enum Command {
    /// Ask for a new stream. The reply is either the resulting
    /// [`StreamHandle`] or an [`Error`].
    OpenStream(oneshot::Sender<Result<StreamHandle, Error>>),
    /// Ask for a shutdown. The reply carries no data; it only signals
    /// that the request has been answered.
    Shutdown(oneshot::Sender<()>),
}
/// Handle for issuing [`Command`]s over an mpsc channel.
///
/// This is a thin newtype around the sending half of the channel. It is
/// `Clone`, so several handles can feed the same channel.
#[derive(Clone)]
pub struct Control(
    /// Sending half of the command channel.
    mpsc::Sender<Command>,
);
impl Control {
pub(crate) fn new(sender: mpsc::Sender<Command>) -> Self {
Control(sender)
}
pub async fn open_stream(&mut self) -> Result<StreamHandle, Error> {
let (tx, rx) = oneshot::channel();
self.0
.send(Command::OpenStream(tx))
.await
.map_err(|_| Error::SessionShutdown)?;
rx.await.map_err(|_| Error::SessionShutdown)?
}
pub async fn close(&mut self) {
if self.0.is_closed() {
return;
}
let (tx, rx) = oneshot::channel();
let _ignore = self.0.send(Command::Shutdown(tx)).await;
let _ignore = rx.await;
}
}