use std::os::unix::fs::FileExt as _;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, mpsc};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::blob::decompress_blob_raw;
use crate::blob_meta::ElemKind;
use crate::block_builder::BlockBuilder;
use crate::error::{Error, ErrorKind, Result, new_error};
use crate::writer::{Compression, frame_blob_pipelined};
use super::classify::block_overlaps_diff;
use super::descriptor::{BlobDescriptor, DrainItem, ScannedBlob, WorkerOutput};
use super::diff_ranges::DiffRanges;
use super::drain::LocMapHandle;
use super::rewrite_block::rewrite_block_parallel;
use crate::osc::CompactDiffOverlay;
/// Shared per-worker slot used to hand off node coordinates harvested by a
/// worker: node id -> (lat, lon) as fixed-point `i32` pairs.
pub(super) type CoordSlot = Arc<Mutex<FxHashMap<i64, (i32, i32)>>>;
/// One [`CoordSlot`] per worker, indexed by `worker_id`.
pub(super) type CoordSlots = Vec<CoordSlot>;
/// Configuration for the streaming merge worker pool (see `run_workers`).
pub(super) struct StreamingConfig {
    /// Base PBF file; workers read blob frames from it with positioned reads.
    pub base_pbf: Box<Path>,
    /// Sorted diff id ranges used for overlap classification and to slice
    /// the inline upsert window per blob.
    pub ranges: Arc<DiffRanges>,
    /// Diff overlay applied when a blob is rewritten.
    pub diff: Arc<CompactDiffOverlay>,
    /// Number of worker threads to spawn.
    pub worker_count: usize,
    /// When true, node coordinates are harvested from node blobs so that
    /// rewriting of non-node blobs can consult node locations.
    pub locations_on_ways: bool,
    /// Per-worker coordinate hand-off slots; must hold exactly
    /// `worker_count` entries when `locations_on_ways` is set.
    pub coord_slots: Option<CoordSlots>,
    /// Handle to a shared node-location map; `get()` may return nothing
    /// until the map is published elsewhere (see `handle_candidate`).
    pub loc_map_handle: Option<LocMapHandle>,
    /// Optional filter: only node ids in this set get their coordinates
    /// extracted into the coord slots.
    pub needed_set: Option<Arc<FxHashSet<i64>>>,
    /// Compression used when re-framing rewritten blocks.
    pub compression: Compression,
    /// When true, false-positive candidates are reported as copy-range items
    /// instead of being re-read and forwarded as owned bytes.
    pub use_copy_range: bool,
    /// Test hook: the worker that receives the blob with this seq panics.
    #[cfg(feature = "test-hooks")]
    pub panic_at_blob_seq: Option<u64>,
}
/// Channel endpoints wiring the worker pool into the pipeline: scanned blobs
/// arrive on `candidate_rx`; finished items leave through `drain_tx`.
pub(super) struct StreamingChannels {
    /// Receiving end fed by the scan stage (shared among workers via a mutex).
    pub candidate_rx: mpsc::Receiver<ScannedBlob>,
    /// Bounded sender toward the drain stage.
    pub drain_tx: mpsc::SyncSender<DrainItem>,
}
/// Metrics shared by all workers. `*_ns` fields accumulate elapsed
/// nanoseconds; the rest are event counts. All updates use relaxed atomics.
pub(super) struct WorkerCounters {
    /// Blobs handled (candidates and passthroughs).
    pub blobs_processed: AtomicU64,
    /// Candidate blobs that were actually rewritten.
    pub blobs_rewritten: AtomicU64,
    /// Candidates whose precise check found no diff overlap.
    pub blobs_false_positive: AtomicU64,
    /// Blobs forwarded verbatim as owned byte frames.
    pub blobs_owned_passthrough: AtomicU64,
    /// Time spent decompressing blob bodies.
    pub decompress_ns: AtomicU64,
    /// Time spent parsing primitive blocks.
    pub parse_ns: AtomicU64,
    /// Time spent on the precise overlap check.
    pub precise_ns: AtomicU64,
    /// Time spent rewriting blocks against the diff.
    pub rewrite_ns: AtomicU64,
    /// Time spent extracting node coordinates.
    pub coord_extract_ns: AtomicU64,
    /// Number of (lat, lon) pairs harvested from node blobs.
    pub coord_pairs_extracted: AtomicU64,
    /// Time spent framing rewritten blocks for output.
    pub frame_ns: AtomicU64,
}
impl WorkerCounters {
    /// Creates a counter set with every metric at zero.
    pub(super) fn new() -> Self {
        Self {
            blobs_processed: AtomicU64::default(),
            blobs_rewritten: AtomicU64::default(),
            blobs_false_positive: AtomicU64::default(),
            blobs_owned_passthrough: AtomicU64::default(),
            decompress_ns: AtomicU64::default(),
            parse_ns: AtomicU64::default(),
            precise_ns: AtomicU64::default(),
            rewrite_ns: AtomicU64::default(),
            coord_extract_ns: AtomicU64::default(),
            coord_pairs_extracted: AtomicU64::default(),
            frame_ns: AtomicU64::default(),
        }
    }

