intiface_engine/frontend/
mod.rs1pub mod process_messages;
2use crate::error::IntifaceError;
3use crate::remote_server::ButtplugRemoteServerEvent;
4use async_trait::async_trait;
5use futures::{pin_mut, Stream, StreamExt};
6pub use process_messages::{EngineMessage, IntifaceMessage};
7use std::sync::Arc;
8use tokio::{
9 select,
10 sync::{broadcast, Notify},
11};
12use tokio_util::sync::CancellationToken;
13
14const VERSION: &str = env!("CARGO_PKG_VERSION");
15
16#[async_trait]
17pub trait Frontend: Sync + Send {
18 async fn send(&self, msg: EngineMessage);
19 async fn connect(&self) -> Result<(), IntifaceError>;
20 fn disconnect_notifier(&self) -> Arc<Notify>;
21 fn disconnect(&self);
22 fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage>;
23}
24
25pub async fn frontend_external_event_loop(
26 frontend: Arc<dyn Frontend>,
27 connection_cancellation_token: Arc<CancellationToken>,
28) {
29 let mut external_receiver = frontend.event_stream();
30 loop {
31 select! {
32 external_message = external_receiver.recv() => {
33 match external_message {
34 Ok(message) => match message {
35 IntifaceMessage::RequestEngineVersion{expected_version:_} => {
36 info!("Engine version request received from frontend.");
38 frontend
39 .send(EngineMessage::EngineVersion{ version: VERSION.to_owned() })
40 .await;
41 },
42 IntifaceMessage::Stop{} => {
43 connection_cancellation_token.cancel();
44 info!("Got external stop request");
45 break;
46 }
47 },
48 Err(_) => {
49 info!("Frontend sender dropped, assuming connection lost, breaking.");
50 break;
51 }
52 }
53 },
54 _ = connection_cancellation_token.cancelled() => {
55 info!("Connection cancellation token activated, breaking from frontend external event loop.");
56 break;
57 }
58 }
59 }
60}
61
62pub async fn frontend_server_event_loop(
63 receiver: impl Stream<Item = ButtplugRemoteServerEvent>,
64 frontend: Arc<dyn Frontend>,
65 connection_cancellation_token: CancellationToken,
66) {
67 pin_mut!(receiver);
68
69 loop {
70 select! {
71 maybe_event = receiver.next() => {
72 match maybe_event {
73 Some(event) => match event {
74 ButtplugRemoteServerEvent::ClientConnected(client_name) => {
75 info!("Client connected: {}", client_name);
76 frontend.send(EngineMessage::ClientConnected{client_name}).await;
77 }
78 ButtplugRemoteServerEvent::ClientDisconnected => {
79 info!("Client disconnected.");
80 frontend
81 .send(EngineMessage::ClientDisconnected{})
82 .await;
83 }
84 ButtplugRemoteServerEvent::DeviceAdded { index: device_id, name: device_name, identifier: device_address, display_name: device_display_name } => {
85 info!("Device Added: {} - {} - {:?}", device_id, device_name, device_address);
86 frontend
87 .send(EngineMessage::DeviceConnected { name: device_name, index: device_id, identifier: device_address, display_name: device_display_name })
88 .await;
89 }
90 ButtplugRemoteServerEvent::DeviceRemoved { index: device_id } => {
91 info!("Device Removed: {}", device_id);
92 frontend
93 .send(EngineMessage::DeviceDisconnected{index: device_id})
94 .await;
95 }
96 },
97 None => {
98 info!("Lost connection with main thread, breaking.");
99 break;
100 },
101 }
102 },
103 _ = connection_cancellation_token.cancelled() => {
104 info!("Connection cancellation token activated, breaking from frontend server event loop");
105 break;
106 }
107 }
108 }
109 info!("Exiting server event receiver loop");
110}
111
112#[derive(Default)]
113struct NullFrontend {
114 notify: Arc<Notify>,
115}
116
117#[async_trait]
118impl Frontend for NullFrontend {
119 async fn send(&self, _: EngineMessage) {}
120 async fn connect(&self) -> Result<(), IntifaceError> {
121 Ok(())
122 }
123 fn disconnect(&self) {
124 self.notify.notify_waiters();
125 }
126 fn disconnect_notifier(&self) -> Arc<Notify> {
127 self.notify.clone()
128 }
129 fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage> {
130 let (_, receiver) = broadcast::channel(255);
131 receiver
132 }
133}