intiface_engine/frontend/
mod.rs1pub mod process_messages;
9use crate::error::IntifaceError;
10use crate::remote_server::ButtplugRemoteServerEvent;
11use async_trait::async_trait;
12use futures::{Stream, StreamExt, pin_mut};
13pub use process_messages::{EngineMessage, IntifaceMessage};
14use std::sync::Arc;
15use tokio::{
16 select,
17 sync::{Notify, broadcast},
18};
19use tokio_util::sync::CancellationToken;
20
21const VERSION: &str = env!("CARGO_PKG_VERSION");
22
23#[async_trait]
24pub trait Frontend: Sync + Send {
25 async fn send(&self, msg: EngineMessage);
26 async fn connect(&self) -> Result<(), IntifaceError>;
27 fn disconnect_notifier(&self) -> Arc<Notify>;
28 fn disconnect(&self);
29 fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage>;
30}
31
32pub async fn frontend_external_event_loop(
33 frontend: Arc<dyn Frontend>,
34 connection_cancellation_token: Arc<CancellationToken>,
35) {
36 let mut external_receiver = frontend.event_stream();
37 loop {
38 select! {
39 external_message = external_receiver.recv() => {
40 match external_message {
41 Ok(message) => match message {
42 IntifaceMessage::RequestEngineVersion{expected_version:_} => {
43 info!("Engine version request received from frontend.");
45 frontend
46 .send(EngineMessage::EngineVersion{ version: VERSION.to_owned() })
47 .await;
48 },
49 IntifaceMessage::Stop{} => {
50 connection_cancellation_token.cancel();
51 info!("Got external stop request");
52 break;
53 }
54 },
55 Err(_) => {
56 info!("Frontend sender dropped, assuming connection lost, breaking.");
57 break;
58 }
59 }
60 },
61 _ = connection_cancellation_token.cancelled() => {
62 info!("Connection cancellation token activated, breaking from frontend external event loop.");
63 break;
64 }
65 }
66 }
67}
68
69pub async fn frontend_server_event_loop(
70 receiver: impl Stream<Item = ButtplugRemoteServerEvent>,
71 frontend: Arc<dyn Frontend>,
72 connection_cancellation_token: CancellationToken,
73) {
74 pin_mut!(receiver);
75
76 loop {
77 select! {
78 maybe_event = receiver.next() => {
79 match maybe_event {
80 Some(event) => match event {
81 ButtplugRemoteServerEvent::ClientConnected(client_name) => {
82 info!("Client connected: {}", client_name);
83 frontend.send(EngineMessage::ClientConnected{client_name}).await;
84 }
85 ButtplugRemoteServerEvent::ClientDisconnected => {
86 info!("Client disconnected.");
87 frontend
88 .send(EngineMessage::ClientDisconnected{})
89 .await;
90 }
91 ButtplugRemoteServerEvent::DeviceAdded { index: device_id, name: device_name, identifier: device_address, display_name: device_display_name, needs_keepalive: device_needs_keepalive } => {
92 info!("Device Added: {} - {} - {:?}", device_id, device_name, device_address);
93 frontend
94 .send(EngineMessage::DeviceConnected { name: device_name, index: device_id, identifier: device_address, display_name: device_display_name, needs_keepalive: device_needs_keepalive })
95 .await;
96 }
97 ButtplugRemoteServerEvent::DeviceRemoved { index: device_id } => {
98 info!("Device Removed: {}", device_id);
99 frontend
100 .send(EngineMessage::DeviceDisconnected{index: device_id})
101 .await;
102 }
103 },
104 None => {
105 info!("Lost connection with main thread, breaking.");
106 break;
107 },
108 }
109 },
110 _ = connection_cancellation_token.cancelled() => {
111 info!("Connection cancellation token activated, breaking from frontend server event loop");
112 break;
113 }
114 }
115 }
116 info!("Exiting server event receiver loop");
117}
118