    /// Publishes every counter via the debug sink, saturating values that do
    /// not fit in an `i64`.
    pub(super) fn emit(&self) {
        let report = |name: &'static str, counter: &AtomicU64| {
            let raw = counter.load(Ordering::Relaxed);
            crate::debug::emit_counter(name, i64::try_from(raw).unwrap_or(i64::MAX));
        };
        report("merge_streaming_blobs_processed", &self.blobs_processed);
        report("merge_streaming_blobs_rewritten", &self.blobs_rewritten);
        report("merge_streaming_blobs_false_positive", &self.blobs_false_positive);
        report("merge_streaming_blobs_owned_passthrough", &self.blobs_owned_passthrough);
        report("merge_streaming_decompress_ns", &self.decompress_ns);
        report("merge_streaming_parse_ns", &self.parse_ns);
        report("merge_streaming_precise_ns", &self.precise_ns);
        report("merge_streaming_rewrite_ns", &self.rewrite_ns);
        report("merge_streaming_coord_extract_ns", &self.coord_extract_ns);
        report("merge_streaming_coord_pairs_extracted", &self.coord_pairs_extracted);
        report("merge_streaming_frame_ns", &self.frame_ns);
    }
}
/// Spawns `cfg.worker_count` scoped worker threads that pull scanned blobs
/// off `channels.candidate_rx`, rewrite or pass them through, and push
/// results to `channels.drain_tx`.
///
/// Blocks until every worker has finished, emits the shared counters, and
/// returns the first worker error (if any).
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub(super) fn run_workers(
    cfg: StreamingConfig,
    channels: StreamingChannels,
    counters: &WorkerCounters,
) -> Result<()> {
    let StreamingConfig {
        base_pbf,
        ranges,
        diff,
        worker_count,
        locations_on_ways,
        coord_slots,
        loc_map_handle,
        needed_set,
        compression,
        use_copy_range,
        #[cfg(feature = "test-hooks")]
        panic_at_blob_seq,
    } = cfg;
    let StreamingChannels { candidate_rx, drain_tx } = channels;
    // locations_on_ways requires exactly one coordinate slot per worker.
    if locations_on_ways {
        let slots_len = coord_slots.as_ref().map_or(0, Vec::len);
        if slots_len != worker_count {
            return Err(new_error(ErrorKind::Io(std::io::Error::other(format!(
                "streaming: locations_on_ways requires coord_slots.len()={worker_count}, got {slots_len}",
            )))));
        }
    }
    crate::debug::emit_marker("MERGE_STREAMING_START");
    // One shared read-only handle; workers use positioned reads (pread), so
    // no per-worker file cursor is needed.
    let shared_file = std::fs::File::open(&*base_pbf).map_err(|e| {
        new_error(ErrorKind::Io(std::io::Error::other(format!(
            "streaming: failed to open {}: {e}",
            base_pbf.display(),
        ))))
    })?;
    // The single receiver is shared among all workers behind a mutex.
    let candidate_rx = Mutex::new(candidate_rx);
    // Only the first worker error is retained; later ones are dropped.
    let first_err: Mutex<Option<Error>> = Mutex::new(None);
    std::thread::scope(|scope| {
        for worker_id in 0..worker_count {
            let drain_tx = drain_tx.clone();
            let coord_slot = coord_slots.as_ref().map(|s| Arc::clone(&s[worker_id]));
            let loc_map_handle = loc_map_handle.as_ref().map(Arc::clone);
            let needed_set = needed_set.as_ref().map(Arc::clone);
            let file = &shared_file;
            let candidate_rx = &candidate_rx;
            let first_err = &first_err;
            let ranges = &*ranges;
            let diff = &*diff;
            scope.spawn(move || {
                let result = worker_loop(
                    worker_id,
                    file,
                    candidate_rx,
                    &drain_tx,
                    counters,
                    ranges,
                    diff,
                    locations_on_ways,
                    coord_slot.as_ref(),
                    loc_map_handle.as_ref(),
                    needed_set.as_deref(),
                    &compression,
                    use_copy_range,
                    #[cfg(feature = "test-hooks")]
                    panic_at_blob_seq,
                );
                if let Err(e) = result {
                    // Ignore lock poisoning: a panicked peer must not hide
                    // this worker's error.
                    let mut slot =
                        first_err.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
                    if slot.is_none() {
                        *slot = Some(e);
                    }
                }
            });
        }
    });
    // All workers have joined; dropping the last local sender lets the drain
    // side observe channel closure.
    drop(drain_tx);
    crate::debug::emit_marker("MERGE_STREAMING_END");
    counters.emit();
    if let Some(e) = first_err
        .into_inner()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
    {
        return Err(e);
    }
    Ok(())
}
/// A single worker's main loop: repeatedly pulls the next `ScannedBlob` from
/// the shared receiver and dispatches it — candidates are rewritten (or
/// demoted to passthrough), passthroughs are forwarded verbatim — until the
/// scan side closes the channel.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn worker_loop(
    worker_id: usize,
    file: &std::fs::File,
    candidate_rx: &Mutex<mpsc::Receiver<ScannedBlob>>,
    drain_tx: &mpsc::SyncSender<DrainItem>,
    counters: &WorkerCounters,
    ranges: &DiffRanges,
    diff: &CompactDiffOverlay,
    locations_on_ways: bool,
    coord_slot: Option<&CoordSlot>,
    loc_map_handle: Option<&LocMapHandle>,
    needed_set: Option<&FxHashSet<i64>>,
    compression: &Compression,
    use_copy_range: bool,
    #[cfg(feature = "test-hooks")] panic_at_blob_seq: Option<u64>,
) -> Result<()> {
    // Scratch buffers: allocated once per worker and reused across blobs.
    let mut read_buf: Vec<u8> = Vec::new();
    let mut decompress_buf: Vec<u8> = Vec::new();
    let mut st_scratch: Vec<(u32, u32)> = Vec::new();
    let mut gr_scratch: Vec<(u32, u32)> = Vec::new();
    let mut bb = BlockBuilder::new();
    // Coordinates harvested locally; flushed to the shared slot in batches.
    let mut local_coords: FxHashMap<i64, (i32, i32)> = FxHashMap::default();
    let mut tuples_scratch: Vec<crate::scan::node::NodeTuple> = Vec::new();
    let mut group_starts_scratch: Vec<(usize, usize)> = Vec::new();
    loop {
        // The lock is held across recv(): workers take turns pulling one
        // item each off the single shared receiver.
        let item = {
            let rx = candidate_rx.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
            match rx.recv() {
                Ok(item) => item,
                // Sender dropped: no more work, shut this worker down.
                Err(_) => break, }
        };
        #[cfg(feature = "test-hooks")]
        {
            let seq = match &item {
                ScannedBlob::Candidate(d) | ScannedBlob::Passthrough(d) => d.seq,
            };
            if panic_at_blob_seq == Some(seq) {
                panic!(
                    "test-hooks: panic_at_blob_seq={seq} triggered in worker {worker_id}"
                );
            }
        }
        match item {
            ScannedBlob::Candidate(desc) => {
                handle_candidate(
                    file,
                    &desc,
                    drain_tx,
                    counters,
                    ranges,
                    diff,
                    &mut bb,
                    &mut read_buf,
                    &mut decompress_buf,
                    &mut st_scratch,
                    &mut gr_scratch,
                    locations_on_ways,
                    &mut local_coords,
                    &mut tuples_scratch,
                    &mut group_starts_scratch,
                    coord_slot,
                    loc_map_handle,
                    needed_set,
                    compression,
                    use_copy_range,
                )?;
            }
            ScannedBlob::Passthrough(desc) => {
                // Pre-classified as untouched by the diff: forward the frame
                // bytes unchanged.
                let count = desc.index.as_ref().map_or(0, |i| i.count);
                handle_owned_passthrough(
                    file, &desc, drain_tx, counters, &mut read_buf, count, None,
                )?;
            }
        }
    }
    // Final flush of any coordinates still buffered locally.
    if locations_on_ways && !local_coords.is_empty()
        && let Some(slot) = coord_slot
    {
        flush_local_coords(slot, &mut local_coords);
    }
    // worker_id is only read by the test-hooks panic message above.
    let _ = worker_id; Ok(())
}
/// Processes one candidate blob: read + decompress its body, optionally
/// harvest node coordinates, parse the primitive block, then either rewrite
/// it against the diff or demote it to a passthrough when the precise
/// overlap check comes back negative.
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_lines)]
fn handle_candidate(
    file: &std::fs::File,
    desc: &BlobDescriptor,
    drain_tx: &mpsc::SyncSender<DrainItem>,
    counters: &WorkerCounters,
    ranges: &DiffRanges,
    diff: &CompactDiffOverlay,
    bb: &mut BlockBuilder,
    read_buf: &mut Vec<u8>,
    decompress_buf: &mut Vec<u8>,
    st_scratch: &mut Vec<(u32, u32)>,
    gr_scratch: &mut Vec<(u32, u32)>,
    locations_on_ways: bool,
    local_coords: &mut FxHashMap<i64, (i32, i32)>,
    tuples_scratch: &mut Vec<crate::scan::node::NodeTuple>,
    group_starts_scratch: &mut Vec<(usize, usize)>,
    coord_slot: Option<&CoordSlot>,
    loc_map_handle: Option<&LocMapHandle>,
    needed_set: Option<&FxHashSet<i64>>,
    compression: &Compression,
    use_copy_range: bool,
) -> Result<()> {
    counters.blobs_processed.fetch_add(1, Ordering::Relaxed);
    // The blob body lives at frame_start + blob_offset within the base file.
    let body_offset = desc.frame_start + desc.blob_offset as u64;
    pread_into(file, read_buf, body_offset, desc.data_size).map_err(io_err)?;
    let t_decompress = std::time::Instant::now();
    decompress_blob_raw(read_buf, decompress_buf).map_err(io_err)?;
    counters
        .decompress_ns
        .fetch_add(elapsed_ns(t_decompress), Ordering::Relaxed);
    // Harvest (id, lat, lon) tuples from node blobs for locations-on-ways.
    if locations_on_ways && desc.kind == ElemKind::Node {
        let t_extract = std::time::Instant::now();
        tuples_scratch.clear();
        group_starts_scratch.clear();
        // Extraction failure is deliberately non-fatal: coordinate harvesting
        // is best-effort and the blob is still rewritten normally below.
        if crate::scan::node::extract_node_tuples(
            decompress_buf,
            tuples_scratch,
            group_starts_scratch,
        )
        .is_ok()
        {
            let mut hits = 0u64;
            for t in tuples_scratch.iter() {
                // Skip nodes outside the needed set, when one is provided.
                if let Some(ns) = needed_set
                    && !ns.contains(&t.id)
                {
                    continue;
                }
                local_coords.insert(t.id, (t.lat, t.lon));
                hits += 1;
            }
            counters
                .coord_pairs_extracted
                .fetch_add(hits, Ordering::Relaxed);
        }
        counters
            .coord_extract_ns
            .fetch_add(elapsed_ns(t_extract), Ordering::Relaxed);
    }
    let t_parse = std::time::Instant::now();
    // take() hands the decompressed bytes to the block, leaving the buffer
    // empty for the next iteration's decompress.
    let raw = std::mem::take(decompress_buf);
    let block =
        crate::PrimitiveBlock::from_vec_with_scratch(raw, st_scratch, gr_scratch).map_err(io_err)?;
    counters
        .parse_ns
        .fetch_add(elapsed_ns(t_parse), Ordering::Relaxed);
    // Precise (per-element) overlap check; the scan-side classification can
    // produce false positives.
    let t_precise = std::time::Instant::now();
    let overlaps = block_overlaps_diff(&block, ranges);
    counters
        .precise_ns
        .fetch_add(elapsed_ns(t_precise), Ordering::Relaxed);
    // Push harvested coordinates to the shared slot before any early return.
    if let Some(slot) = coord_slot
        && !local_coords.is_empty()
    {
        flush_local_coords(slot, local_coords);
    }
    if !overlaps {
        counters.blobs_false_positive.fetch_add(1, Ordering::Relaxed);
        let blob_count = desc
            .index
            .as_ref()
            .map_or_else(|| count_block_elements(&block), |i| i.count);
        // Prefer scan-index metadata; fall back to inferring from the block.
        let (effective_kind, effective_id_range) = if desc.index.is_some() {
            (desc.kind, desc.id_range.unwrap_or((0, 0)))
        } else {
            let (k, min_id, max_id) = infer_kind_and_range(&block);
            (k, (min_id, max_id))
        };
        if use_copy_range {
            let mut patched = desc.clone();
            patched.kind = effective_kind;
            patched.id_range = Some(effective_id_range);
            return send_drain(
                drain_tx,
                WorkerOutput::FalsePositive(patched).into_drain_item(blob_count),
            );
        }
        // NOTE(review): handle_owned_passthrough increments blobs_processed
        // again, so this blob is counted twice in that counter on this path
        // (once at the top of this function) — confirm whether the double
        // count is intended.
        return handle_owned_passthrough(
            file,
            desc,
            drain_tx,
            counters,
            read_buf,
            blob_count,
            Some((effective_kind, effective_id_range)),
        );
    }
    let (effective_kind, effective_min_id, effective_max_id) = match desc.id_range {
        Some((min, max)) => (desc.kind, min, max),
        None => infer_kind_and_range(&block),
    };
    // Upserts whose ids fall inside this blob's id range get inlined here.
    let inline_upserts = upsert_slice(ranges, effective_kind, effective_min_id, effective_max_id);
    // Node locations are only needed when rewriting non-node blobs.
    let loc_arc = if locations_on_ways && effective_kind != ElemKind::Node {
        loc_map_handle.and_then(|h| h.get().cloned())
    } else {
        None
    };
    let loc_map: Option<&FxHashMap<i64, (i32, i32)>> = loc_arc.as_deref();
    let t_rewrite = std::time::Instant::now();
    let output = rewrite_block_parallel(&block, diff, bb, inline_upserts, effective_kind, loc_map)
        .map_err(|e| new_error(ErrorKind::Io(std::io::Error::other(e.to_string()))))?;
    counters
        .rewrite_ns
        .fetch_add(elapsed_ns(t_rewrite), Ordering::Relaxed);
    // Frame each rewritten block (a rewrite may split into several blocks).
    let t_frame = std::time::Instant::now();
    let mut framed_chunks: Vec<Vec<u8>> = Vec::with_capacity(output.blocks.len());
    for (block_bytes, index, tagdata) in output.blocks {
        let indexdata = index.serialize();
        let parts = frame_blob_pipelined(
            &block_bytes,
            compression,
            Some(&indexdata),
            tagdata.as_deref(),
        )
        .map_err(io_err)?;
        framed_chunks.push(parts.into_vec());
    }
    counters
        .frame_ns
        .fetch_add(elapsed_ns(t_frame), Ordering::Relaxed);
    counters.blobs_rewritten.fetch_add(1, Ordering::Relaxed);
    send_drain(
        drain_tx,
        DrainItem::Rewritten {
            seq: desc.seq,
            framed_chunks,
            kind: effective_kind,
            id_range: (effective_min_id, effective_max_id),
            stats: output.stats,
        },
    )
}
/// Counts every element in `block`, saturating at `u64::MAX` if the
/// usize -> u64 conversion were ever to fail.
fn count_block_elements(block: &crate::PrimitiveBlock) -> u64 {
    let total = block.elements_skip_metadata().count();
    total.try_into().unwrap_or(u64::MAX)
}
/// Infers the blob's element kind and inclusive id range by scanning its
/// elements.
///
/// The kind is seeded from the block-level type tag when unambiguous;
/// elements of a different kind than the (first) established one are
/// ignored. An empty block yields `(ElemKind::Node, i64::MAX, i64::MIN)`
/// — an inverted sentinel range.
fn infer_kind_and_range(block: &crate::PrimitiveBlock) -> (ElemKind, i64, i64) {
    use crate::{BlockType, Element};
    // Seed from the block-level tag; Mixed/Empty leave it undecided.
    let mut kind = match block.block_type() {
        BlockType::DenseNodes | BlockType::Nodes => Some(ElemKind::Node),
        BlockType::Ways => Some(ElemKind::Way),
        BlockType::Relations => Some(ElemKind::Relation),
        BlockType::Mixed | BlockType::Empty => None,
    };
    let (mut lo, mut hi) = (i64::MAX, i64::MIN);
    for element in block.elements_skip_metadata() {
        let (elem_kind, id) = match &element {
            Element::DenseNode(dn) => (ElemKind::Node, dn.id()),
            Element::Node(n) => (ElemKind::Node, n.id()),
            Element::Way(w) => (ElemKind::Way, w.id()),
            Element::Relation(r) => (ElemKind::Relation, r.id()),
        };
        match kind {
            // A foreign kind in a mixed block: skip it.
            Some(k) if k != elem_kind => {}
            _ => {
                // First element in an undecided block fixes the kind.
                kind.get_or_insert(elem_kind);
                lo = lo.min(id);
                hi = hi.max(id);
            }
        }
    }
    match kind {
        Some(k) => (k, lo, hi),
        None => (ElemKind::Node, i64::MAX, i64::MIN),
    }
}
/// Forwards a blob verbatim: reads the whole framed blob from the base file
/// and ships the owned bytes to the drain.
///
/// `override_kind_range` replaces the descriptor's kind/id-range when the
/// caller has already inferred better metadata.
fn handle_owned_passthrough(
    file: &std::fs::File,
    desc: &BlobDescriptor,
    drain_tx: &mpsc::SyncSender<DrainItem>,
    counters: &WorkerCounters,
    read_buf: &mut Vec<u8>,
    count: u64,
    override_kind_range: Option<(ElemKind, (i64, i64))>,
) -> Result<()> {
    counters.blobs_processed.fetch_add(1, Ordering::Relaxed);
    counters.blobs_owned_passthrough.fetch_add(1, Ordering::Relaxed);
    // Read the full frame (header + body) at its recorded file position.
    pread_into(file, read_buf, desc.frame_start, desc.frame_len).map_err(io_err)?;
    // Hand the bytes off wholesale; read_buf is left empty for reuse.
    let frame_bytes = std::mem::take(read_buf);
    let (kind, id_range) = match override_kind_range {
        Some(pair) => pair,
        None => (desc.kind, desc.id_range.unwrap_or((0, 0))),
    };
    send_drain(
        drain_tx,
        DrainItem::OwnedBytes {
            seq: desc.seq,
            frame_bytes,
            kind,
            id_range,
            count,
        },
    )
}
/// Positioned read: fills `buf` with exactly `len` bytes starting at byte
/// `offset` of `file`, without moving any shared file cursor.
fn pread_into(
    file: &std::fs::File,
    buf: &mut Vec<u8>,
    offset: u64,
    len: usize,
) -> std::io::Result<()> {
    // Size the buffer first so read_exact_at fills it completely.
    buf.resize(len, 0u8);
    file.read_exact_at(&mut buf[..], offset)
}
/// Narrows the sorted upsert-id list for `kind` to the window whose sort
/// keys fall inside the blob's `[min_id, max_id]` key range.
fn upsert_slice(ranges: &DiffRanges, kind: ElemKind, min_id: i64, max_id: i64) -> &[i64] {
    let upserts = ranges.upserts(kind);
    let lo_key = crate::osm_id::blob_osm_first_key(min_id, max_id);
    let hi_key = crate::osm_id::blob_osm_last_key(min_id, max_id);
    // Two binary searches: first id with key >= lo_key, then one past the
    // last id with key <= hi_key.
    let begin = upserts.partition_point(|&id| crate::osm_id::osm_id_key(id) < lo_key);
    let past_end =
        begin + upserts[begin..].partition_point(|&id| crate::osm_id::osm_id_key(id) <= hi_key);
    &upserts[begin..past_end]
}
/// Merges a worker's locally-buffered coordinates into the shared slot,
/// leaving `local` empty.
fn flush_local_coords(
    slot: &Arc<Mutex<FxHashMap<i64, (i32, i32)>>>,
    local: &mut FxHashMap<i64, (i32, i32)>,
) {
    if local.is_empty() {
        return;
    }
    // A poisoned lock is still usable: the map itself cannot be left in an
    // inconsistent state by these operations.
    let mut shared = match slot.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    if shared.is_empty() {
        // Cheap path: donate the whole local map instead of rehashing.
        std::mem::swap(local, &mut *shared);
    } else {
        shared.extend(local.drain());
    }
}
fn send_drain(drain_tx: &mpsc::SyncSender<DrainItem>, item: DrainItem) -> Result<()> {
drain_tx.send(item).map_err(|_| {
new_error(ErrorKind::Io(std::io::Error::other(
"streaming: drain closed worker→drain channel",
)))
})
}
#[allow(clippy::needless_pass_by_value)]
fn io_err<E: std::fmt::Display>(e: E) -> Error {
new_error(ErrorKind::Io(std::io::Error::other(e.to_string())))
}
/// Nanoseconds elapsed since `t`, saturating at `u64::MAX`.
fn elapsed_ns(t: std::time::Instant) -> u64 {
    let nanos = t.elapsed().as_nanos();
    nanos.try_into().unwrap_or(u64::MAX)
}