use crate::cli::args::EnumerateArgs;
use crate::mbt::TilesFilter;
use crate::{TileStringFormatter, UtilesResult};
use std::io;
use std::io::{BufWriter, Write};
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
async fn enumerate_db(
fspath: &str,
tformatter: TileStringFormatter,
tfilter: Option<&TilesFilter>,
tx: tokio::sync::mpsc::Sender<String>,
) -> UtilesResult<()> {
let mbt = crate::mbt::MbtilesClientAsync::open_existing(fspath).await?;
let query: String = match tfilter {
Some(tfilter) => {
let where_clause = tfilter.where_clause(Some("tiles."))?;
format!(
"SELECT zoom_level, tile_column, tile_row FROM tiles {where_clause}"
)
}
None => "SELECT zoom_level, tile_column, tile_row FROM tiles".to_string(),
};
let s = mbt.enumerate_rx(Some(&query))?;
let mut tiles = ReceiverStream::new(s);
while let Some(tile) = tiles.next().await {
let tile_str = tformatter.fmt_tile(&tile);
if let Err(e) = tx.send(tile_str).await {
debug!("recv dropped: {:?}", e);
break;
}
}
Ok(())
}
pub(crate) async fn enumerate_main(args: &EnumerateArgs) -> UtilesResult<()> {
debug!("args: {:?}", args);
for fspath in &args.fspaths {
if !std::path::Path::new(fspath).exists() {
return Err(crate::UtilesError::Error(format!(
"file not found: {fspath:?}"
)));
}
}
debug!("fspaths: {:?}", args.fspaths);
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
let write_task: JoinHandle<Result<(), io::Error>> =
tokio::task::spawn_blocking(move || {
let stdout = io::stdout();
let lock = stdout.lock();
let mut buf = BufWriter::with_capacity(32 * 1024, lock);
let mut count: usize = 0;
while let Some(tile_str) = rx.blocking_recv() {
let tile_str_newline = format!("{tile_str}\n");
if let Err(e) = buf.write_all(tile_str_newline.as_bytes()) {
debug!("write_all err: {:?}", e);
break;
}
count += 1;
if count.is_multiple_of(1024)
&& let Err(e) = buf.flush()
{
debug!("flushing err: {:?}", e);
break;
}
}
if let Err(e) = buf.flush() {
debug!("final flush err: {:?}", e);
}
Ok(())
});
let tfilter = args.filter_args.tiles_filter_maybe();
let fspaths = args.fspaths.clone();
let tippecanoe = args.tippecanoe;
let enum_task: JoinHandle<UtilesResult<()>> = tokio::task::spawn(async move {
let tf = tfilter.clone();
let nfiles = fspaths.len();
for fspath in fspaths {
let formatter = if tippecanoe {
let xyz_fmt_str = "{x} {y} {z}";
let fmt_str = format!("{fspath} {xyz_fmt_str}");
TileStringFormatter::new(&fmt_str)
} else if nfiles == 1 {
TileStringFormatter::default()
} else {
let xyz_fmt_str = "{json_arr}";
let fmt_str = format!("{fspath} {xyz_fmt_str}");
TileStringFormatter::new(&fmt_str)
};
enumerate_db(&fspath, formatter, tf.as_ref(), tx.clone()).await?;
}
Ok(())
});
let (enum_res, write_res) = tokio::try_join!(enum_task, write_task)?;
enum_res?;
write_res?;
Ok(())
}