Skip to main content

intiface_engine/
engine.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use crate::{
9  ButtplugRemoteServer, ButtplugRepeater,
10  backdoor_server::BackdoorServer,
11  buttplug_server::{reset_buttplug_server, run_server, setup_buttplug_server},
12  error::IntifaceEngineError,
13  frontend::{
14    Frontend, frontend_external_event_loop, frontend_server_event_loop,
15    process_messages::EngineMessage,
16  },
17  mdns::IntifaceMdns,
18  options::EngineOptions,
19  remote_server::ButtplugRemoteServerEvent,
20  rest_server::IntifaceRestServer,
21};
22
23use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config};
24use futures::{StreamExt, pin_mut};
25use once_cell::sync::OnceCell;
26use std::{path::Path, sync::Arc, time::Duration};
27use tokio::{fs, select};
28use tokio_util::sync::CancellationToken;
29
30#[cfg(debug_assertions)]
31pub fn maybe_crash_main_thread(options: &EngineOptions) {
32  if options.crash_main_thread() {
33    panic!("Crashing main thread by request");
34  }
35}
36
37#[allow(dead_code)]
38#[cfg(debug_assertions)]
39pub fn maybe_crash_task_thread(options: &EngineOptions) {
40  if options.crash_task_thread() {
41    tokio::spawn(async {
42      tokio::time::sleep(Duration::from_millis(100)).await;
43      panic!("Crashing a task thread by request");
44    });
45  }
46}
47
48#[derive(Default)]
49pub struct IntifaceEngine {
50  stop_token: Arc<CancellationToken>,
51  backdoor_server: OnceCell<Arc<BackdoorServer>>,
52}
53
54impl IntifaceEngine {
55  pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
56    Some(self.backdoor_server.get()?.clone())
57  }
58
59  pub async fn run(
60    &self,
61    options: &EngineOptions,
62    frontend: Option<Arc<dyn Frontend>>,
63    dcm: &Option<Arc<DeviceConfigurationManager>>,
64  ) -> Result<(), IntifaceEngineError> {
65    // Set up Frontend
66    if let Some(frontend) = &frontend {
67      let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
68      tokio::spawn(async move {
69        frontend_loop.await;
70      });
71
72      frontend.connect().await.unwrap();
73      frontend.send(EngineMessage::EngineStarted {}).await;
74    }
75
76    // Set up mDNS
77    let _mdns_server = if options.broadcast_server_mdns() {
78      // TODO Unregister whenever we have a live connection
79
80      // TODO Support different services for engine versus repeater
81      Some(IntifaceMdns::new())
82    } else {
83      None
84    };
85
86    // Set up Repeater (if in repeater mode)
87    if options.repeater_mode() {
88      info!("Starting repeater");
89
90      let repeater = ButtplugRepeater::new(
91        options.repeater_local_port().unwrap(),
92        options.repeater_remote_address().as_ref().unwrap(),
93        self.stop_token.child_token(),
94      );
95      select! {
96        _ = self.stop_token.cancelled() => {
97          info!("Owner requested process exit, exiting.");
98        }
99        _ = repeater.listen() => {
100          info!("Repeater listener stopped, exiting.");
101        }
102      };
103      if let Some(frontend) = &frontend {
104        frontend.send(EngineMessage::EngineStopped {}).await;
105        tokio::time::sleep(Duration::from_millis(100)).await;
106        frontend.disconnect();
107      }
108      return Ok(());
109    }
110
111    // Set up Engine (if in engine mode)
112
113    // At this point we will have received and validated options.
114
115    // Hang out until those listeners get sick of listening.
116    info!("Intiface CLI Setup finished, running server tasks until all joined.");
117    let server = setup_buttplug_server(options, &self.backdoor_server, dcm).await?;
118    let dcm = server
119      .device_manager()
120      .device_configuration_manager()
121      .clone();
122
123    if let Some(rest_port) = options.rest_api_port() {
124      select! {
125        _ = self.stop_token.cancelled() => {
126          info!("Owner requested process exit, exiting.");
127        }
128        res = IntifaceRestServer::run(rest_port, server) => {
129          info!("Rest API listener stopped, exiting.");
130          if let Err(e) = res {
131            error!("Error running Intiface Central RestAPI Server: {:?}", e);
132          }
133        }
134      };
135      if let Some(frontend) = &frontend {
136        frontend.send(EngineMessage::EngineStopped {}).await;
137        tokio::time::sleep(Duration::from_millis(100)).await;
138        frontend.disconnect();
139      }
140      return Ok(());
141    }
142
143    let mut server = ButtplugRemoteServer::new(server, &None);
144
145    if let Some(config_path) = options.user_device_config_path() {
146      let stream = server.event_stream();
147      {
148        let config_path = config_path.to_owned();
149        tokio::spawn(async move {
150          pin_mut!(stream);
151          loop {
152            if let Some(event) = stream.next().await {
153              match event {
154                ButtplugRemoteServerEvent::DeviceAdded {
155                  index: _,
156                  identifier: _,
157                  name: _,
158                  display_name: _,
159                  needs_keepalive: _,
160                } => {
161                  if let Ok(config_str) = save_user_config(&dcm) {
162                    // Should probably at least log if we fail to write the config file
163                    if let Err(e) = fs::write(&Path::new(&config_path), config_str).await {
164                      error!("Error saving config file: {:?}", e);
165                    }
166                  }
167                }
168                _ => continue,
169              }
170            };
171          }
172        });
173      }
174    }
175    if let Some(frontend) = &frontend {
176      frontend.send(EngineMessage::EngineServerCreated {}).await;
177      let event_receiver = server.event_stream();
178      let frontend_clone = frontend.clone();
179      let stop_child_token = self.stop_token.child_token();
180      tokio::spawn(async move {
181        frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
182      });
183    }
184
185    loop {
186      let session_connection_token = CancellationToken::new();
187      info!("Starting server");
188
189      // Let everything spin up, then try crashing.
190
191      #[cfg(debug_assertions)]
192      maybe_crash_main_thread(options);
193
194      let mut exit_requested = false;
195      select! {
196        _ = self.stop_token.cancelled() => {
197          info!("Owner requested process exit, exiting.");
198          exit_requested = true;
199        }
200        result = run_server(&server, options) => {
201          match result {
202            Ok(_) => info!("Connection dropped, restarting stay open loop."),
203            Err(e) => {
204              error!("{}", format!("Process Error: {:?}", e));
205
206              if let Some(frontend) = &frontend {
207                frontend
208                  .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
209                  .await;
210              }
211            }
212          }
213        }
214      };
215      match server.disconnect().await {
216        Ok(_) => {
217          info!("Client forcefully disconnected from server.");
218          if let Some(frontend) = &frontend {
219            frontend.send(EngineMessage::ClientDisconnected {}).await;
220          }
221        }
222        Err(_) => info!("Client already disconnected from server."),
223      };
224      session_connection_token.cancel();
225      if exit_requested {
226        info!("Breaking out of event loop in order to exit");
227        break;
228      }
229      // We're not exiting, rebuild our server.
230      let dm = server.server().device_manager();
231      server = reset_buttplug_server(options, &dm, server.event_sender()).await?;
232      info!("Server connection dropped, restarting");
233    }
234    info!("Shutting down server...");
235    if let Err(e) = server.shutdown().await {
236      error!("Shutdown failed: {:?}", e);
237    }
238    info!("Exiting");
239    if let Some(frontend) = &frontend {
240      frontend.send(EngineMessage::EngineStopped {}).await;
241      tokio::time::sleep(Duration::from_millis(100)).await;
242      frontend.disconnect();
243    }
244    Ok(())
245  }
246
247  pub fn stop(&self) {
248    info!("Engine stop called, cancelling token.");
249    self.stop_token.cancel();
250  }
251}