use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, mpsc};
use rustc_hash::FxHashMap;
use crate::blob_meta::ElemKind;
use crate::block_builder::BlockBuilder;
use crate::commands::{flush_block, flush_passthrough_buf};
use crate::error::{Error, ErrorKind, Result, new_error};
use crate::file_writer::FileWriter;
use crate::osc::CompactDiffOverlay;
use crate::writer::PbfWriter;
use super::descriptor::DrainItem;
use super::diff_ranges::{DiffRanges, UpsertCursors};
use super::stats::MergeStats;
use super::stream_output::{
emit_create_for_output, emit_gap_creates, flush_remaining_upserts, has_gap_creates,
};
use super::streaming::CoordSlots;
/// Shared write-once handle through which the drain publishes the merged
/// node-id -> (i32, i32) location map (presumably fixed-point lat/lon —
/// confirm against `CoordSlots` producers) exactly once at the barrier.
pub(super) type LocMapHandle = Arc<OnceLock<Arc<FxHashMap<i64, (i32, i32)>>>>;
/// Optional pre-seeded node locations; the map is `take()`n (consumed) once
/// when the barrier merges all location shards.
pub(super) type SeededLocations = std::sync::Mutex<Option<FxHashMap<i64, (i32, i32)>>>;
/// Read-only configuration for a single drain run.
pub(super) struct DrainConfig<'a> {
    /// Diff id ranges used to position and advance per-kind upsert cursors.
    pub ranges: &'a DiffRanges,
    /// Change overlay whose creates/upserts are merged into the output.
    pub diff: &'a CompactDiffOverlay,
    /// When true, passthrough frames are copied via the copy_file_range path
    /// (`CopyRange` items); otherwise receiving one is an error.
    pub use_copy_range: bool,
    #[cfg_attr(not(feature = "linux-direct-io"), allow(dead_code))]
    /// Source file descriptor handed to `write_raw_copy` (linux-direct-io only).
    pub input_fd: i32,
    /// Enables the node-location barrier and location map publication.
    pub locations_on_ways: bool,
    /// Per-worker location shards merged at the barrier; required (Some) when
    /// `locations_on_ways` is true.
    pub coord_slots: Option<CoordSlots>,
    /// Optional pre-seeded locations merged in ahead of the shards.
    pub seeded_locations: Option<SeededLocations>,
    /// Notified (best-effort) once the location map has been published.
    pub barrier_tx: Option<mpsc::SyncSender<()>>,
    /// Destination for the published location map; required (Some) when
    /// `locations_on_ways` is true.
    pub loc_map_out: Option<LocMapHandle>,
}
/// Receiving ends of the producer-to-drain channels.
pub(super) struct DrainChannels {
    /// Out-of-order stream of work items, keyed internally by `DrainItem::seq`.
    pub drain_rx: mpsc::Receiver<DrainItem>,
    /// Delivers the sequence number of the last node item (u64::MAX means
    /// "no nodes"); polled only when locations_on_ways is enabled.
    pub last_node_seq_rx: Option<mpsc::Receiver<u64>>,
}
/// Diagnostic counters for the drain loop, emitted once at the end of a run.
pub(super) struct DrainCounters {
    /// Total items handed to `process_item`.
    pub items_processed: AtomicU64,
    /// Number of copy_file_range flushes actually issued.
    pub copy_range_calls: AtomicU64,
    /// CopyRange items merged into an existing adjacent run.
    pub copy_range_coalesced_items: AtomicU64,
    /// Times a non-empty passthrough buffer was flushed on a type transition.
    pub passthrough_chunks_flushed: AtomicU64,
    /// Individual rewritten chunks written to the output.
    pub rewrite_blocks_written: AtomicU64,
    /// Times a gap-create batch was emitted before an item.
    pub gap_creates_emitted: AtomicU64,
    /// Creates emitted after the input stream ended.
    pub trailing_creates_emitted: AtomicU64,
    /// High-water mark of items parked in the reorder buffer.
    pub reorder_buffer_high_water_count: AtomicU64,
    /// High-water mark of bytes parked in the reorder buffer.
    pub reorder_buffer_high_water_bytes: AtomicU64,
    /// Size of the merged location map published at the barrier.
    pub barrier_loc_map_size: AtomicU64,
    /// Nanoseconds spent waiting on the channel while the buffer was empty.
    pub reorder_gap_wait_ns: AtomicU64,
}
impl DrainCounters {
    /// Creates a counter set with every counter at zero.
    pub(super) fn new() -> Self {
        let z = || AtomicU64::new(0);
        Self {
            items_processed: z(),
            copy_range_calls: z(),
            copy_range_coalesced_items: z(),
            passthrough_chunks_flushed: z(),
            rewrite_blocks_written: z(),
            gap_creates_emitted: z(),
            trailing_creates_emitted: z(),
            reorder_buffer_high_water_count: z(),
            reorder_buffer_high_water_bytes: z(),
            barrier_loc_map_size: z(),
            reorder_gap_wait_ns: z(),
        }
    }
    /// Emits every counter as a named debug metric, saturating values that do
    /// not fit in `i64` at `i64::MAX`.
    pub(super) fn emit(&self) {
        let entries: [(&str, &AtomicU64); 11] = [
            ("merge_drain_items_processed", &self.items_processed),
            ("merge_drain_copy_range_calls", &self.copy_range_calls),
            ("merge_drain_copy_range_coalesced_items", &self.copy_range_coalesced_items),
            ("merge_drain_passthrough_chunks_flushed", &self.passthrough_chunks_flushed),
            ("merge_drain_rewrite_blocks_written", &self.rewrite_blocks_written),
            ("merge_drain_gap_creates_emitted", &self.gap_creates_emitted),
            ("merge_drain_trailing_creates_emitted", &self.trailing_creates_emitted),
            ("merge_drain_reorder_buffer_high_water_count", &self.reorder_buffer_high_water_count),
            ("merge_drain_reorder_buffer_high_water_bytes", &self.reorder_buffer_high_water_bytes),
            ("merge_drain_barrier_loc_map_size", &self.barrier_loc_map_size),
            ("merge_drain_reorder_gap_wait_ns", &self.reorder_gap_wait_ns),
        ];
        for (name, counter) in entries {
            let value = counter.load(Ordering::Relaxed);
            crate::debug::emit_counter(name, i64::try_from(value).unwrap_or(i64::MAX));
        }
    }
}
#[allow(clippy::too_many_lines, clippy::cognitive_complexity, clippy::needless_pass_by_value)]
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub(super) fn run_drain(
cfg: DrainConfig<'_>,
channels: DrainChannels,
writer: &mut PbfWriter<FileWriter>,
counters: &DrainCounters,
) -> Result<MergeStats> {
crate::debug::emit_marker("MERGE_DRAIN_START");
let DrainChannels { drain_rx, last_node_seq_rx } = channels;
let mut state = DrainState::new();
let mut stats = MergeStats::new();
let mut last_node_seq: Option<u64> = None;
let recv_timeout = std::time::Duration::from_millis(25);
let mut drain_disconnected = false;
loop {
let recv_start = std::time::Instant::now();
let item_opt = if drain_disconnected {
None
} else {
match drain_rx.recv_timeout(recv_timeout) {
Ok(item) => Some(item),
Err(mpsc::RecvTimeoutError::Timeout) => None,
Err(mpsc::RecvTimeoutError::Disconnected) => {
drain_disconnected = true;
None
}
}
};
let wait_ns = u64::try_from(recv_start.elapsed().as_nanos()).unwrap_or(u64::MAX);
if state.buffer.is_empty() {
counters
.reorder_gap_wait_ns
.fetch_add(wait_ns, Ordering::Relaxed);
}
if let Some(item) = item_opt {
let seq = item.seq();
let cost = item.byte_cost();
state.buffer.insert(seq, item);
state.bytes_in_buffer += cost;
let buf_count = state.buffer.len() as u64;
let prev_count = counters
.reorder_buffer_high_water_count
.load(Ordering::Relaxed);
if buf_count > prev_count {
counters
.reorder_buffer_high_water_count
.store(buf_count, Ordering::Relaxed);
}
let buf_bytes = state.bytes_in_buffer as u64;
let prev_bytes = counters
.reorder_buffer_high_water_bytes
.load(Ordering::Relaxed);
if buf_bytes > prev_bytes {
counters
.reorder_buffer_high_water_bytes
.store(buf_bytes, Ordering::Relaxed);
}
}
if cfg.locations_on_ways
&& last_node_seq.is_none()
&& let Some(rx) = last_node_seq_rx.as_ref()
&& let Ok(n) = rx.try_recv()
{
last_node_seq = Some(n);
}
while let Some(item) = state.buffer.remove(&state.next_seq) {
state.bytes_in_buffer = state.bytes_in_buffer.saturating_sub(item.byte_cost());
process_item(item, &cfg, &mut state, writer, &mut stats, counters)?;
state.next_seq += 1;
if cfg.locations_on_ways
&& !state.barrier_done
&& let Some(n) = last_node_seq
&& (n == u64::MAX || state.next_seq > n)
{
barrier_publish_loc_map(&cfg, &mut state, counters)?;
}
}
if cfg.locations_on_ways
&& !state.barrier_done
&& let Some(n) = last_node_seq
&& (n == u64::MAX || state.next_seq > n)
{
barrier_publish_loc_map(&cfg, &mut state, counters)?;
}
if drain_disconnected {
break;
}
}
if !state.buffer.is_empty() {
let first_remaining = state.buffer.keys().next().copied().unwrap_or(0);
return Err(new_error(ErrorKind::Io(std::io::Error::other(format!(
"drain: channel closed with {} items still in reorder buffer; expected next seq {}, \
smallest remaining seq {}. Producer dropped a seq.",
state.buffer.len(),
state.next_seq,
first_remaining,
)))));
}
flush_copy_range(&mut state, &cfg, writer, counters)?;
flush_passthrough_buf(&mut state.passthrough_chunks, writer)
.map_err(|e| new_error(ErrorKind::Io(std::io::Error::other(e.to_string()))))?;
crate::debug::emit_marker("MERGE_TRAILING_CREATES_START");
let types_to_flush = match state.last_type {
None | Some(ElemKind::Node) => &[ElemKind::Node, ElemKind::Way, ElemKind::Relation][..],
Some(ElemKind::Way) => &[ElemKind::Way, ElemKind::Relation][..],
Some(ElemKind::Relation) => &[ElemKind::Relation][..],
};
let loc_map_ref = state.loc_map.as_deref();
for &kind in types_to_flush {
let (cursor, upserts) = state.cursors.get_mut(kind, cfg.ranges);
while *cursor < upserts.len() {
emit_create_for_output(
upserts[*cursor],
kind,
cfg.diff,
&mut state.bb,
writer,
&mut stats,
loc_map_ref,
)
.map_err(|e| new_error(ErrorKind::Io(std::io::Error::other(e.to_string()))))?;
*cursor += 1;
counters
.trailing_creates_emitted
.fetch_add(1, Ordering::Relaxed);
}
flush_block(&mut state.bb, writer)
.map_err(|e| new_error(ErrorKind::Io(std::io::Error::other(e.to_string()))))?;
}
crate::debug::emit_marker("MERGE_TRAILING_CREATES_END");
writer.flush()?;
crate::debug::emit_marker("MERGE_DRAIN_END");
counters.emit();
Ok(stats)
}
/// Mutable state threaded through the drain loop.
struct DrainState {
    /// Out-of-order items parked until their sequence number becomes next.
    buffer: BTreeMap<u64, DrainItem>,
    /// Sum of `byte_cost()` for everything currently in `buffer`.
    bytes_in_buffer: usize,
    /// Next sequence number to emit.
    next_seq: u64,
    /// Element kind of the most recently processed item, for detecting
    /// node->way->relation transitions.
    last_type: Option<ElemKind>,
    /// Per-kind cursors into the diff's upsert id lists.
    cursors: UpsertCursors,
    /// Builder accumulating synthesized create elements into blocks.
    bb: BlockBuilder,
    /// Pending coalesced (start, end) byte run for copy_file_range.
    copy_range_run: Option<(u64, u64)>,
    /// Owned passthrough frames awaiting a flush to the writer.
    passthrough_chunks: Vec<Vec<u8>>,
    /// Node location map set at the barrier (locations-on-ways mode only).
    loc_map: Option<Arc<FxHashMap<i64, (i32, i32)>>>,
    /// True once the barrier has run; it must run at most once.
    barrier_done: bool,
}
impl DrainState {
    /// Returns a fresh state: empty reorder buffer, sequence counter at zero,
    /// cursors at the start, and the location barrier not yet published.
    fn new() -> Self {
        Self {
            next_seq: 0,
            bytes_in_buffer: 0,
            buffer: BTreeMap::new(),
            last_type: None,
            cursors: UpsertCursors::new(),
            bb: BlockBuilder::new(),
            copy_range_run: None,
            passthrough_chunks: Vec::new(),
            loc_map: None,
            barrier_done: false,
        }
    }
}
/// Handles a `prev` -> `item_kind` element-type boundary: flushes any pending
/// copy run and passthrough bytes, publishes the node-location barrier when
/// leaving the node phase, then emits the remaining upserts for `prev`.
#[allow(clippy::too_many_arguments)]
fn handle_type_transition(
    prev: ElemKind,
    item_kind: ElemKind,
    cfg: &DrainConfig<'_>,
    state: &mut DrainState,
    writer: &mut PbfWriter<FileWriter>,
    stats: &mut MergeStats,
    counters: &DrainCounters,
) -> Result<()> {
    flush_copy_range(state, cfg, writer, counters)?;
    let chunks_were_pending = !state.passthrough_chunks.is_empty();
    flush_passthrough_buf(&mut state.passthrough_chunks, writer).map_err(io_err)?;
    if chunks_were_pending {
        counters
            .passthrough_chunks_flushed
            .fetch_add(1, Ordering::Relaxed);
    }
    // Leaving the node phase means all node items have been processed, so the
    // location map can be published before any way/relation output.
    let leaving_nodes =
        prev == ElemKind::Node && matches!(item_kind, ElemKind::Way | ElemKind::Relation);
    if cfg.locations_on_ways && !state.barrier_done && leaving_nodes {
        barrier_publish_loc_map(cfg, state, counters)?;
    }
    let locations = state.loc_map.as_deref();
    flush_remaining_upserts(
        prev,
        item_kind,
        cfg.ranges,
        cfg.diff,
        &mut state.cursors,
        &mut state.bb,
        writer,
        stats,
        locations,
    )
    .map_err(io_err)
}
/// Emits any diff creates whose ids fall before `osm_first`, so they land in
/// the output ahead of the next passthrough/rewritten blob.
fn handle_gap_creates(
    item_kind: ElemKind,
    osm_first: i64,
    cfg: &DrainConfig<'_>,
    state: &mut DrainState,
    writer: &mut PbfWriter<FileWriter>,
    stats: &mut MergeStats,
    counters: &DrainCounters,
) -> Result<()> {
    let pending = has_gap_creates(item_kind, osm_first, cfg.ranges, &state.cursors);
    if !pending {
        return Ok(());
    }
    // Everything buffered so far must be written before the synthesized
    // creates to preserve id ordering in the output.
    flush_copy_range(state, cfg, writer, counters)?;
    flush_passthrough_buf(&mut state.passthrough_chunks, writer).map_err(io_err)?;
    let locations = state.loc_map.as_deref();
    emit_gap_creates(
        item_kind,
        osm_first,
        cfg.ranges,
        cfg.diff,
        &mut state.cursors,
        &mut state.bb,
        writer,
        stats,
        locations,
    )
    .map_err(io_err)?;
    flush_block(&mut state.bb, writer).map_err(io_err)?;
    counters.gap_creates_emitted.fetch_add(1, Ordering::Relaxed);
    Ok(())
}
/// Processes a single in-order item: handles a type transition if the element
/// kind changed, emits any gap creates preceding the item's id range, then
/// dispatches the item to its variant-specific output path.
#[allow(clippy::too_many_arguments)]
fn process_item(
    item: DrainItem,
    cfg: &DrainConfig<'_>,
    state: &mut DrainState,
    writer: &mut PbfWriter<FileWriter>,
    stats: &mut MergeStats,
    counters: &DrainCounters,
) -> Result<()> {
    counters.items_processed.fetch_add(1, Ordering::Relaxed);
    let kind = item.kind();
    let (lo, hi) = item.id_range();
    match state.last_type {
        Some(prev) if prev != kind => {
            handle_type_transition(prev, kind, cfg, state, writer, stats, counters)?;
        }
        _ => {}
    }
    state.last_type = Some(kind);
    // An inverted range (lo > hi) marks an item with no ids; skip gap creates.
    if lo <= hi {
        let osm_first = crate::osm_id::blob_osm_first_id(lo, hi);
        handle_gap_creates(kind, osm_first, cfg, state, writer, stats, counters)?;
    }
    dispatch_variant(item, cfg, state, writer, stats, counters)
}
/// Routes one in-order `DrainItem` to its output path and updates stats:
/// `CopyRange` coalesces adjacent byte runs for kernel-side copying,
/// `OwnedBytes` buffers an already-framed blob for batched passthrough, and
/// `Rewritten` writes re-encoded chunks and advances the upsert cursor past
/// the ids the rewritten blob covers.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn dispatch_variant(
    item: DrainItem,
    cfg: &DrainConfig<'_>,
    state: &mut DrainState,
    writer: &mut PbfWriter<FileWriter>,
    stats: &mut MergeStats,
    counters: &DrainCounters,
) -> Result<()> {
    match item {
        DrainItem::CopyRange {
            seq: _,
            frame_start,
            frame_len,
            kind,
            id_range: _,
            index,
            tagdata: _,
        } => {
            if !cfg.use_copy_range {
                return Err(new_error(ErrorKind::Io(std::io::Error::other(
                    "drain: received CopyRange item but use_copy_range is false",
                ))));
            }
            // Buffered passthrough bytes must be written first to keep output order.
            flush_passthrough_buf(&mut state.passthrough_chunks, writer).map_err(io_err)?;
            let new_end = frame_start + frame_len as u64;
            match state.copy_range_run {
                // Byte-adjacent to the current run: extend it instead of copying now.
                Some((run_start, run_end)) if run_end == frame_start => {
                    state.copy_range_run = Some((run_start, new_end));
                    counters
                        .copy_range_coalesced_items
                        .fetch_add(1, Ordering::Relaxed);
                }
                // Not adjacent: flush the old run and start a new one.
                _ => {
                    flush_copy_range(state, cfg, writer, counters)?;
                    state.copy_range_run = Some((frame_start, new_end));
                }
            }
            stats.bytes_passthrough += frame_len as u64;
            stats.blobs_passthrough += 1;
            stats.blobs_index_hit += 1;
            #[allow(clippy::cast_possible_truncation)]
            stats.blob_sizes.push(frame_len as u32);
            match kind {
                ElemKind::Node => stats.base_nodes += index.count,
                ElemKind::Way => stats.base_ways += index.count,
                ElemKind::Relation => stats.base_relations += index.count,
            }
        }
        DrainItem::OwnedBytes {
            seq: _,
            frame_bytes,
            kind,
            id_range: _,
            count,
        } => {
            // A pending copy run must land before these in-memory bytes.
            flush_copy_range(state, cfg, writer, counters)?;
            let frame_len = frame_bytes.len() as u64;
            state.passthrough_chunks.push(frame_bytes);
            stats.bytes_passthrough += frame_len;
            stats.blobs_passthrough += 1;
            #[allow(clippy::cast_possible_truncation)]
            stats.blob_sizes.push(frame_len as u32);
            match kind {
                ElemKind::Node => stats.base_nodes += count,
                ElemKind::Way => stats.base_ways += count,
                ElemKind::Relation => stats.base_relations += count,
            }
        }
        DrainItem::Rewritten {
            seq: _,
            framed_chunks,
            kind,
            id_range,
            stats: per_blob_stats,
        } => {
            // Rewritten output must follow everything buffered before it.
            flush_copy_range(state, cfg, writer, counters)?;
            flush_passthrough_buf(&mut state.passthrough_chunks, writer).map_err(io_err)?;
            let mut rewrite_bytes: u64 = 0;
            for chunk in framed_chunks {
                rewrite_bytes += chunk.len() as u64;
                writer.write_raw_owned(chunk).map_err(io_err)?;
                counters
                    .rewrite_blocks_written
                    .fetch_add(1, Ordering::Relaxed);
            }
            stats.bytes_rewritten += rewrite_bytes;
            stats.blobs_rewritten += 1;
            stats.merge_from(&per_blob_stats);
            // Skip upserts whose keys the rewritten blob already covered, so
            // they are not emitted again as gap/trailing creates.
            let (min_id, max_id) = id_range;
            let last = crate::osm_id::blob_osm_last_key(min_id, max_id);
            let (cursor, upserts) = state.cursors.get_mut(kind, cfg.ranges);
            while *cursor < upserts.len() && crate::osm_id::osm_id_key(upserts[*cursor]) <= last {
                *cursor += 1;
            }
        }
    }
    Ok(())
}
/// Issues the pending coalesced copy_file_range run, if any, and counts it.
#[cfg(feature = "linux-direct-io")]
fn flush_copy_range(
    state: &mut DrainState,
    cfg: &DrainConfig<'_>,
    writer: &mut PbfWriter<FileWriter>,
    counters: &DrainCounters,
) -> Result<()> {
    if let Some((start, end)) = state.copy_range_run.take() {
        writer
            .write_raw_copy(cfg.input_fd, start, end - start)
            .map_err(io_err)?;
        counters.copy_range_calls.fetch_add(1, Ordering::Relaxed);
    }
    Ok(())
}
#[cfg(not(feature = "linux-direct-io"))]
fn flush_copy_range(
state: &mut DrainState,
_cfg: &DrainConfig<'_>,
_writer: &mut PbfWriter<FileWriter>,
_counters: &DrainCounters,
) -> Result<()> {
if state.copy_range_run.take().is_some() {
return Err(new_error(ErrorKind::Io(std::io::Error::other(
"drain: copy_file_range path requires linux-direct-io feature",
))));
}
Ok(())
}
/// Merges the seeded locations and all per-slot location shards into one map,
/// publishes it exactly once through `cfg.loc_map_out`, records its size, and
/// signals `cfg.barrier_tx` best-effort.
///
/// # Errors
/// Fails if `coord_slots` or `loc_map_out` is missing while
/// `locations_on_ways` is enabled, or if the map was already published.
fn barrier_publish_loc_map(
    cfg: &DrainConfig<'_>,
    state: &mut DrainState,
    counters: &DrainCounters,
) -> Result<()> {
    crate::debug::emit_marker("MERGE_DRAIN_BARRIER_START");
    let slots = cfg.coord_slots.as_ref().ok_or_else(|| {
        new_error(ErrorKind::Io(std::io::Error::other(
            "drain: locations_on_ways true but coord_slots is None",
        )))
    })?;
    let loc_map_out = cfg.loc_map_out.as_ref().ok_or_else(|| {
        new_error(ErrorKind::Io(std::io::Error::other(
            "drain: locations_on_ways true but loc_map_out is None",
        )))
    })?;
    // Start from the seeded map, taken out of its Mutex so it is merged at
    // most once; a poisoned lock is recovered since the data is still usable.
    let mut merged: FxHashMap<i64, (i32, i32)> = cfg
        .seeded_locations
        .as_ref()
        .and_then(|m| {
            m.lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .take()
        })
        .unwrap_or_default();
    for slot in slots {
        let mut local = slot
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if merged.is_empty() {
            // First non-empty shard: steal its allocation instead of rehashing.
            std::mem::swap(&mut merged, &mut *local);
        } else {
            merged.extend(local.drain());
        }
    }
    counters
        .barrier_loc_map_size
        .store(merged.len() as u64, Ordering::Relaxed);
    let arc = Arc::new(merged);
    // OnceLock::set fails only if a value was already published.
    if loc_map_out.set(Arc::clone(&arc)).is_err() {
        return Err(new_error(ErrorKind::Io(std::io::Error::other(
            "drain: loc_map_out already published - barrier ran twice?",
        ))));
    }
    state.loc_map = Some(arc);
    state.barrier_done = true;
    if let Some(barrier_tx) = cfg.barrier_tx.as_ref() {
        // The receiver may already be gone; the signal is best-effort.
        if let Err(e) = barrier_tx.send(()) {
            let _: mpsc::SendError<()> = e;
        }
    }
    crate::debug::emit_marker("MERGE_DRAIN_BARRIER_END");
    Ok(())
}
#[allow(clippy::needless_pass_by_value)]
fn io_err<E: std::fmt::Display>(e: E) -> Error {
new_error(ErrorKind::Io(std::io::Error::other(e.to_string())))
}