1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use crate::{
  backdoor_server::BackdoorServer,
  buttplug_server::{run_server, setup_buttplug_server},
  error::IntifaceEngineError,
  frontend::{
    frontend_external_event_loop, frontend_server_event_loop, process_messages::EngineMessage,
    Frontend,
  },
  mdns::IntifaceMdns,
  options::EngineOptions,
  ButtplugRepeater,
};

use once_cell::sync::OnceCell;
use std::{sync::Arc, time::Duration};
use tokio::select;
use tokio_util::sync::CancellationToken;

#[cfg(debug_assertions)]
pub fn maybe_crash_main_thread(options: &EngineOptions) {
  if options.crash_main_thread() {
    panic!("Crashing main thread by request");
  }
}

#[allow(dead_code)]
#[cfg(debug_assertions)]
pub fn maybe_crash_task_thread(options: &EngineOptions) {
  if options.crash_task_thread() {
    tokio::spawn(async {
      tokio::time::sleep(Duration::from_millis(100)).await;
      panic!("Crashing a task thread by request");
    });
  }
}

#[derive(Default)]
pub struct IntifaceEngine {
  stop_token: Arc<CancellationToken>,
  backdoor_server: OnceCell<Arc<BackdoorServer>>,
}

impl IntifaceEngine {
  pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
    Some(self.backdoor_server.get()?.clone())
  }

  pub async fn run(
    &self,
    options: &EngineOptions,
    frontend: Option<Arc<dyn Frontend>>,
  ) -> Result<(), IntifaceEngineError> {
    // Set up Frontend
    if let Some(frontend) = &frontend {
      let frontend_loop = frontend_external_event_loop(frontend.clone(), self.stop_token.clone());
      tokio::spawn(async move {
        frontend_loop.await;
      });

      frontend.connect().await.unwrap();
      frontend.send(EngineMessage::EngineStarted {}).await;
    }

    // Set up mDNS
    let _mdns_server = if options.broadcast_server_mdns() {
      // TODO Unregister whenever we have a live connection

      // TODO Support different services for engine versus repeater
      Some(IntifaceMdns::new())
    } else {
      None
    };

    // Set up Repeater (if in repeater mode)
    if options.repeater_mode() {
      info!("Starting repeater");

      let repeater = ButtplugRepeater::new(
        options.repeater_local_port().unwrap(),
        &options.repeater_remote_address().as_ref().unwrap(),
        self.stop_token.child_token(),
      );
      select! {
        _ = self.stop_token.cancelled() => {
          info!("Owner requested process exit, exiting.");
        }
        _ = repeater.listen() => {
          info!("Repeater listener stopped, exiting.");
        }
      };
      if let Some(frontend) = &frontend {
        frontend.send(EngineMessage::EngineStopped {}).await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        frontend.disconnect();
      }
      return Ok(());
    }

    // Set up Engine (if in engine mode)

    // At this point we will have received and validated options.

    // Hang out until those listeners get sick of listening.
    info!("Intiface CLI Setup finished, running server tasks until all joined.");
    let server = setup_buttplug_server(options, &self.backdoor_server).await?;

    if let Some(frontend) = &frontend {
      frontend.send(EngineMessage::EngineServerCreated {}).await;
      let event_receiver = server.event_stream();
      let frontend_clone = frontend.clone();
      let stop_child_token = self.stop_token.child_token();
      tokio::spawn(async move {
        frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await;
      });
    }

    loop {
      let session_connection_token = CancellationToken::new();
      info!("Starting server");

      // Let everything spin up, then try crashing.

      #[cfg(debug_assertions)]
      maybe_crash_main_thread(options);

      let mut exit_requested = false;
      select! {
        _ = self.stop_token.cancelled() => {
          info!("Owner requested process exit, exiting.");
          exit_requested = true;
        }
        result = run_server(&server, options) => {
          match result {
            Ok(_) => info!("Connection dropped, restarting stay open loop."),
            Err(e) => {
              error!("{}", format!("Process Error: {:?}", e));
              if let Some(frontend) = &frontend {
                frontend
                  .send(EngineMessage::EngineError{ error: format!("Process Error: {:?}", e).to_owned()})
                  .await;
              }
              exit_requested = true;
            }
          }
        }
      };
      match server.disconnect().await {
        Ok(_) => {
          info!("Client forcefully disconnected from server.");
          if let Some(frontend) = &frontend {
            frontend.send(EngineMessage::ClientDisconnected {}).await;
          }
        }
        Err(_) => info!("Client already disconnected from server."),
      };
      session_connection_token.cancel();
      if exit_requested {
        info!("Breaking out of event loop in order to exit");
        break;
      }
      info!("Server connection dropped, restarting");
    }
    info!("Shutting down server...");
    if let Err(e) = server.shutdown().await {
      error!("Shutdown failed: {:?}", e);
    }
    info!("Exiting");
    if let Some(frontend) = &frontend {
      frontend.send(EngineMessage::EngineStopped {}).await;
      tokio::time::sleep(Duration::from_millis(100)).await;
      frontend.disconnect();
    }
    Ok(())
  }

  pub fn stop(&self) {
    info!("Engine stop called, cancelling token.");
    self.stop_token.cancel();
  }
}