use std::path::Path;
use std::sync::mpsc;
use crate::blob::BlobKind;
use crate::blob_meta::ElemKind;
use crate::error::Result;
use crate::read::header_walker::HeaderWalker;
use super::descriptor::{BlobDescriptor, DrainItem, ScannedBlob};
use super::diff_ranges::DiffRanges;
/// Outbound channels the scanner uses to hand off classified blobs.
pub(super) struct ScannerChannels {
    /// Blobs that need worker attention (passthrough or merge candidates);
    /// consumed by the worker stage (see the "to_workers" high-water counter).
    pub candidate_tx: mpsc::SyncSender<ScannedBlob>,
    /// Fast-path copy-range items sent straight to the drain stage.
    /// `None` when no drain channel is configured; dispatching a fast-path
    /// item then is an error (see `dispatch_item`).
    pub drain_tx: Option<mpsc::SyncSender<DrainItem>>,
}
/// Inputs and wiring for `run_scanner`: the base file, the diff's id ranges,
/// mode flags, outbound channels, and optional synchronisation endpoints.
pub(super) struct ScannerConfig<'a> {
    /// Path of the base PBF file whose blob headers are walked.
    pub base_pbf: &'a Path,
    /// Id ranges touched by the diff; blobs whose id range does not overlap
    /// can be forwarded without decoding.
    pub ranges: &'a DiffRanges,
    /// Enables the raw copy-range fast path through `channels.drain_tx`.
    pub use_copy_range: bool,
    /// When set, node blobs are always decoded and way/relation emissions are
    /// deferred until the barrier opens.
    pub locations_on_ways: bool,
    /// Where classified blobs are emitted.
    pub channels: ScannerChannels,
    /// Receives `()` when downstream is ready for way/relation blobs
    /// (per the error text: after the drain's loc_map is ready). `None`
    /// when no barrier handshake is wired up.
    pub barrier_rx: Option<mpsc::Receiver<()>>,
    /// Receives the seq of the last node blob (`u64::MAX` if none were seen).
    /// `None` when downstream does not need it.
    pub last_node_seq_tx: Option<mpsc::SyncSender<u64>>,
}
/// Walk the base PBF's blob headers and classify every `OsmData` blob onto
/// one of three paths:
///   * `Drain` — the blob provably misses the diff and `use_copy_range` is
///     on, so the drain stage copies its bytes verbatim;
///   * `Candidate(Passthrough)` — provably misses the diff but the raw copy
///     path is off, so workers forward it unchanged;
///   * `Candidate(Candidate)` — may intersect the diff (or lacks indexdata,
///     or is a node blob in locations-on-ways mode), so workers decode it.
///
/// In `locations_on_ways` mode the scanner additionally sends the seq of the
/// last node blob over `last_node_seq_tx` (exactly once; `u64::MAX` if the
/// file has no node blobs) and defers all way/relation emissions until the
/// barrier receiver signals readiness.
///
/// Returns the total number of `OsmData` blobs assigned a seq.
///
/// # Errors
/// Fails on header-walk I/O errors, on a malformed blob header offset, on a
/// fast-path descriptor unexpectedly missing indexdata, or when a downstream
/// channel closes early.
#[allow(clippy::too_many_lines)]
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub(super) fn run_scanner(cfg: ScannerConfig<'_>) -> Result<u64> {
    let ScannerConfig {
        base_pbf,
        ranges,
        use_copy_range,
        locations_on_ways,
        channels,
        barrier_rx,
        last_node_seq_tx,
    } = cfg;
    crate::debug::emit_marker("MERGE_SCANNER_START");
    let mut walker = HeaderWalker::open(base_pbf)?;
    let mut seq: u64 = 0;
    let mut pending_post_barrier: Vec<ScannerEmit> = Vec::new();
    // The barrier only exists in locations-on-ways mode; otherwise it starts
    // open and nothing is ever deferred.
    let mut barrier_open = !locations_on_ways;
    let mut bytes_high_water_fastpath: usize = 0;
    let mut bytes_high_water_slowpath: usize = 0;
    let mut last_node_seq: Option<u64> = None;
    let mut last_node_seq_sent = false;
    while let Some(meta) = walker.next_header()? {
        if meta.blob_type != BlobKind::OsmData {
            continue;
        }
        // Blobs without indexdata are handled conservatively: treat as nodes
        // and (below) force them onto the candidate path.
        let (kind, id_range, has_indexdata) = match meta.index.as_ref() {
            Some(idx) => (idx.kind, Some((idx.min_id, idx.max_id)), true),
            None => (ElemKind::Node, None, false),
        };
        let blob_offset = usize::try_from(meta.data_offset - meta.frame_start).map_err(|_| {
            crate::error::new_error(crate::error::ErrorKind::Io(std::io::Error::other(
                "blob header too large to fit in usize - malformed PBF?",
            )))
        })?;
        let descriptor = BlobDescriptor {
            seq,
            frame_start: meta.frame_start,
            frame_len: meta.frame_size,
            blob_offset,
            data_size: meta.data_size,
            kind,
            id_range,
            index: meta.index,
            tagdata: meta.tagdata,
        };
        // Node blobs must always be decoded in locations-on-ways mode so
        // their data is available downstream.
        let low_node_must_decompress = locations_on_ways && matches!(kind, ElemKind::Node);
        // True when the blob's id range provably does not intersect the diff.
        // Evaluated once; the original code repeated the identical
        // `range_overlaps` call in both branch conditions below.
        let untouched_by_diff = has_indexdata
            && !low_node_must_decompress
            && !ranges.range_overlaps(
                kind,
                id_range.map_or(0, |r| r.0),
                id_range.map_or(0, |r| r.1),
            );
        let item = if untouched_by_diff && use_copy_range {
            ScannerEmit::Drain(descriptor.into_drain_copy_range().ok_or_else(|| {
                crate::error::new_error(crate::error::ErrorKind::Io(std::io::Error::other(
                    "scanner: fast-path descriptor missing indexdata - should be unreachable",
                )))
            })?)
        } else if untouched_by_diff {
            ScannerEmit::Candidate(ScannedBlob::Passthrough(descriptor))
        } else {
            ScannerEmit::Candidate(ScannedBlob::Candidate(descriptor))
        };
        let approx_cost = scanner_emit_cost(&item);
        if !barrier_open && matches!(kind, ElemKind::Way | ElemKind::Relation) {
            // First way/relation while the barrier is closed: report the seq
            // of the last node blob exactly once, then defer this item.
            if !last_node_seq_sent
                && let Some(tx) = last_node_seq_tx.as_ref()
            {
                let payload = last_node_seq.unwrap_or(u64::MAX);
                tx.send(payload).map_err(send_err)?;
                last_node_seq_sent = true;
            }
            pending_post_barrier.push(item);
            seq += 1;
            // NOTE(review): deferred items skip the `try_recv` poll below, so
            // once deferral starts the barrier can only open via a later
            // non-deferred blob (interleaved file) or after the loop —
            // deferred items may accumulate unboundedly in memory until then.
            // Confirm this is acceptable for expected inputs.
            continue;
        }
        if matches!(kind, ElemKind::Node) {
            last_node_seq = Some(seq);
        }
        dispatch_item(
            &channels,
            item,
            &mut bytes_high_water_fastpath,
            &mut bytes_high_water_slowpath,
            approx_cost,
        )?;
        seq += 1;
        // Non-blocking poll: open the barrier early if downstream already
        // signalled readiness, so later way/relation blobs dispatch directly.
        if !barrier_open
            && let Some(ref rx) = barrier_rx
            && let Ok(()) = rx.try_recv()
        {
            barrier_open = true;
        }
    }
    // File ended before any way/relation blob was seen: downstream may still
    // be waiting for the last-node seq, so send it now (u64::MAX if no nodes).
    if locations_on_ways
        && !last_node_seq_sent
        && let Some(tx) = last_node_seq_tx.as_ref()
    {
        let payload = last_node_seq.unwrap_or(u64::MAX);
        tx.send(payload).map_err(send_err)?;
        // `last_node_seq_sent` is not updated: this is its final use.
    }
    if !barrier_open
        && let Some(ref rx) = barrier_rx
    {
        crate::debug::emit_marker("MERGE_SCANNER_BARRIER_WAIT_START");
        rx.recv().map_err(|_| {
            crate::error::new_error(crate::error::ErrorKind::Io(std::io::Error::other(
                "drain closed barrier channel before signalling loc_map ready",
            )))
        })?;
        crate::debug::emit_marker("MERGE_SCANNER_BARRIER_WAIT_END");
    }
    // Barrier open (or absent): flush the deferred way/relation items in
    // their original scan order.
    for item in pending_post_barrier.drain(..) {
        let approx_cost = scanner_emit_cost(&item);
        dispatch_item(
            &channels,
            item,
            &mut bytes_high_water_fastpath,
            &mut bytes_high_water_slowpath,
            approx_cost,
        )?;
    }
    crate::debug::emit_marker("MERGE_SCANNER_END");
    crate::debug::emit_counter(
        "merge_scanner_blobs_emitted",
        i64::try_from(seq).unwrap_or(i64::MAX),
    );
    crate::debug::emit_counter(
        "merge_scanner_to_drain_bytes_high_water",
        i64::try_from(bytes_high_water_fastpath).unwrap_or(i64::MAX),
    );
    crate::debug::emit_counter(
        "merge_scanner_to_workers_bytes_high_water",
        i64::try_from(bytes_high_water_slowpath).unwrap_or(i64::MAX),
    );
    Ok(seq)
}
/// The scanner's classification of a single blob.
enum ScannerEmit {
    /// Fast path: raw byte copy performed by the drain stage, no decoding.
    Drain(DrainItem),
    /// Worker path: either forwarded unchanged (`Passthrough`) or decoded
    /// and merged (`Candidate`) by the workers.
    Candidate(ScannedBlob),
}
fn dispatch_item(
channels: &ScannerChannels,
item: ScannerEmit,
bytes_hw_fast: &mut usize,
bytes_hw_slow: &mut usize,
approx_cost: usize,
) -> Result<()> {
match item {
ScannerEmit::Drain(drain_item) => {
let tx = channels.drain_tx.as_ref().ok_or_else(|| {
crate::error::new_error(crate::error::ErrorKind::Io(std::io::Error::other(
"scanner: fast-path emitted but drain_tx is None - misconfigured channels",
)))
})?;
*bytes_hw_fast = (*bytes_hw_fast).max(approx_cost);
tx.send(drain_item).map_err(send_err)?;
}
ScannerEmit::Candidate(scanned) => {
*bytes_hw_slow = (*bytes_hw_slow).max(approx_cost);
channels.candidate_tx.send(scanned).map_err(send_err)?;
}
}
Ok(())
}
/// Approximate number of bytes a queued emission keeps alive while in flight,
/// used for the channel high-water-mark counters.
fn scanner_emit_cost(item: &ScannerEmit) -> usize {
    // Rough fixed size attributed to a descriptor itself, on top of any
    // tagdata it carries.
    const DESCRIPTOR_OVERHEAD: usize = 64;
    match item {
        ScannerEmit::Drain(drain) => drain.byte_cost(),
        ScannerEmit::Candidate(
            ScannedBlob::Passthrough(desc) | ScannedBlob::Candidate(desc),
        ) => desc.tagdata.as_ref().map_or(0, |t| t.len()) + DESCRIPTOR_OVERHEAD,
    }
}
fn send_err<T>(_: mpsc::SendError<T>) -> crate::error::Error {
crate::error::new_error(crate::error::ErrorKind::Io(std::io::Error::other(
"scanner dispatch channel closed by downstream",
)))
}