intiface_engine/frontend/
mod.rs

1pub mod process_messages;
2use crate::error::IntifaceError;
3use crate::remote_server::ButtplugRemoteServerEvent;
4use async_trait::async_trait;
5use futures::{pin_mut, Stream, StreamExt};
6pub use process_messages::{EngineMessage, IntifaceMessage};
7use std::sync::Arc;
8use tokio::{
9  select,
10  sync::{broadcast, Notify},
11};
12use tokio_util::sync::CancellationToken;
13
14const VERSION: &str = env!("CARGO_PKG_VERSION");
15
16#[async_trait]
17pub trait Frontend: Sync + Send {
18  async fn send(&self, msg: EngineMessage);
19  async fn connect(&self) -> Result<(), IntifaceError>;
20  fn disconnect_notifier(&self) -> Arc<Notify>;
21  fn disconnect(&self);
22  fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage>;
23}
24
25pub async fn frontend_external_event_loop(
26  frontend: Arc<dyn Frontend>,
27  connection_cancellation_token: Arc<CancellationToken>,
28) {
29  let mut external_receiver = frontend.event_stream();
30  loop {
31    select! {
32      external_message = external_receiver.recv() => {
33        match external_message {
34          Ok(message) => match message {
35            IntifaceMessage::RequestEngineVersion{expected_version:_} => {
36              // TODO We should check the version here and shut down on mismatch.
37              info!("Engine version request received from frontend.");
38              frontend
39                .send(EngineMessage::EngineVersion{ version: VERSION.to_owned() })
40                .await;
41            },
42            IntifaceMessage::Stop{} => {
43              connection_cancellation_token.cancel();
44              info!("Got external stop request");
45              break;
46            }
47          },
48          Err(_) => {
49            info!("Frontend sender dropped, assuming connection lost, breaking.");
50            break;
51          }
52        }
53      },
54      _ = connection_cancellation_token.cancelled() => {
55        info!("Connection cancellation token activated, breaking from frontend external event loop.");
56        break;
57      }
58    }
59  }
60}
61
62pub async fn frontend_server_event_loop(
63  receiver: impl Stream<Item = ButtplugRemoteServerEvent>,
64  frontend: Arc<dyn Frontend>,
65  connection_cancellation_token: CancellationToken,
66) {
67  pin_mut!(receiver);
68
69  loop {
70    select! {
71      maybe_event = receiver.next() => {
72        match maybe_event {
73          Some(event) => match event {
74            ButtplugRemoteServerEvent::ClientConnected(client_name) => {
75              info!("Client connected: {}", client_name);
76              frontend.send(EngineMessage::ClientConnected{client_name}).await;
77            }
78            ButtplugRemoteServerEvent::ClientDisconnected => {
79              info!("Client disconnected.");
80              frontend
81                .send(EngineMessage::ClientDisconnected{})
82                .await;
83            }
84            ButtplugRemoteServerEvent::DeviceAdded { index: device_id, name: device_name, identifier: device_address, display_name: device_display_name } => {
85              info!("Device Added: {} - {} - {:?}", device_id, device_name, device_address);
86              frontend
87                .send(EngineMessage::DeviceConnected { name: device_name, index: device_id, identifier: device_address, display_name: device_display_name })
88                .await;
89            }
90            ButtplugRemoteServerEvent::DeviceRemoved { index: device_id } => {
91              info!("Device Removed: {}", device_id);
92              frontend
93                .send(EngineMessage::DeviceDisconnected{index: device_id})
94                .await;
95            }
96          },
97          None => {
98            info!("Lost connection with main thread, breaking.");
99            break;
100          },
101        }
102      },
103      _ = connection_cancellation_token.cancelled() => {
104        info!("Connection cancellation token activated, breaking from frontend server event loop");
105        break;
106      }
107    }
108  }
109  info!("Exiting server event receiver loop");
110}
111
112#[derive(Default)]
113struct NullFrontend {
114  notify: Arc<Notify>,
115}
116
117#[async_trait]
118impl Frontend for NullFrontend {
119  async fn send(&self, _: EngineMessage) {}
120  async fn connect(&self) -> Result<(), IntifaceError> {
121    Ok(())
122  }
123  fn disconnect(&self) {
124    self.notify.notify_waiters();
125  }
126  fn disconnect_notifier(&self) -> Arc<Notify> {
127    self.notify.clone()
128  }
129  fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage> {
130    let (_, receiver) = broadcast::channel(255);
131    receiver
132  }
133}