intiface_engine/
engine.rs1use crate::{
2 backdoor_server::BackdoorServer,
3 buttplug_server::{run_server, setup_buttplug_server},
4 error::IntifaceEngineError,
5 frontend::{
6 frontend_external_event_loop, frontend_server_event_loop, process_messages::EngineMessage,
7 Frontend,
8 },
9 mdns::IntifaceMdns,
10 options::EngineOptions,
11 remote_server::ButtplugRemoteServerEvent,
12 ButtplugRepeater,
13};
14
15use buttplug::{
16 server::device::configuration::DeviceConfigurationManager,
17 util::device_configuration::save_user_config,
18};
19use futures::{pin_mut, StreamExt};
20use once_cell::sync::OnceCell;
21use std::{path::Path, sync::Arc, time::Duration};
22use tokio::{fs, select};
23use tokio_util::sync::CancellationToken;
24
25#[cfg(debug_assertions)]
26pub fn maybe_crash_main_thread(options: &EngineOptions) {
27 if options.crash_main_thread() {
28 panic!("Crashing main thread by request");
29 }
30}
31
32#[allow(dead_code)]
33#[cfg(debug_assertions)]
34pub fn maybe_crash_task_thread(options: &EngineOptions) {
35 if options.crash_task_thread() {
36 tokio::spawn(async {
37 tokio::time::sleep(Duration::from_millis(100)).await;
38 panic!("Crashing a task thread by request");
39 });
40 }
41}
42
43#[derive(Default)]
44pub struct IntifaceEngine {
45 stop_token: Arc<CancellationToken>,
46 backdoor_server: OnceCell<Arc<BackdoorServer>>,
47}
48
49impl IntifaceEngine {
50 pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
51 Some(self.backdoor_server.get()?.clone())
52 }
53
54 pub async fn run(
55 &self,
56 options: &EngineOptions,
57 frontend: Option<Arc<dyn Frontend>>,
58 dcm: &Option<Arc<DeviceConfigurationManager>>,
59 ) -> Result<(), IntifaceEngineError> {
60 if let Some(frontend) = &frontend {
62 let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
63 tokio::spawn(async move {
64 frontend_loop.await;
65 });
66
67 frontend.connect().await.unwrap();
68 frontend.send(EngineMessage::EngineStarted {}).await;
69 }
70
71 let _mdns_server = if options.broadcast_server_mdns() {
73 Some(IntifaceMdns::new())
77 } else {
78 None
79 };
80
81 if options.repeater_mode() {
83 info!("Starting repeater");
84
85 let repeater = ButtplugRepeater::new(
86 options.repeater_local_port().unwrap(),
87 &options.repeater_remote_address().as_ref().unwrap(),
88 self.stop_token.child_token(),
89 );
90 select! {
91 _ = self.stop_token.cancelled() => {
92 info!("Owner requested process exit, exiting.");
93 }
94 _ = repeater.listen() => {
95 info!("Repeater listener stopped, exiting.");
96 }
97 };
98 if let Some(frontend) = &frontend {
99 frontend.send(EngineMessage::EngineStopped {}).await;
100 tokio::time::sleep(Duration::from_millis(100)).await;
101 frontend.disconnect();
102 }
103 return Ok(());
104 }
105
106 info!("Intiface CLI Setup finished, running server tasks until all joined.");
112 let server = setup_buttplug_server(options, &self.backdoor_server, &dcm).await?;
113 let dcm = server
114 .server()
115 .device_manager()
116 .device_configuration_manager()
117 .clone();
118 if let Some(config_path) = options.user_device_config_path() {
119 let stream = server.event_stream();
120 {
121 let config_path = config_path.to_owned();
122 tokio::spawn(async move {
123 pin_mut!(stream);
124 loop {
125 if let Some(event) = stream.next().await {
126 match event {
127 ButtplugRemoteServerEvent::DeviceAdded {
128 index: _,
129 identifier: _,
130 name: _,
131 display_name: _,
132 } => {
133 if let Ok(config_str) = save_user_config(&dcm) {
134 let _ = fs::write(&Path::new(&config_path), config_str).await;
136 }
137 }
138 _ => continue,
139 }
140 };
141 }
142 });
143 }
144 }
145 if let Some(frontend) = &frontend {
146 frontend.send(EngineMessage::EngineServerCreated {}).await;
147 let event_receiver = server.event_stream();
148 let frontend_clone = frontend.clone();
149 let stop_child_token = self.stop_token.child_token();
150 tokio::spawn(async move {
151 frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
152 });
153 }
154
155 loop {
156 let session_connection_token = CancellationToken::new();
157 info!("Starting server");
158
159 #[cfg(debug_assertions)]
162 maybe_crash_main_thread(options);
163
164 let mut exit_requested = false;
165 select! {
166 _ = self.stop_token.cancelled() => {
167 info!("Owner requested process exit, exiting.");
168 exit_requested = true;
169 }
170 result = run_server(&server, options) => {
171 match result {
172 Ok(_) => info!("Connection dropped, restarting stay open loop."),
173 Err(e) => {
174 error!("{}", format!("Process Error: {:?}", e));
175 if let Some(frontend) = &frontend {
176 frontend
177 .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
178 .await;
179 }
180 exit_requested = true;
181 }
182 }
183 }
184 };
185 match server.disconnect().await {
186 Ok(_) => {
187 info!("Client forcefully disconnected from server.");
188 if let Some(frontend) = &frontend {
189 frontend.send(EngineMessage::ClientDisconnected {}).await;
190 }
191 }
192 Err(_) => info!("Client already disconnected from server."),
193 };
194 session_connection_token.cancel();
195 if exit_requested {
196 info!("Breaking out of event loop in order to exit");
197 break;
198 }
199 info!("Server connection dropped, restarting");
200 }
201 info!("Shutting down server...");
202 if let Err(e) = server.shutdown().await {
203 error!("Shutdown failed: {:?}", e);
204 }
205 info!("Exiting");
206 if let Some(frontend) = &frontend {
207 frontend.send(EngineMessage::EngineStopped {}).await;
208 tokio::time::sleep(Duration::from_millis(100)).await;
209 frontend.disconnect();
210 }
211 Ok(())
212 }
213
214 pub fn stop(&self) {
215 info!("Engine stop called, cancelling token.");
216 self.stop_token.cancel();
217 }
218}