use crate::{
connector::{
ButtplugConnectorError,
ButtplugConnectorResultFuture,
transport::{ButtplugConnectorTransport, ButtplugTransportIncomingMessage},
},
message::serializer::ButtplugSerializedMessage,
};
use futures::{
FutureExt,
future::{self, BoxFuture},
};
use std::sync::Arc;
use tokio::{
select,
sync::{
Mutex,
mpsc::{Receiver, Sender},
},
};
/// Connector transport backed by a pair of in-process `tokio` mpsc channels.
///
/// Serialized messages handed to the transport by the connector are forwarded
/// into `sender`; serialized messages arriving on `receiver` are forwarded to
/// the connector (see the `ButtplugConnectorTransport` impl for this type).
#[derive(Debug)]
pub struct ButtplugStreamTransport {
  /// Channel that outgoing serialized messages are forwarded into.
  sender: Sender<ButtplugSerializedMessage>,
  /// Channel that incoming serialized messages are read from. Stored as an
  /// `Option` behind a shared async mutex so that `connect` can take ownership
  /// of it; once taken, the slot stays `None`.
  receiver: Arc<Mutex<Option<Receiver<ButtplugSerializedMessage>>>>,
}
impl ButtplugStreamTransport {
  /// Builds a transport around an existing channel pair.
  ///
  /// * `sender` - channel that outgoing serialized messages will be forwarded
  ///   into once the transport is connected.
  /// * `receiver` - channel that incoming serialized messages will be read
  ///   from once the transport is connected.
  pub fn new(
    sender: Sender<ButtplugSerializedMessage>,
    receiver: Receiver<ButtplugSerializedMessage>,
  ) -> Self {
    // Park the receiver in a shared, takeable slot so `connect` (which only
    // has `&self`) can move it into the task it spawns.
    let receiver = Arc::new(Mutex::new(Some(receiver)));
    Self { sender, receiver }
  }
}
impl ButtplugConnectorTransport for ButtplugStreamTransport {
  /// Starts shuttling messages between the connector and the wrapped channels.
  ///
  /// * `outgoing_receiver` - serialized messages produced by the connector;
  ///   each one is forwarded into this transport's `sender`.
  /// * `incoming_sender` - destination for messages read from this transport's
  ///   `receiver`; each one is wrapped in
  ///   `ButtplugTransportIncomingMessage::Message` before being sent.
  ///
  /// The returned future resolves to `Ok(())` once the forwarding task has
  /// been handed to `crate::spawn!`.
  ///
  /// # Errors
  ///
  /// Resolves to `ButtplugConnectorError::ConnectorAlreadyConnected` if the
  /// stored receiver has already been taken by an earlier `connect` call.
  fn connect(
    &self,
    mut outgoing_receiver: Receiver<ButtplugSerializedMessage>,
    incoming_sender: Sender<ButtplugTransportIncomingMessage>,
  ) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
    // Clone the handles up front so the returned future can be `'static`.
    let receiver_slot = self.receiver.clone();
    let stream_tx = self.sender.clone();
    async move {
      // Claim the receiver; the lock guard is released at the end of this
      // statement, leaving `None` behind in the slot.
      let claimed = receiver_slot.lock().await.take();
      let mut stream_rx = match claimed {
        Some(rx) => rx,
        None => return Err(ButtplugConnectorError::ConnectorAlreadyConnected),
      };
      crate::spawn!("ButtplugStreamTransport", async move {
        loop {
          // Each arm evaluates to `true` to keep pumping, or `false` when a
          // channel has closed or a forward failed.
          let keep_running = select! {
            outbound = outgoing_receiver.recv() => match outbound {
              Some(serialized) => stream_tx.send(serialized).await.is_ok(),
              None => false,
            },
            inbound = stream_rx.recv() => match inbound {
              Some(serialized) => incoming_sender
                .send(ButtplugTransportIncomingMessage::Message(serialized))
                .await
                .is_ok(),
              None => false,
            },
          };
          if !keep_running {
            break;
          }
        }
      });
      Ok(())
    }
    .boxed()
  }

  /// Consumes the transport and returns an already-completed `Ok(())` future.
  ///
  /// No teardown is performed here; the forwarding loop started by `connect`
  /// only exits when one of its channels closes or a send fails.
  fn disconnect(self) -> ButtplugConnectorResultFuture {
    future::ready(Ok(())).boxed()
  }
}