Skip to main content

csi_webclient/core/
mod.rs

1mod http;
2pub mod messages;
3mod ws;
4
5use crate::core::messages::{CoreCommand, CoreEvent};
6use std::sync::mpsc::{self, Receiver, Sender};
7
8/// App-facing handle for the background core worker.
9///
10/// The handle is intentionally lightweight and only exposes:
11///
12/// - command submission (`submit`)
13/// - non-blocking event polling (`try_recv`)
14pub struct CoreHandle {
15    cmd_tx: Sender<CoreCommand>,
16    event_rx: Receiver<CoreEvent>,
17}
18
19impl CoreHandle {
20    /// Spawn a new core worker thread and return a handle.
21    pub fn new() -> Self {
22        let (cmd_tx, cmd_rx) = mpsc::channel::<CoreCommand>();
23        let (event_tx, event_rx) = mpsc::channel::<CoreEvent>();
24
25        std::thread::Builder::new()
26            .name("csi-core-worker".to_owned())
27            .spawn(move || worker_loop(cmd_rx, event_tx))
28            .expect("failed to spawn core worker thread");
29
30        Self { cmd_tx, event_rx }
31    }
32
33    /// Submit a command to the core worker.
34    pub fn submit(&self, command: CoreCommand) {
35        let _ = self.cmd_tx.send(command);
36    }
37
38    /// Poll the next core event without blocking the UI thread.
39    pub fn try_recv(&self) -> Option<CoreEvent> {
40        self.event_rx.try_recv().ok()
41    }
42}
43
44impl Drop for CoreHandle {
45    fn drop(&mut self) {
46        let _ = self.cmd_tx.send(CoreCommand::Shutdown);
47    }
48}
49
50fn worker_loop(cmd_rx: Receiver<CoreCommand>, event_tx: Sender<CoreEvent>) {
51    let runtime = match tokio::runtime::Builder::new_multi_thread()
52        .enable_all()
53        .build()
54    {
55        Ok(rt) => rt,
56        Err(err) => {
57            let _ = event_tx.send(CoreEvent::Log(format!(
58                "Failed to initialize async runtime: {err}"
59            )));
60            return;
61        }
62    };
63
64    let mut ws_stop_tx: Option<tokio::sync::oneshot::Sender<()>> = None;
65    let mut ws_task: Option<tokio::task::JoinHandle<()>> = None;
66
67    while let Ok(command) = cmd_rx.recv() {
68        match command {
69            CoreCommand::ExecuteApi(request) => {
70                let event = runtime.block_on(http::execute_api_request(request));
71                let _ = event_tx.send(event);
72            }
73            CoreCommand::ConnectWebSocket { url } => {
74                stop_ws_task(&runtime, &mut ws_stop_tx, &mut ws_task);
75
76                let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
77                ws_stop_tx = Some(stop_tx);
78
79                let event_tx_clone = event_tx.clone();
80                ws_task = Some(runtime.spawn(async move {
81                    ws::run_ws_loop(url, stop_rx, event_tx_clone).await;
82                }));
83            }
84            CoreCommand::DisconnectWebSocket => {
85                stop_ws_task(&runtime, &mut ws_stop_tx, &mut ws_task);
86                let _ = event_tx.send(CoreEvent::WebSocketDisconnected {
87                    reason: "Disconnected".to_owned(),
88                });
89            }
90            CoreCommand::Shutdown => {
91                stop_ws_task(&runtime, &mut ws_stop_tx, &mut ws_task);
92                break;
93            }
94        }
95    }
96}
97
98fn stop_ws_task(
99    runtime: &tokio::runtime::Runtime,
100    ws_stop_tx: &mut Option<tokio::sync::oneshot::Sender<()>>,
101    ws_task: &mut Option<tokio::task::JoinHandle<()>>,
102) {
103    if let Some(stop_tx) = ws_stop_tx.take() {
104        let _ = stop_tx.send(());
105    }
106
107    if let Some(task) = ws_task.take() {
108        let _ = runtime.block_on(task);
109    }
110}