use crate::BoxResult as Result;
/// One blob to decode: `(sequence_number, data_byte_offset, data_size_in_bytes)`.
pub(crate) type ScheduleEntry = (usize, u64, usize);
/// Picks the decode-thread count: an explicit positive request wins;
/// otherwise use the machine's available parallelism minus two (leaving
/// headroom for the producer and merge threads), clamped to at least one,
/// falling back to 4 when parallelism cannot be queried.
fn resolve_thread_count(threads: Option<usize>) -> usize {
    if let Some(requested) = threads {
        if requested > 0 {
            return requested;
        }
    }
    std::thread::available_parallelism()
        .map_or(4, |p| p.get().saturating_sub(2).max(1))
}
/// Scans the PBF file at `input` and builds a decode schedule of
/// `(sequence, data_offset, data_size)` entries covering every `OsmData`
/// blob, optionally keeping only blobs whose index entry matches
/// `kind_filter`. Blobs with no index entry always pass the filter, since
/// their element kind is unknown and they must still be scanned.
///
/// Returns the schedule together with a shared handle to the opened file so
/// workers can issue positioned reads without reopening it.
///
/// # Errors
/// Fails when the header blob is missing, a blob header cannot be parsed,
/// or a blob claims data extending past the end of the file.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub(crate) fn build_classify_schedule(
    input: &std::path::Path,
    kind_filter: Option<crate::blob_meta::ElemKind>,
) -> Result<(Vec<ScheduleEntry>, std::sync::Arc<std::fs::File>)> {
    crate::debug::emit_marker("SCHEDULE_SCANNER_OPEN_START");
    let mut walker = crate::read::header_walker::HeaderWalker::open(input)?;
    // The first blob must be the file header; its absence means the input
    // is not a valid PBF.
    let _ = walker
        .next_header()?
        .ok_or_else(|| crate::error::new_error(crate::error::ErrorKind::MissingHeader))?;
    crate::debug::emit_marker("SCHEDULE_SCANNER_OPEN_END");
    crate::debug::emit_marker("SCHEDULE_SCAN_LOOP_START");
    let file_size = walker.file_size();
    let mut schedule: Vec<ScheduleEntry> = Vec::new();
    let mut seq: usize = 0;
    while let Some(meta) = walker.next_header()? {
        if !matches!(meta.blob_type, crate::blob::BlobKind::OsmData) {
            continue;
        }
        if let Some(filter_kind) = kind_filter {
            if let Some(idx) = &meta.index {
                if idx.kind != filter_kind {
                    continue;
                }
            }
        }
        // Reject blobs whose claimed extent overflows u64 or runs past EOF;
        // `checked_add` (instead of `+`, which wraps in release builds)
        // keeps the sanity check sound for corrupt offsets/sizes, and
        // guards workers against out-of-bounds positioned reads later.
        let blob_end = meta.data_offset.checked_add(meta.data_size as u64);
        if blob_end.map_or(true, |end| end > file_size) {
            return Err(format!(
                "blob at offset {} claims data_size {} but file is only {} bytes",
                meta.data_offset, meta.data_size, file_size,
            )
            .into());
        }
        schedule.push((seq, meta.data_offset, meta.data_size));
        seq += 1;
    }
    crate::debug::emit_marker("SCHEDULE_SCAN_LOOP_END");
    crate::debug::emit_marker("SCHEDULE_SCANNER_DROP_START");
    // Keep the file alive past the walker so workers can read from it.
    let shared_file = std::sync::Arc::clone(walker.shared_file());
    drop(walker);
    crate::debug::emit_marker("SCHEDULE_SCANNER_DROP_END");
    #[allow(clippy::cast_possible_wrap)]
    crate::debug::emit_counter("schedule_blobs", schedule.len() as i64);
    Ok((schedule, shared_file))
}
/// Scans the PBF file at `input` and builds three decode schedules — one
/// each for node, way, and relation blobs — based on each blob's index
/// entry. Blobs without an index entry are pushed onto all three schedules
/// so no element can be missed. Sequence numbers are dense and local to
/// each schedule.
///
/// Also returns a shared handle to the opened file so workers can issue
/// positioned reads without reopening it.
///
/// # Errors
/// Fails when the header blob is missing, a blob header cannot be parsed,
/// or a blob claims data extending past the end of the file.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
#[allow(clippy::type_complexity)]
pub(crate) fn build_classify_schedules_split(
    input: &std::path::Path,
) -> Result<(
    Vec<ScheduleEntry>,
    Vec<ScheduleEntry>,
    Vec<ScheduleEntry>,
    std::sync::Arc<std::fs::File>,
)> {
    crate::debug::emit_marker("SCHEDULE_SCANNER_OPEN_START");
    let mut walker = crate::read::header_walker::HeaderWalker::open(input)?;
    // The first blob must be the file header; its absence means the input
    // is not a valid PBF.
    let _ = walker
        .next_header()?
        .ok_or_else(|| crate::error::new_error(crate::error::ErrorKind::MissingHeader))?;
    crate::debug::emit_marker("SCHEDULE_SCANNER_OPEN_END");
    crate::debug::emit_marker("SCHEDULE_SCAN_LOOP_START");
    let file_size = walker.file_size();
    let mut nodes: Vec<ScheduleEntry> = Vec::new();
    let mut ways: Vec<ScheduleEntry> = Vec::new();
    let mut rels: Vec<ScheduleEntry> = Vec::new();
    while let Some(meta) = walker.next_header()? {
        if !matches!(meta.blob_type, crate::blob::BlobKind::OsmData) {
            continue;
        }
        // Reject blobs whose claimed extent overflows u64 or runs past EOF;
        // `checked_add` (instead of `+`, which wraps in release builds)
        // keeps the sanity check sound for corrupt offsets/sizes, and
        // guards workers against out-of-bounds positioned reads later.
        let blob_end = meta.data_offset.checked_add(meta.data_size as u64);
        if blob_end.map_or(true, |end| end > file_size) {
            return Err(format!(
                "blob at offset {} claims data_size {} but file is only {} bytes",
                meta.data_offset, meta.data_size, file_size,
            )
            .into());
        }
        match meta.index.as_ref().map(|i| i.kind) {
            Some(crate::blob_meta::ElemKind::Node) => {
                nodes.push((nodes.len(), meta.data_offset, meta.data_size));
            }
            Some(crate::blob_meta::ElemKind::Way) => {
                ways.push((ways.len(), meta.data_offset, meta.data_size));
            }
            Some(crate::blob_meta::ElemKind::Relation) => {
                rels.push((rels.len(), meta.data_offset, meta.data_size));
            }
            // Kind unknown: schedule the blob for all three phases.
            None => {
                nodes.push((nodes.len(), meta.data_offset, meta.data_size));
                ways.push((ways.len(), meta.data_offset, meta.data_size));
                rels.push((rels.len(), meta.data_offset, meta.data_size));
            }
        }
    }
    crate::debug::emit_marker("SCHEDULE_SCAN_LOOP_END");
    crate::debug::emit_marker("SCHEDULE_SCANNER_DROP_START");
    // Keep the file alive past the walker so workers can read from it.
    let shared_file = std::sync::Arc::clone(walker.shared_file());
    drop(walker);
    crate::debug::emit_marker("SCHEDULE_SCANNER_DROP_END");
    #[allow(clippy::cast_possible_wrap)]
    {
        crate::debug::emit_counter("schedule_node_blobs", nodes.len() as i64);
        crate::debug::emit_counter("schedule_way_blobs", ways.len() as i64);
        crate::debug::emit_counter("schedule_relation_blobs", rels.len() as i64);
    }
    Ok((nodes, ways, rels, shared_file))
}
/// Decodes every blob in `schedule` on a pool of worker threads, runs
/// `classify` on each decoded block against that worker's thread-local
/// state, and feeds every per-blob result to `merge` — tagged with the
/// blob's schedule sequence number — on the calling thread. Results may
/// arrive in any order; `merge` must use the sequence number if ordering
/// matters.
///
/// `worker_init` is called once per worker to build state that `classify`
/// may mutate across all blobs that worker handles.
///
/// # Errors
/// Returns the first I/O or decode error produced by any worker; the
/// remaining workers wind down once the result channel disconnects.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub(crate) fn parallel_classify_phase<S: Send, R: Send>(
    shared_file: &std::sync::Arc<std::fs::File>,
    schedule: &[ScheduleEntry],
    threads: Option<usize>,
    worker_init: impl Fn() -> S + Send + Sync,
    classify: impl Fn(&crate::PrimitiveBlock, &mut S) -> R + Send + Sync,
    mut merge: impl FnMut(usize, R),
) -> Result<()> {
    // Positioned reads (read_exact_at) let all workers share one File
    // handle without a shared cursor or seeking.
    use std::os::unix::fs::FileExt as _;
    if schedule.is_empty() { return Ok(()); }
    let decode_threads = resolve_thread_count(threads);
    // Bounded work queue: capacity 16 gives the producer backpressure.
    let (desc_tx, desc_rx) = std::sync::mpsc::sync_channel::<ScheduleEntry>(16);
    // mpsc::Receiver is single-consumer, so workers share it via a Mutex.
    let desc_rx = std::sync::Arc::new(std::sync::Mutex::new(desc_rx));
    let (result_tx, result_rx) = std::sync::mpsc::sync_channel::<(usize, crate::error::Result<R>)>(32);
    // Scoped threads allow borrowing `schedule`, `classify`, `worker_init`.
    std::thread::scope(|scope| -> Result<()> {
        // Producer: feeds schedule entries; a send error means all workers
        // have exited, so stop early.
        scope.spawn(move || {
            for &item in schedule {
                if desc_tx.send(item).is_err() { break; }
            }
        });
        for _ in 0..decode_threads {
            let rx = std::sync::Arc::clone(&desc_rx);
            let tx = result_tx.clone();
            let file = std::sync::Arc::clone(shared_file);
            let classify_ref = &classify;
            let worker_init_ref = &worker_init;
            scope.spawn(move || {
                // Per-worker reusable buffers and scratch space, so the hot
                // loop avoids fresh allocations per blob.
                let mut read_buf: Vec<u8> = Vec::new();
                let worker_pool = crate::blob::DecompressPool::new();
                let mut st_scratch: Vec<(u32, u32)> = Vec::new();
                let mut gr_scratch: Vec<(u32, u32)> = Vec::new();
                let mut state = worker_init_ref();
                loop {
                    // Pull the next descriptor; the lock is held only for
                    // the recv itself (block scope drops the guard).
                    let (s, data_offset, data_size) = {
                        // A poisoned lock is recovered via into_inner: a
                        // panicking peer cannot leave the Receiver in a
                        // bad state, so the guard is still usable.
                        let guard = rx.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
                        match guard.recv() {
                            Ok(d) => d,
                            // Producer finished and the queue is drained.
                            Err(_) => break,
                        }
                    };
                    // Immediately-invoked closure so `?` collects any
                    // read/decompress/parse error into a per-blob Result.
                    let r: crate::error::Result<R> = (|| {
                        read_buf.resize(data_size, 0);
                        file.read_exact_at(&mut read_buf, data_offset)
                            .map_err(|e| crate::error::new_error(crate::error::ErrorKind::Io(e)))?;
                        // data_size * 4: presumably a capacity hint for the
                        // decompressed output — confirm pool_get_pub semantics.
                        let mut buf = crate::blob::pool_get_pub(&worker_pool, data_size * 4);
                        crate::blob::decompress_blob_raw(&read_buf, &mut buf)?;
                        let block = crate::block::PrimitiveBlock::from_vec_pooled_with_scratch(
                            buf, &worker_pool, &mut st_scratch, &mut gr_scratch,
                        )?;
                        Ok(classify_ref(&block, &mut state))
                    })();
                    // Send failure means the consumer dropped result_rx
                    // (early error return below); stop working.
                    if tx.send((s, r)).is_err() { break; }
                }
            });
        }
        // Drop this thread's handles so the channels can disconnect:
        // without dropping result_tx the result loop below would never end,
        // since the channel only closes when every sender is gone.
        drop(desc_rx);
        drop(result_tx);
        for (seq, result) in result_rx {
            // `?` propagates the first worker error; dropping result_rx on
            // that early return unblocks the remaining workers.
            merge(seq, result?);
        }
        Ok(())
    })?;
    Ok(())
}
/// Decodes every blob in `schedule` on a pool of worker threads, where each
/// worker accumulates into its own state `S` (built by `worker_init`) via
/// `classify`. Unlike `parallel_classify_phase`, nothing is sent per blob:
/// each worker sends its whole state exactly once when the queue is
/// drained, and `merge` is called once per worker on the calling thread.
///
/// # Errors
/// Returns the first I/O or decode error produced by any worker; the
/// failing worker stops and sends the error in place of its state.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub(crate) fn parallel_classify_accumulate<S: Send>(
    shared_file: &std::sync::Arc<std::fs::File>,
    schedule: &[ScheduleEntry],
    threads: Option<usize>,
    worker_init: impl Fn() -> S + Send + Sync,
    classify: impl Fn(&crate::PrimitiveBlock, &mut S) + Send + Sync,
    mut merge: impl FnMut(S),
) -> Result<()> {
    // Positioned reads (read_exact_at) let all workers share one File
    // handle without a shared cursor or seeking.
    use std::os::unix::fs::FileExt as _;
    if schedule.is_empty() { return Ok(()); }
    let decode_threads = resolve_thread_count(threads);
    // Bounded work queue: capacity 16 gives the producer backpressure.
    let (desc_tx, desc_rx) = std::sync::mpsc::sync_channel::<ScheduleEntry>(16);
    // mpsc::Receiver is single-consumer, so workers share it via a Mutex.
    let desc_rx = std::sync::Arc::new(std::sync::Mutex::new(desc_rx));
    // One slot per worker: every worker sends exactly one final message,
    // so these sends can never block.
    let (result_tx, result_rx) = std::sync::mpsc::sync_channel::<crate::error::Result<S>>(decode_threads);
    // Scoped threads allow borrowing `schedule`, `classify`, `worker_init`.
    std::thread::scope(|scope| -> Result<()> {
        // Producer: feeds schedule entries; a send error means all workers
        // have exited, so stop early.
        scope.spawn(move || {
            for &item in schedule {
                if desc_tx.send(item).is_err() { break; }
            }
        });
        for _ in 0..decode_threads {
            let rx = std::sync::Arc::clone(&desc_rx);
            let tx = result_tx.clone();
            let file = std::sync::Arc::clone(shared_file);
            let classify_ref = &classify;
            let worker_init_ref = &worker_init;
            scope.spawn(move || {
                // Per-worker reusable buffers and scratch space, so the hot
                // loop avoids fresh allocations per blob.
                let mut read_buf: Vec<u8> = Vec::new();
                let worker_pool = crate::blob::DecompressPool::new();
                let mut st_scratch: Vec<(u32, u32)> = Vec::new();
                let mut gr_scratch: Vec<(u32, u32)> = Vec::new();
                let mut state = worker_init_ref();
                // Immediately-invoked closure so `?` exits the whole loop on
                // the first error; Ok(()) means the queue was drained.
                let result: crate::error::Result<()> = (|| {
                    loop {
                        // Pull the next descriptor; the lock is held only
                        // for the recv itself (block scope drops the guard).
                        let (_s, data_offset, data_size) = {
                            // A poisoned lock is recovered via into_inner: a
                            // panicking peer cannot leave the Receiver in a
                            // bad state, so the guard is still usable.
                            let guard = rx.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
                            match guard.recv() {
                                Ok(d) => d,
                                // Producer finished and the queue is drained.
                                Err(_) => return Ok(()),
                            }
                        };
                        read_buf.resize(data_size, 0);
                        file.read_exact_at(&mut read_buf, data_offset)
                            .map_err(|e| crate::error::new_error(crate::error::ErrorKind::Io(e)))?;
                        // data_size * 4: presumably a capacity hint for the
                        // decompressed output — confirm pool_get_pub semantics.
                        let mut buf = crate::blob::pool_get_pub(&worker_pool, data_size * 4);
                        crate::blob::decompress_blob_raw(&read_buf, &mut buf)?;
                        let block = crate::block::PrimitiveBlock::from_vec_pooled_with_scratch(
                            buf, &worker_pool, &mut st_scratch, &mut gr_scratch,
                        )?;
                        classify_ref(&block, &mut state);
                    }
                })();
                // Ship either the accumulated state or the first error.
                // `.ok()` ignores send failure: the consumer may already
                // have returned early on another worker's error.
                match result {
                    Ok(()) => { tx.send(Ok(state)).ok(); }
                    Err(e) => { tx.send(Err(e)).ok(); }
                }
            });
        }
        // Drop this thread's handles so the channels can disconnect:
        // without dropping result_tx the result loop below would never end,
        // since the channel only closes when every sender is gone.
        drop(desc_rx);
        drop(result_tx);
        for result in result_rx {
            // `?` propagates the first worker error.
            merge(result?);
        }
        Ok(())
    })?;
    Ok(())
}