romm_cli/tui/app/background/
tasks.rs1use std::path::PathBuf;
4use std::time::{Duration, Instant};
5
6use crate::commands::library_scan::ScanCacheInvalidate;
7use crate::core::cache::RomCacheKey;
8use crate::core::startup_library_snapshot;
9use crate::types::SaveMetadata;
10
11use super::super::AppScreen;
12use super::types::{
13 CoverLoadDone, LibraryMetadataRefreshDone, LibraryUploadComplete, RomLoadEvent, SaveListDone,
14 SearchLoadEvent,
15};
16use crate::tui::theme::MessageTone;
17
18impl super::super::App {
19 pub(in crate::tui::app) fn spawn_library_metadata_refresh(&mut self) {
20 self.library_metadata_refresh_gen = self.library_metadata_refresh_gen.saturating_add(1);
21 let gen = self.library_metadata_refresh_gen;
22 let client = self.client.clone();
23 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
24 self.library_metadata_rx = Some(rx);
25 tokio::spawn(async move {
26 let fetch = startup_library_snapshot::fetch_merged_library_metadata(&client).await;
27 let _ = tx.send(LibraryMetadataRefreshDone {
28 gen,
29 platforms: fetch.platforms,
30 collections: fetch.collections,
31 collection_digest: fetch.collection_digest,
32 warnings: fetch.warnings,
33 });
34 });
35 }
36
37 pub fn poll_background_tasks(&mut self) {
39 self.poll_library_metadata_refresh();
40 self.poll_rom_load_results();
41 self.poll_collection_prefetch_results();
42 self.poll_search_load_results();
43 self.poll_cover_load_results();
44 self.poll_save_results();
45 self.poll_settings_results();
46 self.poll_library_upload();
47 self.poll_library_scan();
48 self.drive_collection_prefetch_scheduler();
49 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
50 lib.poll_footer_clear();
51 }
52 }
53
54 pub(in crate::tui::app) fn spawn_library_rescan_worker(
55 &mut self,
56 cache_on_success: ScanCacheInvalidate,
57 ) {
58 if self.library_scan_inflight {
59 return;
60 }
61 self.library_scan_inflight = true;
62 self.library_scan_pending_invalidate = Some(cache_on_success);
63 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
64 lib.set_metadata_footer(Some("Server library scan running…".into()));
65 }
66 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
67 self.library_scan_rx = Some(rx);
68 let client = self.client.clone();
69 tokio::spawn(async move {
70 let result = async {
71 let start =
72 crate::commands::library_scan::start_scan_library(&client, None).await?;
73 crate::commands::library_scan::wait_for_task_terminal(
74 &client,
75 &start.task_id,
76 Duration::from_secs(3600),
77 None,
78 |_| {},
79 )
80 .await?;
81 Ok::<(), anyhow::Error>(())
82 }
83 .await
84 .map_err(|e| e.to_string());
85 let _ = tx.send(result);
86 });
87 }
88
89 fn poll_library_scan(&mut self) {
90 let Some(rx) = &mut self.library_scan_rx else {
91 return;
92 };
93 match rx.try_recv() {
94 Ok(result) => {
95 self.library_scan_rx = None;
96 self.library_scan_inflight = false;
97 match result {
98 Ok(()) => self.on_library_scan_completed_success(),
99 Err(e) => {
100 self.library_scan_pending_invalidate = None;
101 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
102 lib.set_metadata_footer(Some(format!("Library scan failed: {e}")));
103 } else {
104 self.global_error = Some(format!("Library scan failed: {e}"));
105 }
106 }
107 }
108 }
109 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
110 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
111 self.library_scan_rx = None;
112 self.library_scan_inflight = false;
113 self.library_scan_pending_invalidate = None;
114 }
115 }
116 }
117
118 pub(in crate::tui::app) fn apply_library_scan_cache_invalidate(
119 &mut self,
120 inv: &ScanCacheInvalidate,
121 ) {
122 match inv {
123 ScanCacheInvalidate::None => {}
124 ScanCacheInvalidate::Platform(pid) => {
125 self.rom_cache.remove(&RomCacheKey::Platform(*pid));
126 }
127 ScanCacheInvalidate::AllPlatforms => {
128 self.rom_cache.remove_all_platform_entries();
129 if let AppScreen::LibraryBrowse(lib) = &self.screen {
130 if let Some(ref k) = lib.cache_key() {
131 if !matches!(k, RomCacheKey::Platform(_)) {
132 self.rom_cache.remove(k);
133 }
134 }
135 }
136 }
137 }
138 }
139
140 pub(in crate::tui::app) fn on_library_scan_completed_success(&mut self) {
141 let inv = self
142 .library_scan_pending_invalidate
143 .take()
144 .unwrap_or(ScanCacheInvalidate::AllPlatforms);
145 self.apply_library_scan_cache_invalidate(&inv);
146 if matches!(self.screen, AppScreen::LibraryBrowse(_)) {
147 self.force_rom_reload_after_metadata = true;
148 self.spawn_library_metadata_refresh();
149 }
150 }
151
152 pub(in crate::tui::app) fn format_upload_bytes(n: u64) -> String {
153 const KB: u64 = 1024;
154 const MB: u64 = KB * 1024;
155 const GB: u64 = MB * 1024;
156 if n >= GB {
157 format!("{:.2} GiB", n as f64 / GB as f64)
158 } else if n >= MB {
159 format!("{:.2} MiB", n as f64 / MB as f64)
160 } else if n >= KB {
161 format!("{:.1} KiB", n as f64 / KB as f64)
162 } else {
163 format!("{n} B")
164 }
165 }
166
167 pub(in crate::tui::app) fn spawn_library_upload_worker(
168 &mut self,
169 platform_id: u64,
170 path: PathBuf,
171 scan_after: bool,
172 ) {
173 if self.library_upload_inflight || self.library_scan_inflight {
174 return;
175 }
176 self.library_upload_inflight = true;
177 self.library_upload_progress_rx = None;
178 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
179 lib.set_metadata_footer(Some("Preparing upload…".into()));
180 }
181 let (prog_tx, prog_rx) = tokio::sync::mpsc::unbounded_channel();
182 let (done_tx, done_rx) = tokio::sync::mpsc::unbounded_channel();
183 self.library_upload_progress_rx = Some(prog_rx);
184 self.library_upload_done_rx = Some(done_rx);
185 let client = self.client.clone();
186 tokio::spawn(async move {
187 let result: Result<LibraryUploadComplete, String> = async {
188 client
189 .upload_rom(platform_id, &path, move |uploaded, total| {
190 let _ = prog_tx.send((uploaded, total));
191 })
192 .await
193 .map_err(|e| e.to_string())?;
194 Ok(LibraryUploadComplete {
195 platform_id,
196 scan_after,
197 })
198 }
199 .await;
200 let _ = done_tx.send(result);
201 });
202 }
203
204 fn poll_library_upload(&mut self) {
205 if let Some(rx) = &mut self.library_upload_progress_rx {
206 while let Ok((up, tot)) = rx.try_recv() {
207 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
208 lib.set_metadata_footer(Some(format!(
209 "Uploading {} / {}…",
210 Self::format_upload_bytes(up),
211 Self::format_upload_bytes(tot)
212 )));
213 }
214 }
215 }
216
217 let Some(rx) = &mut self.library_upload_done_rx else {
218 return;
219 };
220 match rx.try_recv() {
221 Ok(result) => {
222 self.library_upload_done_rx = None;
223 self.library_upload_progress_rx = None;
224 self.library_upload_inflight = false;
225 match result {
226 Ok(done) => {
227 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
228 if done.scan_after {
229 lib.set_metadata_footer(Some(
230 "Upload complete. Starting library scan…".into(),
231 ));
232 self.spawn_library_rescan_worker(ScanCacheInvalidate::Platform(
233 done.platform_id,
234 ));
235 } else {
236 lib.set_metadata_footer(Some("Upload complete.".into()));
237 }
238 }
239 }
240 Err(e) => {
241 if let AppScreen::LibraryBrowse(ref mut lib) = self.screen {
242 lib.set_metadata_footer(Some(format!("Upload failed: {e}")));
243 } else {
244 self.global_error = Some(format!("Upload failed: {e}"));
245 }
246 }
247 }
248 }
249 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
250 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
251 self.library_upload_done_rx = None;
252 self.library_upload_progress_rx = None;
253 self.library_upload_inflight = false;
254 }
255 }
256 }
257
258 fn poll_search_load_results(&mut self) {
259 loop {
260 match self.search_load_rx.try_recv() {
261 Ok(done) => {
262 if let AppScreen::Search(ref mut search) = self.screen {
263 match done.event {
264 SearchLoadEvent::Batch(roms) => {
265 search.set_results_for_query(done.query, roms);
266 }
267 SearchLoadEvent::Failed(err) => {
268 search.loading = false;
269 self.global_error = Some(err);
270 }
271 SearchLoadEvent::Complete => {
272 search.loading = false;
273 }
274 }
275 }
276 }
277 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
278 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
279 }
280 }
281 }
282
283 pub(in crate::tui::app) fn spawn_cover_load_worker(&mut self, rom_id: u64, url: String) {
284 if let Some(task) = self.cover_load_task.take() {
285 task.abort();
286 }
287 let tx = self.cover_load_tx.clone();
288 self.cover_load_task = Some(tokio::spawn(async move {
289 let result = async {
290 let response = reqwest::get(&url).await.map_err(|e| e.to_string())?;
291 let status = response.status();
292 if !status.is_success() {
293 return Err(format!("HTTP {}", status.as_u16()));
294 }
295 let bytes = response.bytes().await.map_err(|e| e.to_string())?;
296 image::load_from_memory(&bytes).map_err(|e| e.to_string())
297 }
298 .await;
299 let _ = tx.send(CoverLoadDone { rom_id, result });
300 }));
301 }
302
303 fn poll_cover_load_results(&mut self) {
304 loop {
305 match self.cover_load_rx.try_recv() {
306 Ok(done) => {
307 if let AppScreen::GameDetail(detail) = &mut self.screen {
308 if detail.rom.id != done.rom_id {
309 continue;
310 }
311 match done.result {
312 Ok(image) => detail.apply_cover_image(image),
313 Err(err) => detail.apply_cover_error(format!(
314 "Cover failed: {}",
315 crate::tui::utils::truncate(&err, 120)
316 )),
317 }
318 }
319 }
320 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
321 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
322 }
323 }
324 }
325
326 pub(in crate::tui::app) fn maybe_start_game_detail_cover_load(&mut self) {
327 let (rom_id, url) = match &mut self.screen {
328 AppScreen::GameDetail(detail) => {
329 if !detail.should_request_cover_load() {
330 return;
331 }
332 detail.set_cover_loading();
333 let Some(url) = detail.cover_last_url.clone() else {
334 return;
335 };
336 (detail.rom.id, url)
337 }
338 _ => return,
339 };
340 self.spawn_cover_load_worker(rom_id, url);
341 }
342
343 pub(in crate::tui::app) fn spawn_save_list_worker(&mut self, rom_id: u64) {
344 if let AppScreen::GameDetail(detail) = &mut self.screen {
345 detail.set_saves_loading();
346 }
347 let client = self.client.clone();
348 let tx = self.save_list_tx.clone();
349 tokio::spawn(async move {
350 let result = async {
351 let value = client
352 .request_json(
353 "GET",
354 "/api/saves",
355 &[("rom_id".to_string(), rom_id.to_string())],
356 None,
357 )
358 .await?;
359 SaveMetadata::from_api_value(value)
360 }
361 .await
362 .map_err(|e| format!("{e:#}"));
363 let _ = tx.send(SaveListDone { rom_id, result });
364 });
365 }
366
367 pub(in crate::tui::app) fn refresh_current_game_saves(&mut self) {
368 if let AppScreen::GameDetail(detail) = &self.screen {
369 self.spawn_save_list_worker(detail.rom.id);
370 }
371 }
372
373 fn poll_save_results(&mut self) {
374 while let Ok(done) = self.save_list_rx.try_recv() {
375 if let AppScreen::GameDetail(detail) = &mut self.screen {
376 if detail.rom.id == done.rom_id {
377 match done.result {
378 Ok(rows) => detail.apply_saves(rows),
379 Err(e) => detail.apply_saves_error(e),
380 }
381 }
382 }
383 }
384 while let Ok(done) = self.save_upload_rx.try_recv() {
385 if let AppScreen::GameDetail(detail) = &mut self.screen {
386 if detail.rom.id == done.rom_id {
387 match done.result {
388 Ok(()) => {
389 detail.message = Some("Save uploaded. Refreshing saves...".into());
390 detail.message_clear_at = Some(Instant::now() + Duration::from_secs(3));
391 self.spawn_save_list_worker(done.rom_id);
392 }
393 Err(e) => {
394 detail.message = Some(format!("Save upload failed: {e}"));
395 detail.message_clear_at = Some(Instant::now() + Duration::from_secs(5));
396 }
397 }
398 }
399 }
400 }
401 while let Ok(done) = self.save_download_rx.try_recv() {
402 if let AppScreen::GameDetail(detail) = &mut self.screen {
403 if detail.rom.id == done.rom_id {
404 match done.result {
405 Ok(path) => {
406 detail.message = Some(format!("Save downloaded: {}", path.display()));
407 detail.message_clear_at = Some(Instant::now() + Duration::from_secs(5));
408 self.spawn_save_list_worker(done.rom_id);
409 }
410 Err(e) => {
411 detail.message = Some(format!("Save download failed: {e}"));
412 detail.message_clear_at = Some(Instant::now() + Duration::from_secs(5));
413 }
414 }
415 }
416 }
417 }
418 }
419
420 fn poll_settings_results(&mut self) {
421 while let Ok(done) = self.device_list_rx.try_recv() {
422 if let AppScreen::Settings(settings) = &mut self.screen {
423 match done.result {
424 Ok(devices) => {
425 settings.set_devices(devices);
426 settings.message = None;
427 }
428 Err(e) => {
429 settings.set_device_error(e.clone());
430 settings.message =
431 Some((format!("Device load failed: {e}"), MessageTone::Error));
432 }
433 }
434 }
435 }
436 while let Ok(done) = self.platform_list_rx.try_recv() {
437 if let AppScreen::Settings(settings) = &mut self.screen {
438 match done.result {
439 Ok(platforms) => {
440 settings.set_console_platforms(platforms);
441 settings.message = None;
442 }
443 Err(e) => {
444 settings.set_console_platform_error(e.clone());
445 settings.message =
446 Some((format!("Platform load failed: {e}"), MessageTone::Error));
447 }
448 }
449 }
450 }
451 while let Ok(done) = self.sync_push_pull_rx.try_recv() {
452 if let AppScreen::Settings(settings) = &mut self.screen {
453 settings.sync_inflight = false;
454 match done.result {
455 Ok(session) => {
456 settings.message = Some((
457 format!("Sync session #{}: {}", session.id, session.status),
458 MessageTone::Success,
459 ));
460 }
461 Err(e) => {
462 settings.message = Some((format!("Sync failed: {e}"), MessageTone::Error));
463 }
464 }
465 }
466 }
467 }
468
469 fn poll_rom_load_results(&mut self) {
470 loop {
471 match self.rom_load_rx.try_recv() {
472 Ok(done) => {
473 if !crate::tui::app::rom_load::primary_rom_load_result_is_current(
474 done.gen,
475 self.rom_load_gen,
476 ) {
477 continue;
478 }
479 let AppScreen::LibraryBrowse(ref mut lib) = self.screen else {
480 continue;
481 };
482 if !crate::tui::app::rom_load::primary_rom_load_result_matches_selection(
483 lib, &done.key,
484 ) {
485 if matches!(done.event, RomLoadEvent::Complete | RomLoadEvent::Failed(_)) {
486 lib.set_rom_loading(false);
487 }
488 tracing::debug!(
489 "rom-list-render skipped stale completion context={}",
490 done.context
491 );
492 continue;
493 }
494 match done.event {
495 RomLoadEvent::Batch(roms) => {
496 if let Some(ref k) = done.key {
497 self.rom_cache
498 .insert(k.clone(), roms.clone(), done.expected);
499 }
500 lib.set_roms(roms);
501 tracing::debug!(
502 "rom-list-render batch context={} latency_ms={}",
503 done.context,
504 done.started.elapsed().as_millis()
505 );
506 }
507 RomLoadEvent::Failed(e) => {
508 lib.set_metadata_footer(Some(format!("Could not load games: {e}")));
509 lib.set_rom_loading(false);
510 }
511 RomLoadEvent::Complete => {
512 lib.set_rom_loading(false);
513 }
514 }
515 }
516 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
517 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
518 }
519 }
520 }
521
522 fn poll_library_metadata_refresh(&mut self) {
523 let mut batch = Vec::new();
524 let mut disconnected = false;
525 if let Some(rx) = &mut self.library_metadata_rx {
526 loop {
527 match rx.try_recv() {
528 Ok(msg) => batch.push(msg),
529 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
530 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
531 disconnected = true;
532 break;
533 }
534 }
535 }
536 }
537 if disconnected {
538 self.library_metadata_rx = None;
539 }
540 for msg in batch {
541 self.apply_library_metadata_refresh(msg);
542 }
543 }
544
545 pub(in crate::tui::app) fn apply_library_metadata_refresh(
546 &mut self,
547 msg: LibraryMetadataRefreshDone,
548 ) {
549 if msg.gen != self.library_metadata_refresh_gen {
550 return;
551 }
552
553 let (selection_reload, post_scan_reload) = {
554 let AppScreen::LibraryBrowse(ref mut lib) = self.screen else {
555 return;
556 };
557
558 let had_cached_lists = !lib.platforms.is_empty() || !lib.collections.is_empty();
559 let live_empty = msg.collections.is_empty();
560 if live_empty && had_cached_lists && !msg.warnings.is_empty() {
561 lib.set_temporary_metadata_footer(
562 "Could not refresh library metadata (keeping cached list).".into(),
563 std::time::Duration::from_secs(3),
564 );
565 self.force_rom_reload_after_metadata = false;
566 return;
567 }
568
569 let old_digest = startup_library_snapshot::build_collection_digest_from_collections(
570 &lib.collections,
571 );
572 let digest_changed = old_digest != msg.collection_digest;
573 let update_platforms = !msg.platforms.is_empty();
574 let selection_changed = lib.replace_metadata_preserving_selection(
575 msg.platforms,
576 msg.collections,
577 update_platforms,
578 true,
579 );
580 startup_library_snapshot::save_snapshot(&lib.platforms, &lib.collections);
581
582 let footer = if msg.warnings.is_empty() {
583 if digest_changed {
584 Some("Collection metadata updated.".into())
585 } else {
586 None
587 }
588 } else {
589 let w = msg.warnings.join(" | ");
590 let short: String = if w.chars().count() > 160 {
591 let prefix: String = w.chars().take(157).collect();
592 format!("{prefix}…")
593 } else {
594 w
595 };
596 Some(format!("Partial refresh: {}", short))
597 };
598 lib.set_metadata_footer(footer);
599
600 let force_reload = std::mem::take(&mut self.force_rom_reload_after_metadata);
601 let selection_reload = if selection_changed && lib.list_len() > 0 {
602 lib.clear_roms();
603 let key = lib.cache_key();
604 let expected = lib.expected_rom_count();
605 let req = Self::selected_rom_request_for_library(lib);
606 lib.set_rom_loading(expected > 0);
607 Some((key, req, expected, "refresh_selection"))
608 } else {
609 None
610 };
611 let post_scan_reload = if force_reload && lib.list_len() > 0 && !selection_changed {
612 lib.clear_roms();
613 let key = lib.cache_key();
614 let expected = lib.expected_rom_count();
615 let req = Self::selected_rom_request_for_library(lib);
616 lib.set_rom_loading(expected > 0);
617 Some((key, req, expected, "post_scan_reload"))
618 } else {
619 None
620 };
621 (selection_reload, post_scan_reload)
622 };
623
624 if let Some((key, req, expected, context)) = selection_reload {
625 self.queue_primary_rom_load(key, req, expected, context);
626 } else if let Some((key, req, expected, context)) = post_scan_reload {
627 self.queue_primary_rom_load(key, req, expected, context);
628 }
629
630 self.queue_collection_prefetches_from_screen(1, "refresh_warmup");
631 }
632
633 fn poll_collection_prefetch_results(&mut self) {
634 loop {
635 match self.collection_prefetch_rx.try_recv() {
636 Ok(done) => {
637 self.collection_prefetch_inflight_keys.remove(&done.key);
638 if let Some(roms) = done.roms {
639 self.rom_cache.insert(done.key, roms, done.expected);
640 } else if let Some(warning) = done.warning {
641 tracing::debug!("{warning}");
642 }
643 }
644 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
645 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
646 }
647 }
648 }
649}