// quantoxide/tui/sync/mod.rs
use std::{
2 sync::{Arc, Mutex},
3 time::Duration,
4};
5
6use async_trait::async_trait;
7use tokio::{
8 sync::{OnceCell, broadcast::error::RecvError, mpsc},
9 time,
10};
11
12use crate::{
13 sync::{SyncEngine, SyncMode, SyncReader, SyncUpdate},
14 util::AbortOnDropHandle,
15};
16
17use super::{
18 config::TuiConfig,
19 core::{self, TuiControllerShutdown, TuiLogger},
20 error::{Result, TuiError},
21 status::{TuiStatus, TuiStatusManager, TuiStatusStopped},
22 terminal::TuiTerminal,
23};
24
25mod view;
26
27use view::SyncTuiView;
28
/// Messages sent over the `mpsc` channel whose receiving end is handed to the UI task.
#[derive(Debug)]
pub enum SyncUiMessage {
    /// A line of text to be logged. Produced by `SyncTui::log` and by the sync update listener
    /// (sync status changes, price ticks, and lag notices).
    LogEntry(String),
    /// Text describing the current sync state. The update listener sends the price history
    /// summary here, or `"Not evaluated."` when the reader is in `SyncMode::Live(None)`.
    StateUpdate(String),
    /// Sent by the closures handed to the shutdown routines in `core`.
    /// NOTE(review): presumably emitted once shutdown has finished — confirm in `core`.
    ShutdownCompleted,
}
35
/// Terminal UI front-end for a sync engine.
///
/// Created with `SyncTui::launch` and attached to an engine with `SyncTui::couple`.
/// NOTE(review): field order determines drop order of the task handles; keep it as is unless
/// the shutdown sequence is re-verified.
pub struct SyncTui {
    /// Interval slept between status polls in `until_stopped`; taken from the `TuiConfig`.
    event_check_interval: Duration,
    /// Timeout forwarded to `core::shutdown_inner`; taken from the `TuiConfig`.
    shutdown_timeout: Duration,
    /// Shared tracker of the TUI status (running / crashed / stopped ...).
    status_manager: Arc<TuiStatusManager<SyncTuiView>>,
    /// Terminal handle; restored in `until_stopped` once the TUI has stopped.
    tui_terminal: Arc<TuiTerminal>,
    /// Sending side of the UI message channel.
    ui_tx: mpsc::Sender<SyncUiMessage>,
    /// Handle of the UI task. It is shared with the shutdown routines and is taken and aborted
    /// when the `SyncTui` is dropped.
    ui_task_handle: Arc<Mutex<Option<AbortOnDropHandle<()>>>>,
    /// Handle of the shutdown-signal listener task. Never read; stored only so that the handle
    /// lives as long as the `SyncTui` (presumably aborting the task on drop, per the type name).
    _shutdown_listener_handle: AbortOnDropHandle<()>,
    /// Controller of the coupled sync engine; set at most once, by `couple`.
    sync_controller: Arc<OnceCell<Arc<dyn TuiControllerShutdown>>>,
    /// Handle of the task forwarding sync updates to the UI; set at most once, by `couple`.
    sync_update_listener_handle: OnceCell<AbortOnDropHandle<()>>,
}
55
56impl SyncTui {
57 pub async fn launch(config: TuiConfig, log_file_path: Option<&str>) -> Result<Arc<Self>> {
61 let log_file = core::open_log_file(log_file_path)?;
62
63 let (ui_tx, ui_rx) = mpsc::channel::<SyncUiMessage>(100);
64 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
65
66 let tui_terminal = TuiTerminal::new()?;
67
68 let tui_view = SyncTuiView::new(config.max_tui_log_len(), log_file);
69
70 let status_manager = TuiStatusManager::new_running(tui_view.clone());
71
72 let ui_task_handle = core::spawn_ui_task(
73 config.event_check_interval(),
74 tui_view,
75 status_manager.clone(),
76 tui_terminal.clone(),
77 ui_rx,
78 shutdown_tx,
79 );
80
81 let sync_controller = Arc::new(OnceCell::new());
82
83 let _shutdown_listener_handle = core::spawn_shutdown_signal_listener(
84 config.shutdown_timeout(),
85 status_manager.clone(),
86 shutdown_rx,
87 ui_task_handle.clone(),
88 {
89 let ui_tx = ui_tx.clone();
90 || async move { ui_tx.send(SyncUiMessage::ShutdownCompleted).await }
91 },
92 sync_controller.clone(),
93 );
94
95 Ok(Arc::new(Self {
96 event_check_interval: config.event_check_interval(),
97 shutdown_timeout: config.shutdown_timeout(),
98 status_manager,
99 tui_terminal,
100 ui_tx,
101 ui_task_handle,
102 _shutdown_listener_handle,
103 sync_controller,
104 sync_update_listener_handle: OnceCell::new(),
105 }))
106 }
107
108 pub fn status(&self) -> TuiStatus {
110 self.status_manager.status()
111 }
112
113 fn spawn_sync_update_listener(
114 status_manager: Arc<TuiStatusManager<SyncTuiView>>,
115 sync_reader: Arc<dyn SyncReader>,
116 ui_tx: mpsc::Sender<SyncUiMessage>,
117 ) -> AbortOnDropHandle<()> {
118 tokio::spawn(async move {
119 let send_ui_msg = async |ui_msg: SyncUiMessage| -> Result<()> {
120 ui_tx
121 .send(ui_msg)
122 .await
123 .map_err(|e| TuiError::SyncTuiSendFailed(Box::new(e)))
124 };
125
126 let handle_sync_update = async |sync_update: SyncUpdate| -> Result<()> {
127 match sync_update {
128 SyncUpdate::Status(sync_status) => {
129 send_ui_msg(SyncUiMessage::LogEntry(format!(
130 "Sync status: {sync_status}"
131 )))
132 .await?;
133 }
134 SyncUpdate::PriceTick(tick) => {
135 send_ui_msg(SyncUiMessage::LogEntry(tick.to_string())).await?;
136 }
137 SyncUpdate::PriceHistoryState(price_history_state) => {
138 send_ui_msg(SyncUiMessage::StateUpdate(format!(
139 "\n{}",
140 price_history_state.summary()
141 )))
142 .await?;
143 }
144 }
145 Ok(())
146 };
147
148 if matches!(sync_reader.mode(), SyncMode::Live(None))
149 && let Err(e) =
150 send_ui_msg(SyncUiMessage::StateUpdate("Not evaluated.".to_string())).await
151 {
152 status_manager.set_crashed(e);
153 return;
154 }
155
156 let mut sync_rx = sync_reader.update_receiver();
157
158 loop {
159 match sync_rx.recv().await {
160 Ok(live_update) => {
161 if let Err(e) = handle_sync_update(live_update).await {
162 status_manager.set_crashed(e);
163 return;
164 }
165 }
166 Err(RecvError::Lagged(skipped)) => {
167 let log_msg = format!("Sync updates lagged by {skipped} messages");
168
169 if let Err(e) = send_ui_msg(SyncUiMessage::LogEntry(log_msg)).await {
170 status_manager.set_crashed(e);
171 return;
172 }
173
174 }
176 Err(e) => {
177 let status = status_manager.status();
180 if status.is_shutdown_initiated() || status.is_shutdown() {
181 return;
182 }
183
184 status_manager.set_crashed(TuiError::SyncRecv(e));
185
186 return;
187 }
188 }
189 }
190 })
191 .into()
192 }
193
194 pub fn couple(&self, engine: SyncEngine) -> Result<()> {
201 if self.sync_controller.initialized() {
202 return Err(TuiError::SyncEngineAlreadyCoupled);
203 }
204
205 let sync_update_listener_handle = Self::spawn_sync_update_listener(
206 self.status_manager.clone(),
207 engine.reader(),
208 self.ui_tx.clone(),
209 );
210
211 let sync_controller = engine.start();
212
213 self.sync_controller
214 .set(sync_controller)
215 .map_err(|_| TuiError::SyncEngineAlreadyCoupled)?;
216
217 self.sync_update_listener_handle
218 .set(sync_update_listener_handle)
219 .map_err(|_| TuiError::SyncEngineAlreadyCoupled)?;
220
221 Ok(())
222 }
223
224 pub async fn shutdown(&self) -> Result<()> {
231 self.status_manager.require_running()?;
232
233 let sync_controller = self.sync_controller.get().cloned();
234
235 core::shutdown_inner(
236 self.shutdown_timeout,
237 self.status_manager.clone(),
238 self.ui_task_handle.clone(),
239 || self.ui_tx.send(SyncUiMessage::ShutdownCompleted),
240 sync_controller,
241 )
242 .await
243 }
244
245 pub async fn until_stopped(&self) -> Arc<TuiStatusStopped> {
252 loop {
253 if let TuiStatus::Stopped(status_stopped) = self.status() {
254 let _ = self.tui_terminal.restore();
255 return status_stopped;
256 }
257
258 time::sleep(self.event_check_interval).await;
259 }
260 }
261
262 pub async fn log(&self, text: String) -> Result<()> {
266 self.status_manager.require_running()?;
267
268 self.ui_tx
271 .send(SyncUiMessage::LogEntry(text))
272 .await
273 .map_err(|e| TuiError::SyncTuiSendFailed(Box::new(e)))
274 }
275
276 pub fn as_logger(self: &Arc<Self>) -> Arc<dyn TuiLogger> {
280 self.clone()
281 }
282}
283
284#[async_trait]
285impl TuiLogger for SyncTui {
286 async fn log(&self, log_entry: String) -> Result<()> {
287 self.log(log_entry).await
288 }
289}
290
291impl Drop for SyncTui {
292 fn drop(&mut self) {
293 if let Some(ui_handle) = self
294 .ui_task_handle
295 .lock()
296 .expect("`ui_task_handle` mutex can't be poisoned")
297 .take()
298 {
299 ui_handle.abort();
300 };
301 }
302}