use coord::{TileCoord, View};
use image::DynamicImage;
use image;
use reqwest::Client;
use std::cmp::Ordering;
use std::cmp;
use std::collections::hash_set::HashSet;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::mpsc::TryRecvError;
use std::sync::{Arc, mpsc, Mutex};
use std::thread;
use tile::Tile;
use tile_source::TileSource;
#[derive(Debug)]
pub struct TileLoader {
client: Option<Client>,
join_handle: thread::JoinHandle<()>,
request_tx: mpsc::Sender<LoaderMessage>,
result_rx: mpsc::Receiver<(Tile, Option<DynamicImage>)>,
pending: HashSet<Tile>,
use_network: bool,
}
impl TileLoader {
pub fn new<F>(notice_func: F, use_network: bool) -> Self
where F: Fn(Tile) + Sync + Send + 'static,
{
let (request_tx, request_rx) = mpsc::channel();
let (result_tx, result_rx) = mpsc::channel();
TileLoader {
client: None,
join_handle: thread::spawn(move || Self::work(&request_rx, &result_tx, notice_func, use_network)),
request_tx,
result_rx,
pending: HashSet::new(),
use_network,
}
}
fn work<F>(
request_rx: &mpsc::Receiver<LoaderMessage>,
result_tx: &mpsc::Sender<(Tile, Option<DynamicImage>)>,
notice_func: F,
use_network: bool,
)
where F: Fn(Tile) + Sync + Send + 'static,
{
let mut queue: Vec<TileRequest> = vec![];
let remote_queue: Arc<Mutex<Vec<TileRequest>>> = Arc::new(Mutex::new(vec![]));
let mut view_opt: Option<View> = None;
let arc_notice_func = Arc::new(notice_func);
let (remote_request_tx, remote_request_rx) = mpsc::channel();
{
let arc_request_rx = Arc::new(Mutex::new(remote_request_rx));
for id in 0..2 {
let remote_queue = Arc::clone(&remote_queue);
let arc_request_rx = Arc::clone(&arc_request_rx);
let result_tx = result_tx.clone();
let arc_notice_func = Arc::clone(&arc_notice_func);
thread::spawn(move || Self::work_remote(id, &remote_queue, &arc_request_rx, &result_tx, &arc_notice_func));
}
}
'outer: while let Ok(message) = request_rx.recv() {
let mut need_to_sort = true;
match message {
LoaderMessage::SetView(view) => {
view_opt = Some(view);
},
LoaderMessage::GetTile(request) => {
queue.push(request);
}
}
loop {
loop {
let message = request_rx.try_recv();
match message {
Ok(LoaderMessage::SetView(view)) => {
view_opt = Some(view);
need_to_sort = true;
},
Ok(LoaderMessage::GetTile(request)) => {
queue.push(request);
need_to_sort = true;
},
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => break 'outer,
}
}
if need_to_sort {
if let Some(view) = view_opt {
need_to_sort = false;
queue.as_mut_slice().sort_by(|a, b| {
compare_tiles(a.tile, b.tile, view)
});
if let Ok(mut remote_queue) = remote_queue.lock() {
remote_queue.as_mut_slice().sort_by(|a, b| {
compare_tiles(a.tile, b.tile, view)
});
}
}
}
match queue.pop() {
None => break,
Some(request) => {
match image::open(&request.path) {
Ok(img) => {
if result_tx.send((request.tile, Some(img))).is_err() {
break 'outer;
}
arc_notice_func(request.tile);
continue;
},
Err(_) => {
if use_network {
if let Ok(mut remote_queue) = remote_queue.lock() {
remote_queue.push(request);
if let Some(view) = view_opt {
remote_queue.as_mut_slice().sort_by(|a, b| {
compare_tiles(a.tile, b.tile, view)
});
}
if let Err(e) = remote_request_tx.send(RemoteLoaderMessage::PopQueue) {
error!("remote worker terminated, {}", e);
}
}
} else if result_tx.send((request.tile, None)).is_err() {
break 'outer;
}
},
}
},
}
}
}
}
fn work_remote<F>(
thread_id: u32,
queue: &Arc<Mutex<Vec<TileRequest>>>,
request_rx: &Arc<Mutex<mpsc::Receiver<RemoteLoaderMessage>>>,
result_tx: &mpsc::Sender<(Tile, Option<DynamicImage>)>,
notice_func: &Arc<F>,
)
where F: Fn(Tile) + Sync + Send + 'static,
{
let mut client_opt = None;
loop {
let message = request_rx.lock().ok().and_then(|r| r.recv().ok());
match message {
None => break,
Some(RemoteLoaderMessage::PopQueue) => {
let ele: Option<TileRequest> = queue.lock().ok().and_then(|mut q| q.pop());
if let Some(request) = ele {
if client_opt.is_none() {
client_opt = Client::builder().build().ok();
}
info!("thread {}, download {:?}", thread_id, request.url);
if let Some(Ok(mut response)) = client_opt.as_ref().map(|c| c.get(&request.url).send()) {
let mut buf: Vec<u8> = vec![];
if response.copy_to(&mut buf).is_ok() {
if let Ok(img) = image::load_from_memory(&buf) {
if result_tx.send((request.tile, Some(img))).is_err() {
break;
}
notice_func(request.tile);
if request.write_to_file {
if let Err(e) = Self::write_to_file(&request.path, &buf) {
warn!("could not write file {}, {}", request.path.display(), e);
}
}
continue;
}
}
}
info!("thread {}, fail {:?}", thread_id, request.url);
if result_tx.send((request.tile, None)).is_err() {
break;
}
}
},
}
}
}
pub fn async_request(&mut self, tile_coord: TileCoord, source: &TileSource, write_to_file: bool) {
if tile_coord.zoom > source.max_tile_zoom() ||
tile_coord.zoom < source.min_tile_zoom()
{
return;
}
let tile = Tile::new(tile_coord, source.id());
if !self.pending.contains(&tile) {
if let Some(url) = source.remote_tile_url(tile_coord) {
if self.request_tx.send(LoaderMessage::GetTile(
TileRequest {
tile,
url,
path: source.local_tile_path(tile_coord),
write_to_file,
}
)).is_ok()
{
self.pending.insert(tile);
}
}
}
}
pub fn async_result(&mut self) -> Option<(Tile, DynamicImage)> {
match self.result_rx.try_recv() {
Err(_) => None,
Ok((tile, None)) => {
self.pending.remove(&tile);
debug!("async_result none, pending.len: {}, {:?}", self.pending.len(), tile);
None
},
Ok((tile, Some(img))) => {
self.pending.remove(&tile);
debug!("async_result some, pending.len: {}, {:?}", self.pending.len(), tile);
Some((tile, img))
},
}
}
pub fn get_sync(&mut self, tile: TileCoord, source: &TileSource, write_to_file: bool) -> Option<DynamicImage> {
if tile.zoom > source.max_tile_zoom() ||
tile.zoom < source.min_tile_zoom()
{
return None;
}
match image::open(source.local_tile_path(tile)) {
Ok(img) => {
debug!("sync ok from path {:?}", tile);
Some(img)
},
Err(_) => {
if self.use_network {
if self.client.is_none() {
self.client = Client::builder().build().ok();
}
if let (Some(client), Some(url)) = (self.client.as_ref(), source.remote_tile_url(tile)) {
if let Ok(mut response) = client.get(&url).send() {
let mut buf: Vec<u8> = vec![];
if response.copy_to(&mut buf).is_ok() {
if let Ok(img) = image::load_from_memory(&buf) {
if write_to_file {
let path = source.local_tile_path(tile);
if let Err(e) = Self::write_to_file(&path, &buf) {
warn!("could not write file {}, {}", &path.display(), e);
}
}
debug!("sync ok from network {:?}", tile);
return Some(img);
}
}
}
}
debug!("sync fail from network {:?}", tile);
None
} else {
debug!("sync fail from path {:?}", tile);
None
}
},
}
}
pub fn set_view_location(&mut self, view: View) {
let _ = self.request_tx.send(LoaderMessage::SetView(view));
}
fn write_to_file<P: AsRef<Path>>(path: P, img_data: &[u8]) -> ::std::io::Result<()> {
if let Some(dir) = path.as_ref().parent() {
::std::fs::create_dir_all(dir)?;
}
let mut file = File::create(path)?;
file.write_all(img_data)
}
}
#[derive(Debug)]
struct TileRequest {
pub tile: Tile,
pub url: String,
pub path: PathBuf,
pub write_to_file: bool,
}
#[derive(Debug)]
enum LoaderMessage {
GetTile(TileRequest),
SetView(View),
}
#[derive(Debug)]
enum RemoteLoaderMessage {
PopQueue,
}
fn compare_tiles(a: Tile, b: Tile, view: View) -> Ordering {
let source_a = view.source_id == a.source_id;
let source_b = view.source_id == b.source_id;
match (source_a, source_b) {
(true, false) => Ordering::Greater,
(false, true) => Ordering::Less,
_ => {
let zoom_diff_a = cmp::max(a.coord.zoom, view.zoom) - cmp::min(a.coord.zoom, view.zoom);
let zoom_diff_b = cmp::max(b.coord.zoom, view.zoom) - cmp::min(b.coord.zoom, view.zoom);
if zoom_diff_a < zoom_diff_b {
Ordering::Greater
} else if zoom_diff_a > zoom_diff_b {
Ordering::Less
} else {
let map_a = a.coord.map_coord_center();
let map_b = b.coord.map_coord_center();
let diff_xa = (view.center.x - map_a.x).abs();
let diff_xa = if diff_xa > 0.5 { 1.0 - diff_xa } else { diff_xa };
let diff_xb = (view.center.x - map_b.x).abs();
let diff_xb = if diff_xb > 0.5 { 1.0 - diff_xb } else { diff_xb };
let diff_ya = view.center.y - map_a.y;
let diff_yb = view.center.y - map_b.y;
let center_diff_a = (diff_xa * diff_xa) + (diff_ya * diff_ya);
let center_diff_b = (diff_xb * diff_xb) + (diff_yb * diff_yb);
center_diff_b.partial_cmp(¢er_diff_a).unwrap_or(Ordering::Equal)
}
},
}
}