buttplug_core 10.0.2

Buttplug Intimate Hardware Control Library - Core Library
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

//! Stream based transport, used in cases where we may need to hop FFI boundaries within the same
//! process space.

use crate::{
  connector::{
    ButtplugConnectorError,
    ButtplugConnectorResultFuture,
    transport::{ButtplugConnectorTransport, ButtplugTransportIncomingMessage},
  },
  message::serializer::ButtplugSerializedMessage,
};
use futures::{
  FutureExt,
  future::{self, BoxFuture},
};

use std::sync::Arc;
use tokio::{
  select,
  sync::{
    Mutex,
    mpsc::{Receiver, Sender},
  },
};

#[derive(Debug)]
pub struct ButtplugStreamTransport {
  sender: Sender<ButtplugSerializedMessage>,
  receiver: Arc<Mutex<Option<Receiver<ButtplugSerializedMessage>>>>,
}

impl ButtplugStreamTransport {
  pub fn new(
    sender: Sender<ButtplugSerializedMessage>,
    receiver: Receiver<ButtplugSerializedMessage>,
  ) -> Self {
    Self {
      sender,
      receiver: Arc::new(Mutex::new(Some(receiver))),
    }
  }
}

impl ButtplugConnectorTransport for ButtplugStreamTransport {
  fn connect(
    &self,
    mut outgoing_receiver: Receiver<ButtplugSerializedMessage>,
    incoming_sender: Sender<ButtplugTransportIncomingMessage>,
  ) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
    let incoming_recv = self.receiver.clone();
    let sender = self.sender.clone();
    async move {
      let mut incoming_recv = incoming_recv
        .lock()
        .await
        .take()
        .ok_or(ButtplugConnectorError::ConnectorAlreadyConnected)?;
      crate::spawn!("ButtplugStreamTransport", async move {
        loop {
          select! {
            msg = outgoing_receiver.recv() => {
              match msg {
                Some(m) => {
                  if sender.send(m).await.is_err() {
                    break;
                  }
                }
                None => break
              }
            },
            msg = incoming_recv.recv() => {
              match msg {
                Some(m) => {
                  if incoming_sender.send(ButtplugTransportIncomingMessage::Message(m)).await.is_err() {
                    break;
                  }
                }
                None => break
              }
            }
          }
        }
      });
      Ok(())
    }.boxed()
  }

  fn disconnect(self) -> ButtplugConnectorResultFuture {
    future::ready(Ok(())).boxed()
  }
}