Skip to main content

quantoxide/tui/sync/
mod.rs

1use std::{
2    sync::{Arc, Mutex},
3    time::Duration,
4};
5
6use async_trait::async_trait;
7use tokio::{
8    sync::{OnceCell, broadcast::error::RecvError, mpsc},
9    time,
10};
11
12use crate::{
13    sync::{SyncEngine, SyncMode, SyncReader, SyncUpdate},
14    util::AbortOnDropHandle,
15};
16
17use super::{
18    config::TuiConfig,
19    core::{self, TuiControllerShutdown, TuiLogger},
20    error::{Result, TuiError},
21    status::{TuiStatus, TuiStatusManager, TuiStatusStopped},
22    terminal::TuiTerminal,
23};
24
25mod view;
26
27use view::SyncTuiView;
28
/// Messages delivered to the sync UI task over its `mpsc` channel.
#[derive(Debug)]
pub enum SyncUiMessage {
    /// A line of text for the log; used for user log entries, sync status changes, price ticks
    /// and lag notices.
    LogEntry(String),
    /// Replacement text describing the current sync state (e.g. a price history summary).
    StateUpdate(String),
    /// Notification that the shutdown sequence has completed.
    ShutdownCompleted,
}
35
36/// Terminal user interface for synchronization operations.
37///
38/// `SyncTui` provides a visual interface for monitoring price data synchronization, including sync
39/// status, price ticks, and price history state. It must be coupled with a [`SyncEngine`] before
40/// synchronization begins.
41pub struct SyncTui {
42    event_check_interval: Duration,
43    shutdown_timeout: Duration,
44    status_manager: Arc<TuiStatusManager<SyncTuiView>>,
45    // Ownership ensures the `TuiTerminal` destructor is executed when `SyncTui` is dropped
46    tui_terminal: Arc<TuiTerminal>,
47    ui_tx: mpsc::Sender<SyncUiMessage>,
48    // Explicitly aborted on drop, to ensure the terminal is restored before
49    // `SyncTui`'s drop is completed.
50    ui_task_handle: Arc<Mutex<Option<AbortOnDropHandle<()>>>>,
51    _shutdown_listener_handle: AbortOnDropHandle<()>,
52    sync_controller: Arc<OnceCell<Arc<dyn TuiControllerShutdown>>>,
53    sync_update_listener_handle: OnceCell<AbortOnDropHandle<()>>,
54}
55
56impl SyncTui {
57    /// Launches a new sync TUI with the specified configuration.
58    ///
59    /// Optionally writes TUI logs to a file if `log_file_path` is provided.
60    pub async fn launch(config: TuiConfig, log_file_path: Option<&str>) -> Result<Arc<Self>> {
61        let log_file = core::open_log_file(log_file_path)?;
62
63        let (ui_tx, ui_rx) = mpsc::channel::<SyncUiMessage>(100);
64        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
65
66        let tui_terminal = TuiTerminal::new()?;
67
68        let tui_view = SyncTuiView::new(config.max_tui_log_len(), log_file);
69
70        let status_manager = TuiStatusManager::new_running(tui_view.clone());
71
72        let ui_task_handle = core::spawn_ui_task(
73            config.event_check_interval(),
74            tui_view,
75            status_manager.clone(),
76            tui_terminal.clone(),
77            ui_rx,
78            shutdown_tx,
79        );
80
81        let sync_controller = Arc::new(OnceCell::new());
82
83        let _shutdown_listener_handle = core::spawn_shutdown_signal_listener(
84            config.shutdown_timeout(),
85            status_manager.clone(),
86            shutdown_rx,
87            ui_task_handle.clone(),
88            {
89                let ui_tx = ui_tx.clone();
90                || async move { ui_tx.send(SyncUiMessage::ShutdownCompleted).await }
91            },
92            sync_controller.clone(),
93        );
94
95        Ok(Arc::new(Self {
96            event_check_interval: config.event_check_interval(),
97            shutdown_timeout: config.shutdown_timeout(),
98            status_manager,
99            tui_terminal,
100            ui_tx,
101            ui_task_handle,
102            _shutdown_listener_handle,
103            sync_controller,
104            sync_update_listener_handle: OnceCell::new(),
105        }))
106    }
107
108    /// Returns the current [`TuiStatus`] as a snapshot.
109    pub fn status(&self) -> TuiStatus {
110        self.status_manager.status()
111    }
112
113    fn spawn_sync_update_listener(
114        status_manager: Arc<TuiStatusManager<SyncTuiView>>,
115        sync_reader: Arc<dyn SyncReader>,
116        ui_tx: mpsc::Sender<SyncUiMessage>,
117    ) -> AbortOnDropHandle<()> {
118        tokio::spawn(async move {
119            let send_ui_msg = async |ui_msg: SyncUiMessage| -> Result<()> {
120                ui_tx
121                    .send(ui_msg)
122                    .await
123                    .map_err(|e| TuiError::SyncTuiSendFailed(Box::new(e)))
124            };
125
126            let handle_sync_update = async |sync_update: SyncUpdate| -> Result<()> {
127                match sync_update {
128                    SyncUpdate::Status(sync_status) => {
129                        send_ui_msg(SyncUiMessage::LogEntry(format!(
130                            "Sync status: {sync_status}"
131                        )))
132                        .await?;
133                    }
134                    SyncUpdate::PriceTick(tick) => {
135                        send_ui_msg(SyncUiMessage::LogEntry(tick.to_string())).await?;
136                    }
137                    SyncUpdate::PriceHistoryState(price_history_state) => {
138                        send_ui_msg(SyncUiMessage::StateUpdate(format!(
139                            "\n{}",
140                            price_history_state.summary()
141                        )))
142                        .await?;
143                    }
144                }
145                Ok(())
146            };
147
148            if matches!(sync_reader.mode(), SyncMode::Live(None))
149                && let Err(e) =
150                    send_ui_msg(SyncUiMessage::StateUpdate("Not evaluated.".to_string())).await
151            {
152                status_manager.set_crashed(e);
153                return;
154            }
155
156            let mut sync_rx = sync_reader.update_receiver();
157
158            loop {
159                match sync_rx.recv().await {
160                    Ok(live_update) => {
161                        if let Err(e) = handle_sync_update(live_update).await {
162                            status_manager.set_crashed(e);
163                            return;
164                        }
165                    }
166                    Err(RecvError::Lagged(skipped)) => {
167                        let log_msg = format!("Sync updates lagged by {skipped} messages");
168
169                        if let Err(e) = send_ui_msg(SyncUiMessage::LogEntry(log_msg)).await {
170                            status_manager.set_crashed(e);
171                            return;
172                        }
173
174                        // Keep trying to receive
175                    }
176                    Err(e) => {
177                        // `sync_rx` is expected to be dropped during shutdown
178
179                        let status = status_manager.status();
180                        if status.is_shutdown_initiated() || status.is_shutdown() {
181                            return;
182                        }
183
184                        status_manager.set_crashed(TuiError::SyncRecv(e));
185
186                        return;
187                    }
188                }
189            }
190        })
191        .into()
192    }
193
194    /// Couples a [`SyncEngine`] to this TUI instance.
195    ///
196    /// This method starts the sync engine and begins listening for sync updates. It can only be
197    /// called once per TUI instance.
198    ///
199    /// Returns an error if a sync engine has already been coupled.
200    pub fn couple(&self, engine: SyncEngine) -> Result<()> {
201        if self.sync_controller.initialized() {
202            return Err(TuiError::SyncEngineAlreadyCoupled);
203        }
204
205        let sync_update_listener_handle = Self::spawn_sync_update_listener(
206            self.status_manager.clone(),
207            engine.reader(),
208            self.ui_tx.clone(),
209        );
210
211        let sync_controller = engine.start();
212
213        self.sync_controller
214            .set(sync_controller)
215            .map_err(|_| TuiError::SyncEngineAlreadyCoupled)?;
216
217        self.sync_update_listener_handle
218            .set(sync_update_listener_handle)
219            .map_err(|_| TuiError::SyncEngineAlreadyCoupled)?;
220
221        Ok(())
222    }
223
224    /// Performs a graceful shutdown of the sync TUI.
225    ///
226    /// This method shuts down the coupled sync engine and stops the UI task. If shutdown does not
227    /// complete within the configured timeout, the task is aborted.
228    ///
229    /// Returns an error if the TUI is not running or if shutdown fails.
230    pub async fn shutdown(&self) -> Result<()> {
231        self.status_manager.require_running()?;
232
233        let sync_controller = self.sync_controller.get().cloned();
234
235        core::shutdown_inner(
236            self.shutdown_timeout,
237            self.status_manager.clone(),
238            self.ui_task_handle.clone(),
239            || self.ui_tx.send(SyncUiMessage::ShutdownCompleted),
240            sync_controller,
241        )
242        .await
243    }
244
245    /// Waits until the TUI has stopped and returns the final stopped status.
246    ///
247    /// This method blocks until the TUI reaches a stopped state, either through graceful shutdown
248    /// or a crash.
249    ///
250    /// The terminal is automatically restored before this method returns.
251    pub async fn until_stopped(&self) -> Arc<TuiStatusStopped> {
252        loop {
253            if let TuiStatus::Stopped(status_stopped) = self.status() {
254                let _ = self.tui_terminal.restore();
255                return status_stopped;
256            }
257
258            time::sleep(self.event_check_interval).await;
259        }
260    }
261
262    /// Logs a message to the TUI.
263    ///
264    /// Returns an error if the TUI is not running or if sending the log entry fails.
265    pub async fn log(&self, text: String) -> Result<()> {
266        self.status_manager.require_running()?;
267
268        // An error here would be an edge case
269
270        self.ui_tx
271            .send(SyncUiMessage::LogEntry(text))
272            .await
273            .map_err(|e| TuiError::SyncTuiSendFailed(Box::new(e)))
274    }
275
276    /// Returns this TUI as a [`TuiLogger`] trait object.
277    ///
278    /// This is useful for passing the TUI to components that accept a generic logger.
279    pub fn as_logger(self: &Arc<Self>) -> Arc<dyn TuiLogger> {
280        self.clone()
281    }
282}
283
284#[async_trait]
285impl TuiLogger for SyncTui {
286    async fn log(&self, log_entry: String) -> Result<()> {
287        self.log(log_entry).await
288    }
289}
290
291impl Drop for SyncTui {
292    fn drop(&mut self) {
293        if let Some(ui_handle) = self
294            .ui_task_handle
295            .lock()
296            .expect("`ui_task_handle` mutex can't be poisoned")
297            .take()
298        {
299            ui_handle.abort();
300        };
301    }
302}