csi_webclient/core/
mod.rs1mod http;
2pub mod messages;
3mod ws;
4
5use crate::core::messages::{CoreCommand, CoreEvent};
6use std::sync::mpsc::{self, Receiver, Sender};
7
8pub struct CoreHandle {
15 cmd_tx: Sender<CoreCommand>,
16 event_rx: Receiver<CoreEvent>,
17}
18
19impl CoreHandle {
20 pub fn new() -> Self {
22 let (cmd_tx, cmd_rx) = mpsc::channel::<CoreCommand>();
23 let (event_tx, event_rx) = mpsc::channel::<CoreEvent>();
24
25 std::thread::Builder::new()
26 .name("csi-core-worker".to_owned())
27 .spawn(move || worker_loop(cmd_rx, event_tx))
28 .expect("failed to spawn core worker thread");
29
30 Self { cmd_tx, event_rx }
31 }
32
33 pub fn submit(&self, command: CoreCommand) {
35 let _ = self.cmd_tx.send(command);
36 }
37
38 pub fn try_recv(&self) -> Option<CoreEvent> {
40 self.event_rx.try_recv().ok()
41 }
42}
43
44impl Drop for CoreHandle {
45 fn drop(&mut self) {
46 let _ = self.cmd_tx.send(CoreCommand::Shutdown);
47 }
48}
49
50fn worker_loop(cmd_rx: Receiver<CoreCommand>, event_tx: Sender<CoreEvent>) {
51 let runtime = match tokio::runtime::Builder::new_multi_thread()
52 .enable_all()
53 .build()
54 {
55 Ok(rt) => rt,
56 Err(err) => {
57 let _ = event_tx.send(CoreEvent::Log(format!(
58 "Failed to initialize async runtime: {err}"
59 )));
60 return;
61 }
62 };
63
64 let mut ws_stop_tx: Option<tokio::sync::oneshot::Sender<()>> = None;
65 let mut ws_task: Option<tokio::task::JoinHandle<()>> = None;
66
67 while let Ok(command) = cmd_rx.recv() {
68 match command {
69 CoreCommand::ExecuteApi(request) => {
70 let event = runtime.block_on(http::execute_api_request(request));
71 let _ = event_tx.send(event);
72 }
73 CoreCommand::ConnectWebSocket { url } => {
74 stop_ws_task(&runtime, &mut ws_stop_tx, &mut ws_task);
75
76 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
77 ws_stop_tx = Some(stop_tx);
78
79 let event_tx_clone = event_tx.clone();
80 ws_task = Some(runtime.spawn(async move {
81 ws::run_ws_loop(url, stop_rx, event_tx_clone).await;
82 }));
83 }
84 CoreCommand::DisconnectWebSocket => {
85 stop_ws_task(&runtime, &mut ws_stop_tx, &mut ws_task);
86 let _ = event_tx.send(CoreEvent::WebSocketDisconnected {
87 reason: "Disconnected".to_owned(),
88 });
89 }
90 CoreCommand::Shutdown => {
91 stop_ws_task(&runtime, &mut ws_stop_tx, &mut ws_task);
92 break;
93 }
94 }
95 }
96}
97
98fn stop_ws_task(
99 runtime: &tokio::runtime::Runtime,
100 ws_stop_tx: &mut Option<tokio::sync::oneshot::Sender<()>>,
101 ws_task: &mut Option<tokio::task::JoinHandle<()>>,
102) {
103 if let Some(stop_tx) = ws_stop_tx.take() {
104 let _ = stop_tx.send(());
105 }
106
107 if let Some(task) = ws_task.take() {
108 let _ = runtime.block_on(task);
109 }
110}