intiface_engine/
engine.rs1use crate::{
9 ButtplugRemoteServer, ButtplugRepeater,
10 backdoor_server::BackdoorServer,
11 buttplug_server::{reset_buttplug_server, run_server, setup_buttplug_server},
12 error::IntifaceEngineError,
13 frontend::{
14 Frontend, frontend_external_event_loop, frontend_server_event_loop,
15 process_messages::EngineMessage,
16 },
17 mdns::IntifaceMdns,
18 options::EngineOptions,
19 remote_server::ButtplugRemoteServerEvent,
20 rest_server::IntifaceRestServer,
21};
22
23use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config};
24use futures::{StreamExt, pin_mut};
25use once_cell::sync::OnceCell;
26use std::{path::Path, sync::Arc, time::Duration};
27use tokio::{fs, select};
28use tokio_util::sync::CancellationToken;
29
30#[cfg(debug_assertions)]
31pub fn maybe_crash_main_thread(options: &EngineOptions) {
32 if options.crash_main_thread() {
33 panic!("Crashing main thread by request");
34 }
35}
36
37#[allow(dead_code)]
38#[cfg(debug_assertions)]
39pub fn maybe_crash_task_thread(options: &EngineOptions) {
40 if options.crash_task_thread() {
41 tokio::spawn(async {
42 tokio::time::sleep(Duration::from_millis(100)).await;
43 panic!("Crashing a task thread by request");
44 });
45 }
46}
47
48#[derive(Default)]
49pub struct IntifaceEngine {
50 stop_token: Arc<CancellationToken>,
51 backdoor_server: OnceCell<Arc<BackdoorServer>>,
52}
53
54impl IntifaceEngine {
55 pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
56 Some(self.backdoor_server.get()?.clone())
57 }
58
59 pub async fn run(
60 &self,
61 options: &EngineOptions,
62 frontend: Option<Arc<dyn Frontend>>,
63 dcm: &Option<Arc<DeviceConfigurationManager>>,
64 ) -> Result<(), IntifaceEngineError> {
65 if let Some(frontend) = &frontend {
67 let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
68 tokio::spawn(async move {
69 frontend_loop.await;
70 });
71
72 frontend.connect().await.unwrap();
73 frontend.send(EngineMessage::EngineStarted {}).await;
74 }
75
76 let _mdns_server = if options.broadcast_server_mdns() {
78 Some(IntifaceMdns::new())
82 } else {
83 None
84 };
85
86 if options.repeater_mode() {
88 info!("Starting repeater");
89
90 let repeater = ButtplugRepeater::new(
91 options.repeater_local_port().unwrap(),
92 options.repeater_remote_address().as_ref().unwrap(),
93 self.stop_token.child_token(),
94 );
95 select! {
96 _ = self.stop_token.cancelled() => {
97 info!("Owner requested process exit, exiting.");
98 }
99 _ = repeater.listen() => {
100 info!("Repeater listener stopped, exiting.");
101 }
102 };
103 if let Some(frontend) = &frontend {
104 frontend.send(EngineMessage::EngineStopped {}).await;
105 tokio::time::sleep(Duration::from_millis(100)).await;
106 frontend.disconnect();
107 }
108 return Ok(());
109 }
110
111 info!("Intiface CLI Setup finished, running server tasks until all joined.");
117 let server = setup_buttplug_server(options, &self.backdoor_server, dcm).await?;
118 let dcm = server
119 .device_manager()
120 .device_configuration_manager()
121 .clone();
122
123 if let Some(rest_port) = options.rest_api_port() {
124 select! {
125 _ = self.stop_token.cancelled() => {
126 info!("Owner requested process exit, exiting.");
127 }
128 res = IntifaceRestServer::run(rest_port, server) => {
129 info!("Rest API listener stopped, exiting.");
130 if let Err(e) = res {
131 error!("Error running Intiface Central RestAPI Server: {:?}", e);
132 }
133 }
134 };
135 if let Some(frontend) = &frontend {
136 frontend.send(EngineMessage::EngineStopped {}).await;
137 tokio::time::sleep(Duration::from_millis(100)).await;
138 frontend.disconnect();
139 }
140 return Ok(());
141 }
142
143 let mut server = ButtplugRemoteServer::new(server, &None);
144
145 if let Some(config_path) = options.user_device_config_path() {
146 let stream = server.event_stream();
147 {
148 let config_path = config_path.to_owned();
149 tokio::spawn(async move {
150 pin_mut!(stream);
151 loop {
152 if let Some(event) = stream.next().await {
153 match event {
154 ButtplugRemoteServerEvent::DeviceAdded {
155 index: _,
156 identifier: _,
157 name: _,
158 display_name: _,
159 } => {
160 if let Ok(config_str) = save_user_config(&dcm) {
161 if let Err(e) = fs::write(&Path::new(&config_path), config_str).await {
163 error!("Error saving config file: {:?}", e);
164 }
165 }
166 }
167 _ => continue,
168 }
169 };
170 }
171 });
172 }
173 }
174 if let Some(frontend) = &frontend {
175 frontend.send(EngineMessage::EngineServerCreated {}).await;
176 let event_receiver = server.event_stream();
177 let frontend_clone = frontend.clone();
178 let stop_child_token = self.stop_token.child_token();
179 tokio::spawn(async move {
180 frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
181 });
182 }
183
184 loop {
185 let session_connection_token = CancellationToken::new();
186 info!("Starting server");
187
188 #[cfg(debug_assertions)]
191 maybe_crash_main_thread(options);
192
193 let mut exit_requested = false;
194 select! {
195 _ = self.stop_token.cancelled() => {
196 info!("Owner requested process exit, exiting.");
197 exit_requested = true;
198 }
199 result = run_server(&server, options) => {
200 match result {
201 Ok(_) => info!("Connection dropped, restarting stay open loop."),
202 Err(e) => {
203 error!("{}", format!("Process Error: {:?}", e));
204
205 if let Some(frontend) = &frontend {
206 frontend
207 .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
208 .await;
209 }
210 }
211 }
212 }
213 };
214 match server.disconnect().await {
215 Ok(_) => {
216 info!("Client forcefully disconnected from server.");
217 if let Some(frontend) = &frontend {
218 frontend.send(EngineMessage::ClientDisconnected {}).await;
219 }
220 }
221 Err(_) => info!("Client already disconnected from server."),
222 };
223 session_connection_token.cancel();
224 if exit_requested {
225 info!("Breaking out of event loop in order to exit");
226 break;
227 }
228 let dm = server.server().device_manager();
230 server = reset_buttplug_server(options, &dm, server.event_sender()).await?;
231 info!("Server connection dropped, restarting");
232 }
233 info!("Shutting down server...");
234 if let Err(e) = server.shutdown().await {
235 error!("Shutdown failed: {:?}", e);
236 }
237 info!("Exiting");
238 if let Some(frontend) = &frontend {
239 frontend.send(EngineMessage::EngineStopped {}).await;
240 tokio::time::sleep(Duration::from_millis(100)).await;
241 frontend.disconnect();
242 }
243 Ok(())
244 }
245
246 pub fn stop(&self) {
247 info!("Engine stop called, cancelling token.");
248 self.stop_token.cancel();
249 }
250}