use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::Path;
use crate::blob::{
decode_blob_to_headerblock, decode_blob_to_primitiveblock, decompress_blob_data_into,
BlobKind,
};
use crate::blob_meta::{BlobIndex, ElemKind, scan_block_ids};
use crate::block_builder::BlockBuilder;
use crate::file_writer::FileWriter;
use crate::read::header_walker::HeaderWalker;
use crate::writer::{reframe_raw_with_index, Compression, PbfWriter};
use crate::Element;
use crate::block_builder::OwnedBlock;
use crate::owned::{
read_dense_node, read_node, read_relation, read_way,
write_single_node_local, write_single_relation_local, write_single_way_local,
OwnedNode, OwnedRelation, OwnedWay,
};
use rayon::prelude::*;
use super::{
build_output_header, require_indexdata, HeaderOverrides, Result,
writer_from_header_bytes,
};
/// Aggregate counters reported after a sort run.
///
/// All fields are plain totals; the struct is `Copy` so it can be returned
/// and logged freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SortStats {
    /// Total node elements written to the output.
    pub nodes: u64,
    /// Total way elements written to the output.
    pub ways: u64,
    /// Total relation elements written to the output.
    pub relations: u64,
    /// Blobs copied through verbatim (no decode/re-encode).
    pub blobs_passthrough: u64,
    /// Blobs that were decoded, merge-sorted, and re-encoded.
    pub blobs_rewritten: u64,
    /// Total OSMData blobs seen in the input.
    pub blobs_total: u64,
}
impl SortStats {
    /// Print a one-line human-readable summary of the sort to stderr.
    pub fn print_summary(&self) {
        let Self {
            nodes,
            ways,
            relations,
            blobs_passthrough,
            blobs_rewritten,
            blobs_total,
        } = self;
        eprintln!(
            "Sorted {} nodes, {} ways, {} relations ({} blobs: {} passthrough, {} rewritten)",
            nodes, ways, relations, blobs_total, blobs_passthrough, blobs_rewritten,
        );
    }
}
/// Pass-1 record describing one OSMData blob in the input file.
struct BlobEntry {
    // Byte offset of the blob's frame (length prefix + BlobHeader + Blob) in the input.
    file_offset: u64,
    // Total frame length in bytes, starting at `file_offset`.
    frame_len: u64,
    // Element kind, ID bounds, and element count for the blob
    // (read from indexdata or recovered by scanning the decompressed block).
    index: BlobIndex,
    // True when `index` came from on-disk indexdata, so the frame can be
    // passed through verbatim without re-framing.
    has_indexdata: bool,
    // Tagdata carried over from the source frame, if any.
    tagdata: Option<Box<[u8]>>,
}
/// Tunables for [`sort`].
pub struct SortOptions {
    // Compression used for rewritten output blobs (passed to the writer).
    pub compression: Compression,
    // Enable direct I/O; forwarded to input indexing and the output writer.
    pub direct_io: bool,
    // Enable io_uring output writes; also enables the copy-range fast path.
    pub io_uring: bool,
    // Proceed even when the input lacks blob-level indexdata
    // (forces a slower full decode during pass 1).
    pub force: bool,
}
/// Sort the OSMData blobs of `input` by element kind (node → way → relation)
/// and first OSM key, writing the result to `output`.
///
/// Pass 1 indexes every OSMData blob and sorts the index in memory. Pass 2
/// streams blobs out in index order: blobs whose ID ranges do not overlap a
/// neighbour of the same kind are passed through verbatim (coalesced into
/// contiguous copy runs when the copy-range path is available), while
/// overlapping runs are decoded, merge-sorted, and re-encoded in parallel
/// with rayon before being written.
///
/// # Errors
/// Fails on I/O errors, malformed input, or (unless `opts.force`) when the
/// input has no blob-level indexdata.
#[allow(clippy::too_many_lines)]
#[hotpath::measure]
pub fn sort(input: &Path, output: &Path, opts: &SortOptions, overrides: &HeaderOverrides) -> Result<SortStats> {
    let SortOptions { compression, direct_io, io_uring, force } = *opts;
    // Without per-blob indexdata, pass 1 must decompress every blob; refuse
    // unless the caller explicitly forced it.
    require_indexdata(input, direct_io, force,
        "input PBF has no blob-level indexdata. Without indexdata, every blob must be \
        decompressed to scan element IDs (significantly slower).")?;
    #[allow(clippy::cast_possible_wrap)]
    if let Ok(meta) = std::fs::metadata(input) {
        crate::debug::emit_counter("sort_total_bytes_in", meta.len() as i64);
    }
    crate::debug::emit_marker("SORT_PASS1_START");
    eprintln!("Pass 1: indexing blobs...");
    crate::debug::emit_marker("SORT_INDEX_BUILD_START");
    let (header, mut entries) = build_blob_index(input, direct_io)?;
    crate::debug::emit_marker("SORT_INDEX_BUILD_END");
    super::warn_locations_on_ways_loss(&header);
    eprintln!(" {} OSMData blobs indexed", entries.len());
    #[allow(clippy::cast_possible_wrap)]
    crate::debug::emit_counter("sort_blobs_total", entries.len() as i64);
    crate::debug::emit_marker("SORT_OVERLAP_DETECT_START");
    // Global blob order: element kind first, then the blob's first OSM key.
    entries.sort_by_key(|e| {
        (type_order(e.index.kind), crate::osm_id::blob_osm_first_key(e.index.min_id, e.index.max_id))
    });
    let overlaps = detect_overlaps(&entries);
    crate::debug::emit_marker("SORT_OVERLAP_DETECT_END");
    let overlap_count = overlaps.iter().filter(|&&b| b).count();
    #[allow(clippy::cast_possible_wrap)]
    crate::debug::emit_counter("sort_blobs_overlap", overlap_count as i64);
    if overlap_count > 0 {
        eprintln!(" {overlap_count} blobs in overlap runs (decode + re-encode)");
    }
    crate::debug::emit_marker("SORT_PASS1_END");
    crate::debug::emit_marker("SORT_PASS2_START");
    eprintln!("Pass 2: writing sorted output...");
    crate::debug::emit_marker("SORT_WRITER_SETUP_START");
    #[allow(clippy::redundant_closure_for_method_calls)]
    let header_bytes = build_output_header(&header, false, overrides, |hb| hb.sorted())?;
    let mut writer = writer_from_header_bytes(
        output,
        compression,
        &header_bytes,
        direct_io,
        io_uring,
    )?;
    crate::debug::emit_marker("SORT_WRITER_SETUP_END");
    let mut input_file = File::open(input)?;
    // Raw input fd for the copy-range passthrough path; the placeholder 0 on
    // non-linux builds is never used (flush_copy_run asserts the run is empty).
    #[cfg(feature = "linux-direct-io")]
    let input_fd = {
        use std::os::unix::io::AsRawFd;
        input_file.as_raw_fd()
    };
    #[cfg(not(feature = "linux-direct-io"))]
    let input_fd = 0i32;
    let use_copy_range = io_uring || (!direct_io && cfg!(feature = "linux-direct-io"));
    let mut stats = SortStats {
        nodes: 0,
        ways: 0,
        relations: 0,
        blobs_passthrough: 0,
        blobs_rewritten: 0,
        blobs_total: entries.len() as u64,
    };
    let mut frame_buf: Vec<u8> = Vec::new();
    let overlap_runs: Vec<(usize, usize, ElemKind)> = collect_overlap_runs(&entries, &overlaps);
    #[allow(clippy::cast_possible_wrap)]
    crate::debug::emit_counter("sort_overlap_runs", overlap_runs.len() as i64);
    // Decode/merge/re-encode every overlap run up front, in parallel. The
    // results come back in run order and are consumed sequentially below.
    let overlap_outputs: Vec<(Vec<OwnedBlock>, OverlapCounts)> = if overlap_runs.is_empty() {
        Vec::new()
    } else {
        crate::debug::emit_marker("SORT_OVERLAP_PARALLEL_START");
        let results: std::result::Result<Vec<_>, String> = overlap_runs
            .par_iter()
            .map(|(start, end, kind)| {
                compute_overlap_run_local(&entries[*start..*end], *kind, input)
            })
            .collect();
        let out = results.map_err(|e| -> Box<dyn std::error::Error> { e.into() })?;
        crate::debug::emit_marker("SORT_OVERLAP_PARALLEL_END");
        out
    };
    crate::debug::emit_marker("SORT_WRITE_LOOP_START");
    // Pending contiguous [start, end) input byte range to copy verbatim,
    // plus telemetry counters for the copy-range path.
    let mut copy_run: Option<(u64, u64)> = None;
    let mut copy_run_calls: u64 = 0;
    let mut copy_run_coalesced: u64 = 0;
    let mut overlap_iter = overlap_outputs.into_iter();
    let mut i = 0;
    while i < entries.len() {
        if overlaps[i] {
            // Entering an overlap run: its blobs were re-encoded above, so
            // flush any pending verbatim copy and emit the rebuilt blocks.
            flush_copy_run(&mut copy_run, &mut writer, input_fd, &mut copy_run_calls)?;
            let start = i;
            let run_kind = entries[i].index.kind;
            // Advance past the run; must mirror collect_overlap_runs exactly
            // so overlap_iter stays aligned with the runs.
            while i < entries.len() && overlaps[i] && entries[i].index.kind == run_kind {
                i += 1;
            }
            let (blocks, counts) = overlap_iter
                .next()
                .ok_or_else(|| io::Error::other("overlap output iterator drained"))?;
            for (block_bytes, index, tagdata) in blocks {
                writer.write_primitive_block_owned(block_bytes, index, tagdata.as_deref())?;
            }
            stats.nodes += counts.nodes;
            stats.ways += counts.ways;
            stats.relations += counts.relations;
            stats.blobs_rewritten += (i - start) as u64;
        } else {
            // Non-overlapping blob: pass through untouched, preferring the
            // coalesced copy-range path over buffered read/write.
            let entry = &entries[i];
            match try_extend_copy_run(
                &mut copy_run,
                entry,
                use_copy_range,
                &mut writer,
                input_fd,
                &mut copy_run_calls,
            )? {
                CopyRunStep::Extended => copy_run_coalesced += 1,
                CopyRunStep::Started => {}
                CopyRunStep::Fallback => {
                    flush_copy_run(&mut copy_run, &mut writer, input_fd, &mut copy_run_calls)?;
                    write_passthrough_blob(entry, &mut input_file, &mut writer, &mut frame_buf)?;
                }
            }
            count_entry(entry, &mut stats);
            stats.blobs_passthrough += 1;
            i += 1;
        }
    }
    // Flush the final pending copy run, if any.
    flush_copy_run(&mut copy_run, &mut writer, input_fd, &mut copy_run_calls)?;
    debug_assert!(overlap_iter.next().is_none(), "overlap outputs not fully drained");
    crate::debug::emit_marker("SORT_WRITE_LOOP_END");
    #[allow(clippy::cast_possible_wrap)]
    {
        crate::debug::emit_counter("sort_blobs_passthrough", stats.blobs_passthrough as i64);
        crate::debug::emit_counter("sort_blobs_rewritten", stats.blobs_rewritten as i64);
        crate::debug::emit_counter("sort_copy_range_calls", copy_run_calls as i64);
        crate::debug::emit_counter("sort_copy_range_coalesced", copy_run_coalesced as i64);
    }
    crate::debug::emit_marker("SORT_FLUSH_START");
    crate::debug::emit_marker("WAIT_WRITER_START");
    writer.flush()?;
    crate::debug::emit_marker("WAIT_WRITER_END");
    crate::debug::emit_marker("SORT_FLUSH_END");
    #[allow(clippy::cast_possible_wrap)]
    if let Ok(meta) = std::fs::metadata(output) {
        crate::debug::emit_counter("sort_total_bytes_out", meta.len() as i64);
    }
    crate::debug::emit_marker("SORT_PASS2_END");
    Ok(stats)
}
/// Pass 1: walk every blob frame of `input` and collect a [`BlobEntry`] for
/// each OSMData blob, plus the decoded OSMHeader block.
///
/// When a blob has no on-disk indexdata, the blob is decompressed and its
/// element IDs scanned to reconstruct the index.
#[hotpath::measure]
fn build_blob_index(
    input: &Path,
    _direct_io: bool,
) -> Result<(crate::HeaderBlock, Vec<BlobEntry>)> {
    let mut walker = HeaderWalker::open(input)?;
    let mut blob_entries = Vec::new();
    let mut header_block: Option<crate::HeaderBlock> = None;
    // Scratch buffers reused across blobs to avoid per-blob allocation.
    let mut raw_buf: Vec<u8> = Vec::new();
    let mut inflate_buf: Vec<u8> = Vec::new();
    while let Some(meta) = walker.next_header()? {
        match meta.blob_type {
            BlobKind::OsmHeader => {
                // Only the first OSMHeader blob is decoded; later ones are skipped.
                if header_block.is_none() {
                    walker.pread_data(meta.data_offset, meta.data_size, &mut raw_buf)?;
                    header_block = Some(decode_blob_to_headerblock(&raw_buf)?);
                }
            }
            BlobKind::OsmData => {
                let has_indexdata = meta.index.is_some();
                let index = if let Some(idx) = meta.index {
                    idx
                } else {
                    // Slow path: no indexdata on disk — decompress and scan
                    // the block to recover kind/ID-range/count.
                    walker.pread_data(meta.data_offset, meta.data_size, &mut raw_buf)?;
                    decompress_blob_data_into(&raw_buf, &mut inflate_buf)?;
                    scan_block_ids(&inflate_buf).ok_or_else(|| {
                        io::Error::new(
                            io::ErrorKind::InvalidData,
                            "failed to scan block IDs",
                        )
                    })?
                };
                blob_entries.push(BlobEntry {
                    file_offset: meta.frame_start,
                    frame_len: meta.frame_size as u64,
                    index,
                    has_indexdata,
                    tagdata: meta.tagdata,
                });
            }
            _ => {}
        }
    }
    let header_block = header_block.ok_or_else(|| {
        io::Error::new(io::ErrorKind::InvalidData, "no OSMHeader blob found")
    })?;
    Ok((header_block, blob_entries))
}
/// Global ordering of element kinds in a sorted PBF:
/// nodes (0) before ways (1) before relations (2).
fn type_order(kind: ElemKind) -> u8 {
    match kind {
        ElemKind::Relation => 2,
        ElemKind::Way => 1,
        ElemKind::Node => 0,
    }
}
/// Flag each blob whose OSM key range overlaps an adjacent blob of the same
/// kind. `entries` must already be sorted; flagged blobs need a decode +
/// merge + re-encode, unflagged ones can be passed through verbatim.
fn detect_overlaps(entries: &[BlobEntry]) -> Vec<bool> {
    let mut flags = vec![false; entries.len()];
    for (i, pair) in entries.windows(2).enumerate() {
        let (a, b) = (&pair[0], &pair[1]);
        let same_kind = a.index.kind == b.index.kind;
        if same_kind
            && crate::osm_id::blob_osm_last_key(a.index.min_id, a.index.max_id)
                >= crate::osm_id::blob_osm_first_key(b.index.min_id, b.index.max_id)
        {
            // Both members of the overlapping pair join a rewrite run.
            flags[i] = true;
            flags[i + 1] = true;
        }
    }
    flags
}
/// Copy one non-overlapping blob to the output via a buffered read/write.
///
/// Frames that already carry indexdata are forwarded byte-for-byte; frames
/// without it are re-framed so the serialized index (and any tagdata) is
/// attached to the output blob.
#[hotpath::measure]
fn write_passthrough_blob(
    entry: &BlobEntry,
    input_file: &mut File,
    writer: &mut PbfWriter<FileWriter>,
    frame_buf: &mut Vec<u8>,
) -> Result<()> {
    // Both paths need the raw frame bytes loaded first.
    read_frame_into(input_file, entry, frame_buf)?;
    if entry.has_indexdata {
        writer.write_raw(frame_buf)?;
    } else {
        let blob_bytes = extract_blob_bytes(frame_buf)?;
        let reframed =
            reframe_raw_with_index(blob_bytes, &entry.index.serialize(), entry.tagdata.as_deref())?;
        writer.write_raw(&reframed)?;
    }
    Ok(())
}
/// Outcome of attempting to add a blob to the pending copy-range run.
// allow(dead_code): Extended/Started are only constructed when the
// linux-direct-io feature is enabled (see try_extend_copy_run).
#[allow(dead_code)] enum CopyRunStep {
    // Blob was contiguous with the current run; the run grew to cover it.
    Extended,
    // A new run was started at this blob (previous run flushed, if any).
    Started,
    // Blob cannot go through the copy-range path; caller must use the
    // buffered read/write path instead.
    Fallback,
}
/// Try to fold `entry` into the pending copy-range run.
///
/// Only frames that already carry indexdata are eligible (they can be copied
/// byte-for-byte); everything else — and every blob when `use_copy_range` is
/// false or the linux-direct-io feature is off — reports `Fallback`.
// allow(unused_variables): writer/input_fd/calls are untouched without the
// linux-direct-io feature.
#[allow(unused_variables)]
fn try_extend_copy_run(
    run: &mut Option<(u64, u64)>, // pending [start, end) byte range in the input
    entry: &BlobEntry,
    use_copy_range: bool,
    writer: &mut PbfWriter<FileWriter>,
    input_fd: i32,
    calls: &mut u64, // copy-call counter, bumped by flush_copy_run
) -> Result<CopyRunStep> {
    if !(entry.has_indexdata && use_copy_range) {
        return Ok(CopyRunStep::Fallback);
    }
    #[cfg(feature = "linux-direct-io")]
    {
        match *run {
            // Frame begins exactly where the run ends: coalesce.
            Some((start, end)) if end == entry.file_offset => {
                *run = Some((start, end + entry.frame_len));
                Ok(CopyRunStep::Extended)
            }
            // Gap in the file, or no run yet: flush and start fresh here.
            _ => {
                flush_copy_run(run, writer, input_fd, calls)?;
                *run = Some((entry.file_offset, entry.file_offset + entry.frame_len));
                Ok(CopyRunStep::Started)
            }
        }
    }
    #[cfg(not(feature = "linux-direct-io"))]
    Ok(CopyRunStep::Fallback)
}
/// Emit the pending copy run (if any) as a single raw range copy from the
/// input fd, then clear the run and bump the call counter.
#[cfg(feature = "linux-direct-io")]
fn flush_copy_run(
    run: &mut Option<(u64, u64)>,
    writer: &mut PbfWriter<FileWriter>,
    input_fd: std::os::unix::io::RawFd,
    calls: &mut u64,
) -> Result<()> {
    if let Some((start, end)) = run.take() {
        // end is exclusive, so end - start is the byte count.
        writer.write_raw_copy(input_fd, start, end - start)?;
        *calls += 1;
    }
    Ok(())
}
/// No-op fallback when the copy-range path is compiled out. A run should
/// never have been started in this configuration (try_extend_copy_run always
/// reports Fallback), so this just asserts and clears defensively.
#[cfg(not(feature = "linux-direct-io"))]
fn flush_copy_run(
    run: &mut Option<(u64, u64)>,
    _writer: &mut PbfWriter<FileWriter>,
    _input_fd: i32,
    _calls: &mut u64,
) -> Result<()> {
    debug_assert!(run.is_none(), "copy_file_range run set without linux-direct-io feature");
    run.take();
    Ok(())
}
/// Read the full on-disk frame for `entry` into `buf`, resizing the buffer
/// to exactly `frame_len` bytes first.
fn read_frame_into(
    file: &mut File,
    entry: &BlobEntry,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    #[allow(clippy::cast_possible_truncation)]
    let frame_len = entry.frame_len as usize;
    // Size the buffer before seeking; read_exact fills it completely.
    buf.clear();
    buf.resize(frame_len, 0);
    file.seek(SeekFrom::Start(entry.file_offset))?;
    file.read_exact(buf)
}
/// Borrow the Blob message bytes out of a raw PBF frame.
///
/// A frame is `[4-byte big-endian BlobHeader length][BlobHeader][Blob]`;
/// this skips the length prefix and the BlobHeader and returns the rest.
///
/// # Errors
/// Returns `InvalidData` if the frame is shorter than the 4-byte length
/// prefix, or if the declared header length (plus the prefix) exceeds the
/// frame length.
fn extract_blob_bytes(frame: &[u8]) -> Result<&[u8]> {
    let prefix: [u8; 4] = frame
        .get(..4)
        .and_then(|b| b.try_into().ok())
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "frame too short"))?;
    #[allow(clippy::cast_possible_truncation)]
    let header_len = u32::from_be_bytes(prefix) as usize;
    // checked_add guards against usize overflow on 32-bit targets (where
    // `4 + header_len` could wrap and defeat the bounds check); `get`
    // performs the actual bounds check and yields the tail slice.
    header_len
        .checked_add(4)
        .and_then(|blob_start| frame.get(blob_start..))
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid header length").into())
}
/// Add the blob's element count to the stats bucket matching its kind.
fn count_entry(entry: &BlobEntry, stats: &mut SortStats) {
    let n = entry.index.count;
    let bucket = match entry.index.kind {
        ElemKind::Node => &mut stats.nodes,
        ElemKind::Way => &mut stats.ways,
        ElemKind::Relation => &mut stats.relations,
    };
    *bucket += n;
}
/// Per-kind element counts produced while re-encoding one overlap run.
#[derive(Default)]
struct OverlapCounts {
    // Elements written for each kind; only the field matching the run's
    // kind is ever non-zero (see compute_overlap_run_local).
    nodes: u64,
    ways: u64,
    relations: u64,
}
/// Group consecutive overlap-flagged blobs of the same kind into
/// `(start, end, kind)` half-open index runs, in file order.
///
/// Must stay in lockstep with the run-skipping logic in `sort`'s write loop,
/// which consumes one precomputed output per run.
fn collect_overlap_runs(entries: &[BlobEntry], overlaps: &[bool]) -> Vec<(usize, usize, ElemKind)> {
    let mut runs = Vec::new();
    let mut start = 0;
    while start < entries.len() {
        if !overlaps[start] {
            start += 1;
            continue;
        }
        let kind = entries[start].index.kind;
        let mut end = start + 1;
        // Extend while the flags hold and the kind doesn't change.
        while end < entries.len() && overlaps[end] && entries[end].index.kind == kind {
            end += 1;
        }
        runs.push((start, end, kind));
        start = end;
    }
    runs
}
/// Decode, merge-sort, and re-encode one overlap run of blobs.
///
/// Runs on a rayon worker, so it opens its own handle on the input file.
/// Dispatches to [`sweep_merge_local`] with the extraction and write
/// functions matching the run's element kind. Errors are stringified for
/// transport across the parallel boundary.
fn compute_overlap_run_local(
    entries: &[BlobEntry],
    kind: ElemKind,
    input_path: &Path,
) -> std::result::Result<(Vec<OwnedBlock>, OverlapCounts), String> {
    let mut file = File::open(input_path).map_err(|e| e.to_string())?;
    let mut builder = BlockBuilder::new();
    let mut blocks: Vec<OwnedBlock> = Vec::new();
    let mut counts = OverlapCounts::default();
    match kind {
        ElemKind::Node => {
            // Nodes can appear in either plain or dense encoding.
            counts.nodes = sweep_merge_local(
                entries,
                &mut file,
                &mut builder,
                &mut blocks,
                |e, heap| match e {
                    Element::Node(n) => heap.push(Reverse(read_node(n))),
                    Element::DenseNode(dn) => heap.push(Reverse(read_dense_node(dn))),
                    _ => {}
                },
                write_single_node_local,
            )?;
        }
        ElemKind::Way => {
            counts.ways = sweep_merge_local(
                entries,
                &mut file,
                &mut builder,
                &mut blocks,
                |e, heap| match e {
                    Element::Way(w) => heap.push(Reverse(read_way(w))),
                    _ => {}
                },
                write_single_way_local,
            )?;
        }
        ElemKind::Relation => {
            counts.relations = sweep_merge_local(
                entries,
                &mut file,
                &mut builder,
                &mut blocks,
                |e, heap| match e {
                    Element::Relation(r) => heap.push(Reverse(read_relation(r))),
                    _ => {}
                },
                write_single_relation_local,
            )?;
        }
    }
    Ok((blocks, counts))
}
/// K-way merge of the elements of an overlap run using a min-heap sweep.
///
/// `entries` arrive in ascending first-key order (sorted by the caller in
/// `sort`). Before decoding each blob, any heap element whose ID is strictly
/// below the blob's first ID can no longer be preceded by unseen elements,
/// so it is flushed to the block builder; the blob's elements are then
/// pushed onto the heap via `extract`. After the last blob, the heap is
/// drained in order and partial blocks are flushed.
///
/// Returns the number of elements written. Errors are stringified for the
/// parallel callers.
fn sweep_merge_local<T: Ord + HasId>(
    entries: &[BlobEntry],
    input_file: &mut File,
    bb: &mut BlockBuilder,
    output: &mut Vec<OwnedBlock>,
    // Pushes the matching element variant onto the heap (min-heap via Reverse).
    mut extract: impl FnMut(&Element<'_>, &mut BinaryHeap<Reverse<T>>),
    // Serializes one element into the block builder / output blocks.
    mut write_elem: impl FnMut(&T, &mut BlockBuilder, &mut Vec<OwnedBlock>) -> std::result::Result<(), String>,
) -> std::result::Result<u64, String> {
    let mut heap: BinaryHeap<Reverse<T>> = BinaryHeap::new();
    let mut frame_buf: Vec<u8> = Vec::new();
    let mut count: u64 = 0;
    for entry in entries {
        // Emit everything that sorts before this blob's first ID.
        flush_heap_below_local(
            &mut heap,
            crate::osm_id::blob_osm_first_id(entry.index.min_id, entry.index.max_id),
            |elem| {
                write_elem(&elem, bb, output)?;
                count += 1;
                Ok(())
            },
        )?;
        // Load and decode the blob, then feed its elements into the heap.
        read_frame_into(input_file, entry, &mut frame_buf).map_err(|e| e.to_string())?;
        let blob_bytes = extract_blob_bytes(&frame_buf).map_err(|e| e.to_string())?;
        let block = decode_blob_to_primitiveblock(blob_bytes).map_err(|e| e.to_string())?;
        for element in block.elements() {
            extract(&element, &mut heap);
        }
    }
    // No more input: drain the heap in ascending order.
    while let Some(Reverse(elem)) = heap.pop() {
        write_elem(&elem, bb, output)?;
        count += 1;
    }
    // Flush any partially filled block out of the builder.
    crate::commands::flush_local(bb, output)?;
    Ok(count)
}
/// Pop and emit heap elements whose OSM ID sorts strictly before `below`,
/// stopping at the first element that doesn't.
fn flush_heap_below_local<T: Ord + HasId>(
    heap: &mut BinaryHeap<Reverse<T>>,
    below: i64,
    mut emit: impl FnMut(T) -> std::result::Result<(), String>,
) -> std::result::Result<(), String> {
    while let Some(Reverse(top)) = heap.peek() {
        if !crate::osm_id::osm_id_cmp(top.id(), below).is_lt() {
            break;
        }
        if let Some(Reverse(element)) = heap.pop() {
            emit(element)?;
        }
    }
    Ok(())
}
/// Minimal accessor the merge sweep uses to compare heap elements against a
/// blob's first OSM ID.
trait HasId {
    // The element's OSM ID.
    fn id(&self) -> i64;
}
impl HasId for OwnedNode {
    fn id(&self) -> i64 { self.id }
}
impl HasId for OwnedWay {
    fn id(&self) -> i64 { self.id }
}
impl HasId for OwnedRelation {
    fn id(&self) -> i64 { self.id }
}