intiface_engine/
engine.rs

1use crate::{
2  backdoor_server::BackdoorServer,
3  buttplug_server::{run_server, setup_buttplug_server},
4  error::IntifaceEngineError,
5  frontend::{
6    frontend_external_event_loop, frontend_server_event_loop, process_messages::EngineMessage,
7    Frontend,
8  },
9  mdns::IntifaceMdns,
10  options::EngineOptions,
11  remote_server::ButtplugRemoteServerEvent,
12  ButtplugRepeater,
13};
14
15use buttplug::{
16  server::device::configuration::DeviceConfigurationManager,
17  util::device_configuration::save_user_config,
18};
19use futures::{pin_mut, StreamExt};
20use once_cell::sync::OnceCell;
21use std::{path::Path, sync::Arc, time::Duration};
22use tokio::{fs, select};
23use tokio_util::sync::CancellationToken;
24
25#[cfg(debug_assertions)]
26pub fn maybe_crash_main_thread(options: &EngineOptions) {
27  if options.crash_main_thread() {
28    panic!("Crashing main thread by request");
29  }
30}
31
32#[allow(dead_code)]
33#[cfg(debug_assertions)]
34pub fn maybe_crash_task_thread(options: &EngineOptions) {
35  if options.crash_task_thread() {
36    tokio::spawn(async {
37      tokio::time::sleep(Duration::from_millis(100)).await;
38      panic!("Crashing a task thread by request");
39    });
40  }
41}
42
43#[derive(Default)]
44pub struct IntifaceEngine {
45  stop_token: Arc<CancellationToken>,
46  backdoor_server: OnceCell<Arc<BackdoorServer>>,
47}
48
49impl IntifaceEngine {
50  pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
51    Some(self.backdoor_server.get()?.clone())
52  }
53
54  pub async fn run(
55    &self,
56    options: &EngineOptions,
57    frontend: Option<Arc<dyn Frontend>>,
58    dcm: &Option<Arc<DeviceConfigurationManager>>,
59  ) -> Result<(), IntifaceEngineError> {
60    // Set up Frontend
61    if let Some(frontend) = &frontend {
62      let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
63      tokio::spawn(async move {
64        frontend_loop.await;
65      });
66
67      frontend.connect().await.unwrap();
68      frontend.send(EngineMessage::EngineStarted {}).await;
69    }
70
71    // Set up mDNS
72    let _mdns_server = if options.broadcast_server_mdns() {
73      // TODO Unregister whenever we have a live connection
74
75      // TODO Support different services for engine versus repeater
76      Some(IntifaceMdns::new())
77    } else {
78      None
79    };
80
81    // Set up Repeater (if in repeater mode)
82    if options.repeater_mode() {
83      info!("Starting repeater");
84
85      let repeater = ButtplugRepeater::new(
86        options.repeater_local_port().unwrap(),
87        &options.repeater_remote_address().as_ref().unwrap(),
88        self.stop_token.child_token(),
89      );
90      select! {
91        _ = self.stop_token.cancelled() => {
92          info!("Owner requested process exit, exiting.");
93        }
94        _ = repeater.listen() => {
95          info!("Repeater listener stopped, exiting.");
96        }
97      };
98      if let Some(frontend) = &frontend {
99        frontend.send(EngineMessage::EngineStopped {}).await;
100        tokio::time::sleep(Duration::from_millis(100)).await;
101        frontend.disconnect();
102      }
103      return Ok(());
104    }
105
106    // Set up Engine (if in engine mode)
107
108    // At this point we will have received and validated options.
109
110    // Hang out until those listeners get sick of listening.
111    info!("Intiface CLI Setup finished, running server tasks until all joined.");
112    let server = setup_buttplug_server(options, &self.backdoor_server, &dcm).await?;
113    let dcm = server
114      .server()
115      .device_manager()
116      .device_configuration_manager()
117      .clone();
118    if let Some(config_path) = options.user_device_config_path() {
119      let stream = server.event_stream();
120      {
121        let config_path = config_path.to_owned();
122        tokio::spawn(async move {
123          pin_mut!(stream);
124          loop {
125            if let Some(event) = stream.next().await {
126              match event {
127                ButtplugRemoteServerEvent::DeviceAdded {
128                  index: _,
129                  identifier: _,
130                  name: _,
131                  display_name: _,
132                } => {
133                  if let Ok(config_str) = save_user_config(&dcm) {
134                    // Should probably at least log if we fail to write the config file
135                    let _ = fs::write(&Path::new(&config_path), config_str).await;
136                  }
137                }
138                _ => continue,
139              }
140            };
141          }
142        });
143      }
144    }
145    if let Some(frontend) = &frontend {
146      frontend.send(EngineMessage::EngineServerCreated {}).await;
147      let event_receiver = server.event_stream();
148      let frontend_clone = frontend.clone();
149      let stop_child_token = self.stop_token.child_token();
150      tokio::spawn(async move {
151        frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
152      });
153    }
154
155    loop {
156      let session_connection_token = CancellationToken::new();
157      info!("Starting server");
158
159      // Let everything spin up, then try crashing.
160
161      #[cfg(debug_assertions)]
162      maybe_crash_main_thread(options);
163
164      let mut exit_requested = false;
165      select! {
166        _ = self.stop_token.cancelled() => {
167          info!("Owner requested process exit, exiting.");
168          exit_requested = true;
169        }
170        result = run_server(&server, options) => {
171          match result {
172            Ok(_) => info!("Connection dropped, restarting stay open loop."),
173            Err(e) => {
174              error!("{}", format!("Process Error: {:?}", e));
175              if let Some(frontend) = &frontend {
176                frontend
177                  .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
178                  .await;
179              }
180              exit_requested = true;
181            }
182          }
183        }
184      };
185      match server.disconnect().await {
186        Ok(_) => {
187          info!("Client forcefully disconnected from server.");
188          if let Some(frontend) = &frontend {
189            frontend.send(EngineMessage::ClientDisconnected {}).await;
190          }
191        }
192        Err(_) => info!("Client already disconnected from server."),
193      };
194      session_connection_token.cancel();
195      if exit_requested {
196        info!("Breaking out of event loop in order to exit");
197        break;
198      }
199      info!("Server connection dropped, restarting");
200    }
201    info!("Shutting down server...");
202    if let Err(e) = server.shutdown().await {
203      error!("Shutdown failed: {:?}", e);
204    }
205    info!("Exiting");
206    if let Some(frontend) = &frontend {
207      frontend.send(EngineMessage::EngineStopped {}).await;
208      tokio::time::sleep(Duration::from_millis(100)).await;
209      frontend.disconnect();
210    }
211    Ok(())
212  }
213
214  pub fn stop(&self) {
215    info!("Engine stop called, cancelling token.");
216    self.stop_token.cancel();
217  }
218}