intiface_engine/
engine.rs1use crate::{
9 ButtplugRemoteServer, ButtplugRepeater,
10 backdoor_server::BackdoorServer,
11 buttplug_server::{reset_buttplug_server, run_server, setup_buttplug_server},
12 error::IntifaceEngineError,
13 frontend::{
14 Frontend, frontend_external_event_loop, frontend_server_event_loop,
15 process_messages::EngineMessage,
16 },
17 mdns::IntifaceMdns,
18 options::EngineOptions,
19 remote_server::ButtplugRemoteServerEvent,
20 rest_server::IntifaceRestServer,
21};
22
23use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config};
24use futures::{StreamExt, pin_mut};
25use once_cell::sync::OnceCell;
26use std::{path::Path, sync::Arc, time::Duration};
27use tokio::{fs, select};
28use tokio_util::sync::CancellationToken;
29
30#[cfg(debug_assertions)]
31pub fn maybe_crash_main_thread(options: &EngineOptions) {
32 if options.crash_main_thread() {
33 panic!("Crashing main thread by request");
34 }
35}
36
37#[allow(dead_code)]
38#[cfg(debug_assertions)]
39pub fn maybe_crash_task_thread(options: &EngineOptions) {
40 if options.crash_task_thread() {
41 tokio::spawn(async {
42 tokio::time::sleep(Duration::from_millis(100)).await;
43 panic!("Crashing a task thread by request");
44 });
45 }
46}
47
48#[derive(Default)]
49pub struct IntifaceEngine {
50 stop_token: Arc<CancellationToken>,
51 backdoor_server: OnceCell<Arc<BackdoorServer>>,
52}
53
54impl IntifaceEngine {
55 pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
56 Some(self.backdoor_server.get()?.clone())
57 }
58
59 pub async fn run(
60 &self,
61 options: &EngineOptions,
62 frontend: Option<Arc<dyn Frontend>>,
63 dcm: &Option<Arc<DeviceConfigurationManager>>,
64 ) -> Result<(), IntifaceEngineError> {
65 if let Some(frontend) = &frontend {
67 let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
68 tokio::spawn(async move {
69 frontend_loop.await;
70 });
71
72 frontend.connect().await.unwrap();
73 frontend.send(EngineMessage::EngineStarted {}).await;
74 }
75
76 let _mdns_server = if options.broadcast_server_mdns() {
78 Some(IntifaceMdns::new())
82 } else {
83 None
84 };
85
86 if options.repeater_mode() {
88 info!("Starting repeater");
89
90 let repeater = ButtplugRepeater::new(
91 options.repeater_local_port().unwrap(),
92 options.repeater_remote_address().as_ref().unwrap(),
93 self.stop_token.child_token(),
94 );
95 select! {
96 _ = self.stop_token.cancelled() => {
97 info!("Owner requested process exit, exiting.");
98 }
99 _ = repeater.listen() => {
100 info!("Repeater listener stopped, exiting.");
101 }
102 };
103 if let Some(frontend) = &frontend {
104 frontend.send(EngineMessage::EngineStopped {}).await;
105 tokio::time::sleep(Duration::from_millis(100)).await;
106 frontend.disconnect();
107 }
108 return Ok(());
109 }
110
111 info!("Intiface CLI Setup finished, running server tasks until all joined.");
117 let server = setup_buttplug_server(options, &self.backdoor_server, dcm).await?;
118 let dcm = server
119 .device_manager()
120 .device_configuration_manager()
121 .clone();
122
123 if let Some(rest_port) = options.rest_api_port() {
124 select! {
125 _ = self.stop_token.cancelled() => {
126 info!("Owner requested process exit, exiting.");
127 }
128 res = IntifaceRestServer::run(rest_port, server) => {
129 info!("Rest API listener stopped, exiting.");
130 if let Err(e) = res {
131 error!("Error running Intiface Central RestAPI Server: {:?}", e);
132 }
133 }
134 };
135 if let Some(frontend) = &frontend {
136 frontend.send(EngineMessage::EngineStopped {}).await;
137 tokio::time::sleep(Duration::from_millis(100)).await;
138 frontend.disconnect();
139 }
140 return Ok(());
141 }
142
143 let mut server = ButtplugRemoteServer::new(server, &None);
144
145 if let Some(config_path) = options.user_device_config_path() {
146 let stream = server.event_stream();
147 {
148 let config_path = config_path.to_owned();
149 tokio::spawn(async move {
150 pin_mut!(stream);
151 loop {
152 if let Some(event) = stream.next().await {
153 match event {
154 ButtplugRemoteServerEvent::DeviceAdded {
155 index: _,
156 identifier: _,
157 name: _,
158 display_name: _,
159 needs_keepalive: _,
160 } => {
161 if let Ok(config_str) = save_user_config(&dcm) {
162 if let Err(e) = fs::write(&Path::new(&config_path), config_str).await {
164 error!("Error saving config file: {:?}", e);
165 }
166 }
167 }
168 _ => continue,
169 }
170 };
171 }
172 });
173 }
174 }
175 if let Some(frontend) = &frontend {
176 frontend.send(EngineMessage::EngineServerCreated {}).await;
177 let event_receiver = server.event_stream();
178 let frontend_clone = frontend.clone();
179 let stop_child_token = self.stop_token.child_token();
180 tokio::spawn(async move {
181 frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
182 });
183 }
184
185 loop {
186 let session_connection_token = CancellationToken::new();
187 info!("Starting server");
188
189 #[cfg(debug_assertions)]
192 maybe_crash_main_thread(options);
193
194 let mut exit_requested = false;
195 select! {
196 _ = self.stop_token.cancelled() => {
197 info!("Owner requested process exit, exiting.");
198 exit_requested = true;
199 }
200 result = run_server(&server, options) => {
201 match result {
202 Ok(_) => info!("Connection dropped, restarting stay open loop."),
203 Err(e) => {
204 error!("{}", format!("Process Error: {:?}", e));
205
206 if let Some(frontend) = &frontend {
207 frontend
208 .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
209 .await;
210 }
211 }
212 }
213 }
214 };
215 match server.disconnect().await {
216 Ok(_) => {
217 info!("Client forcefully disconnected from server.");
218 if let Some(frontend) = &frontend {
219 frontend.send(EngineMessage::ClientDisconnected {}).await;
220 }
221 }
222 Err(_) => info!("Client already disconnected from server."),
223 };
224 session_connection_token.cancel();
225 if exit_requested {
226 info!("Breaking out of event loop in order to exit");
227 break;
228 }
229 let dm = server.server().device_manager();
231 server = reset_buttplug_server(options, &dm, server.event_sender()).await?;
232 info!("Server connection dropped, restarting");
233 }
234 info!("Shutting down server...");
235 if let Err(e) = server.shutdown().await {
236 error!("Shutdown failed: {:?}", e);
237 }
238 info!("Exiting");
239 if let Some(frontend) = &frontend {
240 frontend.send(EngineMessage::EngineStopped {}).await;
241 tokio::time::sleep(Duration::from_millis(100)).await;
242 frontend.disconnect();
243 }
244 Ok(())
245 }
246
247 pub fn stop(&self) {
248 info!("Engine stop called, cancelling token.");
249 self.stop_token.cancel();
250 }
251}