use std::sync::{Arc, Mutex};
use egui::Context;
use futures::channel::mpsc::{Receiver, Sender, TryRecvError, TrySendError, channel};
use lru::LruCache;
use crate::{
Tile, TileId,
io::{
Fetch,
fetch::{TileFactory, fetch_continuously},
runtime::Runtime,
},
};
/// Bridges the UI side with the background tile-fetching runtime.
///
/// Tile requests flow out through `request_tx`; fetched tiles flow back in
/// through `tile_rx` and are placed in `cache`.
pub struct TilesIo {
    /// Outgoing queue of tile ids to be fetched by the background task.
    request_tx: Sender<TileId>,
    /// Incoming queue of tiles produced by the background task.
    tile_rx: Receiver<(TileId, Tile)>,
    /// LRU cache of tiles. A `None` value marks a tile that has been
    /// requested but has not arrived yet, preventing duplicate requests.
    pub cache: LruCache<TileId, Option<Tile>>,
    /// Fetch statistics shared with the background task.
    pub stats: Arc<Mutex<Stats>>,
    // Never read, but holding it keeps the background runtime (and the
    // fetch loop it drives) alive for the lifetime of this struct.
    #[allow(dead_code)] runtime: Runtime,
}
impl TilesIo {
    /// Builds the IO pipeline: bounded request/response channels sized to
    /// the fetcher's concurrency limit, a background runtime driving
    /// `fetch_continuously`, and a 256-entry LRU tile cache.
    ///
    /// `egui_ctx` is handed to the fetch loop — presumably so it can
    /// request a repaint when a tile arrives (TODO confirm against
    /// `fetch_continuously`).
    pub fn new(
        fetch: impl Fetch + Send + Sync + 'static,
        tile_factory: impl TileFactory + Send + Sync + 'static,
        egui_ctx: Context,
    ) -> Self {
        let stats = Arc::new(Mutex::new(Stats { in_progress: 0 }));
        // Bound both channels by the fetcher's own concurrency limit so we
        // never queue more work than the fetcher is willing to run.
        let channel_size = fetch.max_concurrency();
        let (request_tx, request_rx) = channel(channel_size);
        let (tile_tx, tile_rx) = channel(channel_size);
        let runtime = Runtime::new(fetch_continuously(
            fetch,
            stats.clone(),
            request_rx,
            tile_tx,
            egui_ctx,
            tile_factory,
        ));
        // 256 is a non-zero literal, so this unwrap cannot fail.
        #[allow(clippy::unwrap_used)]
        let cache_size = std::num::NonZeroUsize::new(256).unwrap();
        Self {
            cache: LruCache::new(cache_size),
            stats,
            request_tx,
            tile_rx,
            runtime,
        }
    }
    /// Moves at most one fetched tile from the response channel into the
    /// cache, overwriting the `None` placeholder inserted at request time.
    ///
    /// An empty channel is the normal "nothing ready yet" case and is
    /// ignored; a closed channel means the IO task has died and is only
    /// logged.
    // NOTE(review): `try_recv` with `Empty`/`Closed` variants does not match
    // the documented `futures::channel::mpsc` API (`try_next`) — verify
    // which channel implementation is actually in scope here.
    pub fn put_single_fetched_tile_in_cache(&mut self) {
        match self.tile_rx.try_recv() {
            Ok((tile_id, tile)) => {
                self.cache.put(tile_id, Some(tile));
            }
            Err(TryRecvError::Empty) => {
                // No tile ready this frame; try again later.
            }
            Err(TryRecvError::Closed) => {
                log::error!("IO thread is dead");
            }
        }
    }
    /// Requests `tile_id` from the IO task unless it is already cached or
    /// already in flight.
    ///
    /// On a cache miss, sends the request and inserts `None` as an
    /// in-flight placeholder so subsequent calls do not re-request it.
    /// If the request queue is full, nothing is inserted, so a later call
    /// will retry the send.
    ///
    /// # Panics
    /// Panics if the send fails for any reason other than a full queue
    /// (i.e. the IO task has disconnected).
    pub fn make_sure_is_fetched(&mut self, tile_id: TileId) {
        match self.cache.try_get_or_insert(
            tile_id,
            || -> Result<Option<Tile>, TrySendError<TileId>> {
                self.request_tx.try_send(tile_id)?;
                log::trace!("Requested tile: {tile_id:?}");
                Ok(None)
            },
        ) {
            Ok(_) => {}
            Err(err) if err.is_full() => {
                log::trace!("Request queue is full.");
            }
            Err(err) => {
                panic!("Failed to send tile request for {tile_id:?}: {err}");
            }
        }
    }
    /// Returns a snapshot of the shared fetch statistics.
    ///
    /// Falls back to `Stats::default()` if the mutex is poisoned, rather
    /// than propagating the poison to the UI.
    pub fn stats(&self) -> Stats {
        if let Ok(stats) = self.stats.lock() {
            stats.clone()
        } else {
            Stats::default()
        }
    }
}
/// Snapshot of background tile-fetching statistics, shared between the UI
/// and the IO task behind an `Arc<Mutex<_>>`.
//
// `Debug` added so the public type can be logged/inspected; `PartialEq`/`Eq`
// make snapshots comparable in callers and tests. All additions are
// backward-compatible derive extensions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Stats {
    /// Number of tile fetches currently in flight.
    pub in_progress: usize,
}