Skip to main content

intiface_engine/
engine.rs

1// Buttplug Rust Source Code File - See https://buttplug.io for more info.
2//
3// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
4//
5// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
6// for full license information.
7
8use crate::{
9  ButtplugRemoteServer, ButtplugRepeater,
10  backdoor_server::BackdoorServer,
11  buttplug_server::{reset_buttplug_server, run_server, setup_buttplug_server},
12  error::IntifaceEngineError,
13  frontend::{
14    Frontend, frontend_external_event_loop, frontend_server_event_loop,
15    process_messages::EngineMessage,
16  },
17  mdns::IntifaceMdns,
18  options::EngineOptions,
19  remote_server::ButtplugRemoteServerEvent,
20  rest_server::IntifaceRestServer,
21};
22
23use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config};
24use futures::{StreamExt, pin_mut};
25use once_cell::sync::OnceCell;
26use std::{path::Path, sync::Arc, time::Duration};
27use tokio::{fs, select};
28use tokio_util::sync::CancellationToken;
29
30#[cfg(debug_assertions)]
31pub fn maybe_crash_main_thread(options: &EngineOptions) {
32  if options.crash_main_thread() {
33    panic!("Crashing main thread by request");
34  }
35}
36
37#[allow(dead_code)]
38#[cfg(debug_assertions)]
39pub fn maybe_crash_task_thread(options: &EngineOptions) {
40  if options.crash_task_thread() {
41    tokio::spawn(async {
42      tokio::time::sleep(Duration::from_millis(100)).await;
43      panic!("Crashing a task thread by request");
44    });
45  }
46}
47
48#[derive(Default)]
49pub struct IntifaceEngine {
50  stop_token: Arc<CancellationToken>,
51  backdoor_server: OnceCell<Arc<BackdoorServer>>,
52}
53
54impl IntifaceEngine {
55  pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
56    Some(self.backdoor_server.get()?.clone())
57  }
58
59  pub async fn run(
60    &self,
61    options: &EngineOptions,
62    frontend: Option<Arc<dyn Frontend>>,
63    dcm: &Option<Arc<DeviceConfigurationManager>>,
64  ) -> Result<(), IntifaceEngineError> {
65    // Set up Frontend
66    if let Some(frontend) = &frontend {
67      let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
68      tokio::spawn(async move {
69        frontend_loop.await;
70      });
71
72      frontend.connect().await.unwrap();
73      frontend.send(EngineMessage::EngineStarted {}).await;
74    }
75
76    // Set up mDNS
77    let _mdns_server = if options.broadcast_server_mdns() {
78      // TODO Unregister whenever we have a live connection
79
80      // TODO Support different services for engine versus repeater
81      Some(IntifaceMdns::new())
82    } else {
83      None
84    };
85
86    // Set up Repeater (if in repeater mode)
87    if options.repeater_mode() {
88      info!("Starting repeater");
89
90      let repeater = ButtplugRepeater::new(
91        options.repeater_local_port().unwrap(),
92        options.repeater_remote_address().as_ref().unwrap(),
93        self.stop_token.child_token(),
94      );
95      select! {
96        _ = self.stop_token.cancelled() => {
97          info!("Owner requested process exit, exiting.");
98        }
99        _ = repeater.listen() => {
100          info!("Repeater listener stopped, exiting.");
101        }
102      };
103      if let Some(frontend) = &frontend {
104        frontend.send(EngineMessage::EngineStopped {}).await;
105        tokio::time::sleep(Duration::from_millis(100)).await;
106        frontend.disconnect();
107      }
108      return Ok(());
109    }
110
111    // Set up Engine (if in engine mode)
112
113    // At this point we will have received and validated options.
114
115    // Hang out until those listeners get sick of listening.
116    info!("Intiface CLI Setup finished, running server tasks until all joined.");
117    let server = setup_buttplug_server(options, &self.backdoor_server, dcm).await?;
118    let dcm = server
119      .device_manager()
120      .device_configuration_manager()
121      .clone();
122
123    if let Some(rest_port) = options.rest_api_port() {
124      select! {
125        _ = self.stop_token.cancelled() => {
126          info!("Owner requested process exit, exiting.");
127        }
128        res = IntifaceRestServer::run(rest_port, server) => {
129          info!("Rest API listener stopped, exiting.");
130          if let Err(e) = res {
131            error!("Error running Intiface Central RestAPI Server: {:?}", e);
132          }
133        }
134      };
135      if let Some(frontend) = &frontend {
136        frontend.send(EngineMessage::EngineStopped {}).await;
137        tokio::time::sleep(Duration::from_millis(100)).await;
138        frontend.disconnect();
139      }
140      return Ok(());
141    }
142
143    let mut server = ButtplugRemoteServer::new(server, &None);
144
145    if let Some(config_path) = options.user_device_config_path() {
146      let stream = server.event_stream();
147      {
148        let config_path = config_path.to_owned();
149        tokio::spawn(async move {
150          pin_mut!(stream);
151          loop {
152            if let Some(event) = stream.next().await {
153              match event {
154                ButtplugRemoteServerEvent::DeviceAdded {
155                  index: _,
156                  identifier: _,
157                  name: _,
158                  display_name: _,
159                } => {
160                  if let Ok(config_str) = save_user_config(&dcm) {
161                    // Should probably at least log if we fail to write the config file
162                    if let Err(e) = fs::write(&Path::new(&config_path), config_str).await {
163                      error!("Error saving config file: {:?}", e);
164                    }
165                  }
166                }
167                _ => continue,
168              }
169            };
170          }
171        });
172      }
173    }
174    if let Some(frontend) = &frontend {
175      frontend.send(EngineMessage::EngineServerCreated {}).await;
176      let event_receiver = server.event_stream();
177      let frontend_clone = frontend.clone();
178      let stop_child_token = self.stop_token.child_token();
179      tokio::spawn(async move {
180        frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
181      });
182    }
183
184    loop {
185      let session_connection_token = CancellationToken::new();
186      info!("Starting server");
187
188      // Let everything spin up, then try crashing.
189
190      #[cfg(debug_assertions)]
191      maybe_crash_main_thread(options);
192
193      let mut exit_requested = false;
194      select! {
195        _ = self.stop_token.cancelled() => {
196          info!("Owner requested process exit, exiting.");
197          exit_requested = true;
198        }
199        result = run_server(&server, options) => {
200          match result {
201            Ok(_) => info!("Connection dropped, restarting stay open loop."),
202            Err(e) => {
203              error!("{}", format!("Process Error: {:?}", e));
204
205              if let Some(frontend) = &frontend {
206                frontend
207                  .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
208                  .await;
209              }
210            }
211          }
212        }
213      };
214      match server.disconnect().await {
215        Ok(_) => {
216          info!("Client forcefully disconnected from server.");
217          if let Some(frontend) = &frontend {
218            frontend.send(EngineMessage::ClientDisconnected {}).await;
219          }
220        }
221        Err(_) => info!("Client already disconnected from server."),
222      };
223      session_connection_token.cancel();
224      if exit_requested {
225        info!("Breaking out of event loop in order to exit");
226        break;
227      }
228      // We're not exiting, rebuild our server.
229      let dm = server.server().device_manager();
230      server = reset_buttplug_server(options, &dm, server.event_sender()).await?;
231      info!("Server connection dropped, restarting");
232    }
233    info!("Shutting down server...");
234    if let Err(e) = server.shutdown().await {
235      error!("Shutdown failed: {:?}", e);
236    }
237    info!("Exiting");
238    if let Some(frontend) = &frontend {
239      frontend.send(EngineMessage::EngineStopped {}).await;
240      tokio::time::sleep(Duration::from_millis(100)).await;
241      frontend.disconnect();
242    }
243    Ok(())
244  }
245
246  pub fn stop(&self) {
247    info!("Engine stop called, cancelling token.");
248    self.stop_token.cancel();
249  }
250}