// utiles 0.8.0
//
// Web map tile utils (aka utiles)
// Documentation
use std::time::Duration;

use futures::StreamExt;
use indicatif::ProgressStyle;
use tokio::join;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info, warn};

use crate::UtilesResult;
use crate::cli::args::WebpifyArgs;
use crate::img::webpify_image;
use crate::mbt::{MbtStreamWriterSync, MbtWriterStats};
use crate::mbt::{Mbtiles, MbtilesAsync, MbtilesClientAsync};
use crate::sqlite::InsertStrategy;

pub(crate) async fn webpify_main(args: WebpifyArgs) -> UtilesResult<()> {
    let mbt = MbtilesClientAsync::open_existing(args.common.filepath.as_str()).await?;
    mbt.assert_mbtiles().await?;
    let total_count = mbt.tiles_count().await?;

    let mbt_metadata = mbt.metadata_rows().await?;
    let dst_mbtiles = Mbtiles::open_new(args.dst, None)?;
    dst_mbtiles.metadata_set_many(&mbt_metadata)?;
    dst_mbtiles.metadata_set("format", "webp")?;
    let tiles_stream = mbt.tiles_stream(None)?;
    let (tx_progress, mut rx_progress) = tokio::sync::mpsc::channel(100);
    let (tx_writer, rx_writer) = tokio::sync::mpsc::channel(100);
    let start_time = std::time::Instant::now();
    let mut writer = MbtStreamWriterSync {
        stream: ReceiverStream::new(rx_writer),
        mbt: dst_mbtiles,
        on_conflict: InsertStrategy::None,
        stats: MbtWriterStats::default(),
    };
    let jobs: usize = args.jobs.unwrap_or(4) as usize;
    info!("webpify ~ total_count: {total_count} ~ jobs: {jobs}");
    let proc_future = tokio::spawn(async move {
        // TODO: cli flag for concurrency
        tiles_stream
            .for_each_concurrent(jobs, |(tile, tile_data)| {
                let tx_writer = tx_writer.clone();
                let tx_progress = tx_progress.clone();
                let initial_size = tile_data.len() as i64;

                async move {
                    let blocking_res =
                        tokio::task::spawn_blocking(move || webpify_image(&tile_data))
                            .await;
                    match blocking_res {
                        Err(je) => {
                            warn!("join-error: {:?}", je);
                        }
                        Ok(webpify_result) => match webpify_result {
                            Ok(webp_bytes) => {
                                let size_diff =
                                    initial_size - (webp_bytes.len() as i64);
                                let send_res = tx_writer
                                    .send((tile, webp_bytes, None).into())
                                    .await;
                                if let Err(e) = send_res {
                                    warn!("send_res: {:?}", e);
                                }
                                let send_res = tx_progress.send(size_diff).await;
                                if let Err(e) = send_res {
                                    warn!("progress send_res: {:?}", e);
                                }
                            }
                            Err(e) => {
                                warn!("webpify_image: {:?}", e);
                            }
                        },
                    }
                }
            })
            .await;
    });

    let progress_future = tokio::spawn(async move {
        let mut total_size_diff = 0;
        let mut processed = 0;
        let pb = indicatif::ProgressBar::new(total_count as u64);
        let pb_style = ProgressStyle::with_template(
            "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
        );
        if args.quiet {
            pb.set_draw_target(indicatif::ProgressDrawTarget::hidden());
        }
        pb.set_message("webpify");
        match pb_style {
            Err(e) => {
                warn!("pb_style error: {:?}", e);
            }
            Ok(s) => {
                pb.set_style(s);
            }
        }
        pb.enable_steady_tick(Duration::from_millis(100));
        while let Some(size_diff) = rx_progress.recv().await {
            total_size_diff += size_diff;
            processed += 1;
            pb.inc(1);

            let size_saved_msg = if total_size_diff > 0 {
                format!(
                    "-{}",
                    size::Size::from_bytes(total_size_diff.unsigned_abs())
                )
            } else {
                format!(
                    "+{}",
                    size::Size::from_bytes(total_size_diff.unsigned_abs())
                )
            };
            pb.set_message(format!("webpify ~ size-diff: {size_saved_msg}"));
        }
        let total_size_str = if total_size_diff > 0 {
            format!(
                "-{}",
                size::Size::from_bytes(total_size_diff.unsigned_abs())
            )
        } else {
            format!(
                "+{}",
                size::Size::from_bytes(total_size_diff.unsigned_abs())
            )
        };

        pb.finish_with_message(format!(
            "Processed {processed} tiles, saved {total_size_str} ({total_size_diff}b)"
        ));
    });
    let (result, writer_result, progress_res) =
        join!(proc_future, writer.write(), progress_future);
    let elapsed = start_time.elapsed();
    info!("elapsed: {:?}", elapsed);
    result?;
    writer_result?;
    progress_res?;

    Ok(())
}