pub mod altw;
pub mod cat;
#[cfg(feature = "commands")]
pub mod check;
pub mod diff;
#[cfg(feature = "commands")]
pub mod extract;
pub mod getid;
pub mod getparents;
pub mod inspect;
pub mod apply_changes;
pub mod merge_changes;
pub mod renumber;
pub mod sort;
pub mod tags_count;
pub mod tags_filter;
pub mod time_filter;
use std::path::Path;
use crate::blob::BlobKind;
use crate::block_builder::{BlockBuilder, HeaderBuilder, Metadata, OwnedBlock};
use crate::file_reader::FileReader;
use crate::file_writer::FileWriter;
use crate::writer::{Compression, PbfWriter};
use crate::PrimitiveBlock;
/// Crate-wide boxed-error result type shared by all subcommand helpers.
pub(crate) type Result<T> = crate::BoxResult<T>;
/// Default number of primitive blocks handed to a processing batch.
pub(crate) const BATCH_SIZE: usize = 64;
/// Soft cap on accumulated decompressed bytes before a batch is flushed (128 MiB).
pub(crate) const BATCH_BYTE_BUDGET: usize = 128 * 1024 * 1024;
/// Lower bound for batch sizing in blobs.
/// NOTE(review): consumers are not visible in this file — confirm against callers.
pub(crate) const BATCH_MIN_BLOBS: usize = 8;
/// Upper bound for batch sizing in blobs (see note on `BATCH_MIN_BLOBS`).
pub(crate) const BATCH_MAX_BLOBS: usize = 128;
/// Groups `blocks` into batches of at most `batch_size` and invokes
/// `process_batch` on each group (count limit only — no byte budget).
///
/// An error from the block iterator or from `process_batch` aborts
/// iteration and is propagated to the caller.
pub(crate) fn for_each_primitive_block_batch<E>(
    blocks: impl IntoIterator<Item = std::result::Result<PrimitiveBlock, E>>,
    batch_size: usize,
    mut process_batch: impl FnMut(&[PrimitiveBlock]) -> Result<()>,
) -> Result<()>
where
    E: Into<Box<dyn std::error::Error>>,
{
    // Delegates with `max_bytes = None`, so batches are bounded by count alone.
    for_each_primitive_block_batch_budgeted(blocks, batch_size, None, &mut process_batch)
}
/// Streams `blocks` into batches handed to `process_batch`, flushing a batch
/// once it reaches `max_blocks` entries or (when `max_bytes` is set) its
/// accumulated decompressed size reaches the budget. A trailing partial
/// batch is flushed at the end.
///
/// Errors from the iterator or the callback abort iteration and propagate.
pub(crate) fn for_each_primitive_block_batch_budgeted<E>(
    blocks: impl IntoIterator<Item = std::result::Result<PrimitiveBlock, E>>,
    max_blocks: usize,
    max_bytes: Option<usize>,
    process_batch: &mut dyn FnMut(&[PrimitiveBlock]) -> Result<()>,
) -> Result<()>
where
    E: Into<Box<dyn std::error::Error>>,
{
    let mut pending: Vec<PrimitiveBlock> = Vec::with_capacity(max_blocks);
    let mut pending_bytes: usize = 0;
    for item in blocks {
        let block = item.map_err(Into::into)?;
        pending_bytes += block.decompressed_size();
        // A block is always admitted before the limits are checked, so a
        // batch may overshoot the byte budget by at most one block.
        pending.push(block);
        let count_full = pending.len() >= max_blocks;
        let bytes_full = matches!(max_bytes, Some(limit) if pending_bytes >= limit);
        if count_full || bytes_full {
            process_batch(&pending)?;
            pending.clear();
            pending_bytes = 0;
        }
    }
    // Flush whatever remains after the source runs dry.
    if !pending.is_empty() {
        process_batch(&pending)?;
    }
    Ok(())
}
/// Hands any buffered raw (pass-through) chunks to `writer` and leaves the
/// caller's buffer empty but reusable.
pub(crate) fn flush_passthrough_buf(
    chunks: &mut Vec<Vec<u8>>,
    writer: &mut PbfWriter<FileWriter>,
) -> Result<()> {
    if chunks.is_empty() {
        return Ok(());
    }
    // Swap the buffer out in place so the caller keeps an empty Vec.
    writer.write_raw_chunks(std::mem::take(chunks))?;
    Ok(())
}
/// Finalizes the builder's current block, if any, and writes it to `writer`.
/// A builder with nothing buffered is a no-op.
pub(crate) fn flush_block(
    bb: &mut BlockBuilder,
    writer: &mut PbfWriter<FileWriter>,
) -> Result<()> {
    match bb.take_owned()? {
        Some((bytes, index, tagdata)) => {
            writer.write_primitive_block_owned(bytes, index, tagdata.as_deref())?;
        }
        // Nothing pending — nothing to write.
        None => {}
    }
    Ok(())
}
/// Flushes the builder's current block when it cannot accept another node,
/// so the caller can add the node unconditionally afterwards.
pub(crate) fn ensure_node_capacity(
    bb: &mut BlockBuilder,
    writer: &mut PbfWriter<FileWriter>,
) -> Result<()> {
    if bb.can_add_node() {
        return Ok(());
    }
    flush_block(bb, writer)
}
/// Flushes the builder's current block when it cannot accept another way,
/// so the caller can add the way unconditionally afterwards.
pub(crate) fn ensure_way_capacity(
    bb: &mut BlockBuilder,
    writer: &mut PbfWriter<FileWriter>,
) -> Result<()> {
    if bb.can_add_way() {
        return Ok(());
    }
    flush_block(bb, writer)
}
/// Flushes the builder's current block when it cannot accept another
/// relation, so the caller can add the relation unconditionally afterwards.
pub(crate) fn ensure_relation_capacity(
    bb: &mut BlockBuilder,
    writer: &mut PbfWriter<FileWriter>,
) -> Result<()> {
    if bb.can_add_relation() {
        return Ok(());
    }
    flush_block(bb, writer)
}
/// Consumes one round of worker results: merges each worker's stats `S` via
/// `merge`, then writes every produced block in order. A worker error
/// (carried as a `String`) is boxed and propagated immediately.
pub(crate) fn drain_batch_results<S>(
    results: Vec<std::result::Result<(Vec<OwnedBlock>, S), String>>,
    writer: &mut PbfWriter<FileWriter>,
    mut merge: impl FnMut(S),
) -> Result<()> {
    for outcome in results {
        let (blocks, stats) = match outcome {
            Ok(pair) => pair,
            Err(msg) => {
                // Box the string error explicitly, mirroring the crate's
                // boxed-error result type.
                let boxed: Box<dyn std::error::Error> = msg.into();
                return Err(boxed.into());
            }
        };
        merge(stats);
        for (bytes, index, tagdata) in blocks {
            writer.write_primitive_block_owned(bytes, index, tagdata.as_deref())?;
        }
    }
    Ok(())
}
/// Worker-side counterpart of `flush_block`: finalizes the builder's current
/// block into a local Vec instead of a writer, stringifying any error so it
/// can cross thread boundaries.
pub(crate) fn flush_local(
    bb: &mut BlockBuilder,
    output: &mut Vec<OwnedBlock>,
) -> std::result::Result<(), String> {
    let finished = bb.take_owned().map_err(|e| e.to_string())?;
    // `Option` iterates zero-or-one items, so `extend` covers both cases.
    output.extend(finished);
    Ok(())
}
/// Worker-side variant of `ensure_node_capacity`: flushes into a local Vec
/// when the builder cannot accept another node.
pub(crate) fn ensure_node_capacity_local(
    bb: &mut BlockBuilder,
    output: &mut Vec<OwnedBlock>,
) -> std::result::Result<(), String> {
    if bb.can_add_node() {
        return Ok(());
    }
    flush_local(bb, output)
}
/// Worker-side variant of `ensure_way_capacity`: flushes into a local Vec
/// when the builder cannot accept another way.
pub(crate) fn ensure_way_capacity_local(
    bb: &mut BlockBuilder,
    output: &mut Vec<OwnedBlock>,
) -> std::result::Result<(), String> {
    if bb.can_add_way() {
        return Ok(());
    }
    flush_local(bb, output)
}
/// Worker-side variant of `ensure_relation_capacity`: flushes into a local
/// Vec when the builder cannot accept another relation.
pub(crate) fn ensure_relation_capacity_local(
    bb: &mut BlockBuilder,
    output: &mut Vec<OwnedBlock>,
) -> std::result::Result<(), String> {
    if bb.can_add_relation() {
        return Ok(());
    }
    flush_local(bb, output)
}
/// Emits a stderr warning when the input header advertises LocationsOnWays,
/// since that inline coordinate data is not carried over to the output.
pub(crate) fn warn_locations_on_ways_loss(header: &crate::HeaderBlock) {
    if !header.has_locations_on_ways() {
        return;
    }
    eprintln!(
        "Warning: input PBF has LocationsOnWays (inline way-node coordinates). \
         These will not be preserved in the output."
    );
}
/// Output-header fields the user may override from the command line.
/// Parsed by [`HeaderOverrides::parse`]; applied by [`HeaderOverrides::apply`].
#[derive(Default)]
pub struct HeaderOverrides {
    /// Writing-program string for the output header (from `--generator`).
    pub generator: Option<String>,
    /// `osmosis_replication_timestamp` override (parsed as `i64`).
    pub replication_timestamp: Option<i64>,
    /// `osmosis_replication_sequence_number` override (parsed as `i64`).
    pub replication_sequence_number: Option<i64>,
    /// `osmosis_replication_base_url` override (taken verbatim).
    pub replication_base_url: Option<String>,
}
impl HeaderOverrides {
    /// Builds overrides from the generator flag and a list of `key=value`
    /// entries (the `--output-header` option).
    ///
    /// Recognized keys: `osmosis_replication_timestamp`,
    /// `osmosis_replication_sequence_number`, `osmosis_replication_base_url`.
    /// Malformed entries and unknown keys produce an error.
    pub fn parse(generator: Option<String>, output_headers: &[String]) -> Result<Self> {
        let mut ov = Self {
            generator,
            ..Self::default()
        };
        for entry in output_headers {
            let Some((key, value)) = entry.split_once('=') else {
                return Err(
                    format!("invalid --output-header format: '{entry}' (expected key=value)")
                        .into(),
                );
            };
            match key {
                "osmosis_replication_timestamp" => {
                    let ts: i64 = value
                        .parse()
                        .map_err(|_| format!("invalid osmosis_replication_timestamp: '{value}'"))?;
                    ov.replication_timestamp = Some(ts);
                }
                "osmosis_replication_sequence_number" => {
                    let seq: i64 = value.parse().map_err(|_| {
                        format!("invalid osmosis_replication_sequence_number: '{value}'")
                    })?;
                    ov.replication_sequence_number = Some(seq);
                }
                "osmosis_replication_base_url" => {
                    // URL is stored verbatim; no validation here.
                    ov.replication_base_url = Some(value.to_string());
                }
                key => return Err(format!("unknown --output-header key: '{key}'").into()),
            }
        }
        Ok(ov)
    }
    /// Applies every set override to `hb`, leaving unset fields untouched,
    /// and returns the updated builder.
    pub(crate) fn apply<'a>(&'a self, mut hb: HeaderBuilder<'a>) -> HeaderBuilder<'a> {
        if let Some(program) = &self.generator {
            hb = hb.writing_program(program);
        }
        if let Some(ts) = self.replication_timestamp {
            hb = hb.replication_timestamp(ts);
        }
        if let Some(seq) = self.replication_sequence_number {
            hb = hb.replication_sequence_number(seq);
        }
        if let Some(url) = &self.replication_base_url {
            hb = hb.replication_base_url(url);
        }
        hb
    }
}
/// Serializes the output header: starts from the input header, lets
/// `configure` adjust the builder, optionally carries the sorted flag over,
/// then applies user overrides (overrides win over everything else).
pub(crate) fn build_output_header(
    header: &crate::HeaderBlock,
    preserve_sorted: bool,
    overrides: &HeaderOverrides,
    configure: impl FnOnce(HeaderBuilder) -> HeaderBuilder,
) -> Result<Vec<u8>> {
    let mut hb = configure(HeaderBuilder::from_header(header));
    // Only propagate the sorted flag when the input actually is sorted.
    if preserve_sorted && header.is_sorted() {
        hb = hb.sorted();
    }
    let bytes = overrides.apply(hb).build()?;
    Ok(bytes)
}
/// Convenience wrapper: builds the serialized output header (see
/// `build_output_header`) and opens a writer with it (see
/// `writer_from_header_bytes`).
#[allow(clippy::too_many_arguments)]
pub(crate) fn writer_from_header(
    output: &Path,
    compression: Compression,
    header: &crate::HeaderBlock,
    preserve_sorted: bool,
    overrides: &HeaderOverrides,
    configure: impl FnOnce(HeaderBuilder) -> HeaderBuilder,
    direct_io: bool,
    io_uring: bool,
) -> Result<PbfWriter<FileWriter>> {
    let bytes = build_output_header(header, preserve_sorted, overrides, configure)?;
    writer_from_header_bytes(output, compression, &bytes, direct_io, io_uring)
}
/// Opens a `PbfWriter` at `output` using an already-serialized header.
///
/// Backend selection: `io_uring` takes precedence over `direct_io`; both are
/// gated behind Cargo features and yield a descriptive error when the
/// corresponding feature was compiled out. The default backend is plain
/// buffered output via `PbfWriter::to_path`.
pub(crate) fn writer_from_header_bytes(
    output: &Path,
    compression: Compression,
    header_bytes: &[u8],
    direct_io: bool,
    io_uring: bool,
) -> Result<PbfWriter<FileWriter>> {
    if io_uring {
        #[cfg(feature = "linux-io-uring")]
        {
            Ok(PbfWriter::to_path_uring(output, compression, header_bytes)?)
        }
        // Requested at runtime but not compiled in: fail with guidance.
        #[cfg(not(feature = "linux-io-uring"))]
        {
            Err("--io-uring requires the linux-io-uring feature".into())
        }
    } else if direct_io {
        #[cfg(feature = "linux-direct-io")]
        {
            Ok(PbfWriter::to_path_direct(output, compression, header_bytes)?)
        }
        // Requested at runtime but not compiled in: fail with guidance.
        #[cfg(not(feature = "linux-direct-io"))]
        {
            Err("--direct-io requires the linux-direct-io feature".into())
        }
    } else {
        Ok(PbfWriter::to_path(output, compression, header_bytes)?)
    }
}
/// Variant of `writer_from_header_bytes` used by the apply-changes command:
/// identical backend selection, except the default (non-uring, non-direct)
/// path opens a parallel writer via `PbfWriter::to_path_parallel`.
pub(crate) fn writer_for_apply_changes(
    output: &Path,
    compression: Compression,
    header_bytes: &[u8],
    direct_io: bool,
    io_uring: bool,
) -> Result<PbfWriter<FileWriter>> {
    if io_uring {
        #[cfg(feature = "linux-io-uring")]
        {
            Ok(PbfWriter::to_path_uring(output, compression, header_bytes)?)
        }
        // Requested at runtime but not compiled in: fail with guidance.
        #[cfg(not(feature = "linux-io-uring"))]
        {
            Err("--io-uring requires the linux-io-uring feature".into())
        }
    } else if direct_io {
        #[cfg(feature = "linux-direct-io")]
        {
            Ok(PbfWriter::to_path_direct(output, compression, header_bytes)?)
        }
        // Requested at runtime but not compiled in: fail with guidance.
        #[cfg(not(feature = "linux-direct-io"))]
        {
            Err("--direct-io requires the linux-direct-io feature".into())
        }
    } else {
        // Only difference from writer_from_header_bytes: parallel writer here.
        Ok(PbfWriter::to_path_parallel(output, compression, header_bytes)?)
    }
}
/// Zeroes out the metadata attributes selected in `clean`. Returns the
/// input untouched when no attribute is selected; `None` passes through.
pub(crate) fn clean_metadata<'a>(meta: Option<Metadata<'a>>, clean: &cat::CleanAttrs) -> Option<Metadata<'a>> {
    if !clean.any() {
        return meta;
    }
    let mut m = meta?;
    if clean.version {
        m.version = 0;
    }
    if clean.changeset {
        m.changeset = 0;
    }
    if clean.timestamp {
        m.timestamp = 0;
    }
    if clean.uid {
        m.uid = 0;
    }
    if clean.user {
        m.user = "";
    }
    Some(m)
}
/// Checks whether `path` carries per-blob index data and, unless `force` is
/// set, errors with remediation guidance when it does not. `reason` is the
/// command-specific first line of the error. Returns the presence flag.
pub(crate) fn require_indexdata(
    path: &Path,
    direct_io: bool,
    force: bool,
    reason: &str,
) -> Result<bool> {
    let present = has_indexdata(path, direct_io)?;
    if present || force {
        return Ok(present);
    }
    Err(format!(
        "{reason}\n\n\
         Generate an indexed PBF first:\n\n\
         \x20 pbfhogg cat input.osm.pbf -o indexed.osm.pbf\n\n\
         Or pass --force to proceed anyway."
    )
    .into())
}
/// Errors with remediation guidance when `header` lacks the
/// `Sort.Type_then_ID` optional feature; `context` names the offending
/// input in the message and `path` is shown to the user.
pub(crate) fn require_sorted(
    header: &crate::HeaderBlock,
    path: &Path,
    context: &str,
) -> Result<()> {
    if header.is_sorted() {
        return Ok(());
    }
    Err(format!(
        "{context} is not sorted (missing Sort.Type_then_ID optional feature).\n\
         File: {}\n\n\
         Sort the input file first:\n\n\
         \x20 pbfhogg sort {} -o sorted.osm.pbf\n\n\
         Streaming diff requires sorted inputs to operate in constant memory.",
        path.display(),
        path.display(),
    )
    .into())
}
/// Unconditionally builds the "input is not sorted" error (same message as
/// `require_sorted`), for call sites that detect unsortedness themselves.
pub(crate) fn require_sorted_err(path: &Path, context: &str) -> Result<()> {
    let shown = path.display();
    Err(format!(
        "{context} is not sorted (missing Sort.Type_then_ID optional feature).\n\
         File: {shown}\n\n\
         Sort the input file first:\n\n\
         \x20 pbfhogg sort {shown} -o sorted.osm.pbf\n\n\
         Streaming diff requires sorted inputs to operate in constant memory.",
    )
    .into())
}
/// Reports whether the first OSMData blob in `path` carries index data.
///
/// Scans blob headers from the start of the file; non-data blobs before the
/// first OSMData blob are skipped. Returns `Ok(false)` when the file
/// contains no OSMData blob at all.
pub fn has_indexdata(path: &Path, direct_io: bool) -> Result<bool> {
    let mut reader = FileReader::open(path, direct_io)?;
    let mut offset = 0u64;
    // read_blob_header_only takes `&mut offset` — presumably advancing it
    // past each header it reads (TODO confirm against raw_frame); the blob
    // payload itself is skipped and accounted for manually below.
    while let Some(info) = crate::read::raw_frame::read_blob_header_only(&mut reader, &mut offset)? {
        if matches!(info.blob_type, BlobKind::OsmData) {
            // Presence of an index on the first data blob decides the answer.
            return Ok(info.index.is_some());
        }
        reader.skip(info.data_size as u64)?;
        offset += info.data_size as u64;
    }
    Ok(false)
}
/// Formats a Unix timestamp (seconds since 1970-01-01T00:00:00Z) as an
/// ISO-8601 UTC string, e.g. `2001-09-09T01:46:40Z`.
///
/// Uses pure integer date math (Howard Hinnant's civil-from-days
/// algorithm), avoiding any time-zone or locale dependencies.
pub(crate) fn format_epoch_secs(secs: u64) -> String {
    let secs = secs.cast_signed();
    // Split into whole days and the second-of-day remainder.
    let secs_of_day = secs.rem_euclid(86400);
    let days = (secs - secs_of_day) / 86400;
    // Shift the epoch to 0000-03-01 so leap days fall at the end of the
    // shifted year, then decompose into 400-year eras.
    let shifted = days + 719_468;
    let era = shifted.div_euclid(146_097);
    let day_of_era = shifted.rem_euclid(146_097);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month counted from March: 0 = Mar … 11 = Feb.
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 { month_index + 3 } else { month_index - 9 };
    // Jan/Feb belong to the following civil year in the shifted calendar.
    let mut year = year_of_era + era * 400;
    if month <= 2 {
        year += 1;
    }
    let hour = secs_of_day / 3600;
    let minute = secs_of_day % 3600 / 60;
    let second = secs_of_day % 60;
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    /// Builds a syntactically valid `PrimitiveBlock` whose serialized size
    /// is exactly `target_size` bytes, by hand-crafting minimal protobuf:
    /// an empty field-1 submessage plus, when needed, a length-delimited
    /// field-31 padding blob sized to fill the remainder.
    fn make_block(target_size: usize) -> PrimitiveBlock {
        assert!(target_size >= 2, "minimum block size is 2 bytes");
        let mut buf = Vec::with_capacity(target_size);
        // Field 1, wire type LEN (tag byte 0x0a), length 0: empty submessage.
        buf.push(0x0a);
        buf.push(0x00);
        if target_size > 2 {
            // Field 31, wire type LEN: two-byte tag varint 0xfa 0x01.
            buf.push(0xfa);
            buf.push(0x01);
            // 4 bytes consumed so far (2-byte header + 2-byte pad tag);
            // decide whether the pad-length varint needs 1 byte (< 128) or 2.
            let remaining = target_size - 4; let varint_len = if remaining.saturating_sub(1) < 128 { 1 } else { 2 };
            let pad_len = remaining - varint_len;
            assert!(pad_len < 16384, "test helper limited to ~16 KB blocks");
            if varint_len == 1 {
                #[allow(clippy::cast_possible_truncation)]
                buf.push(pad_len as u8);
            } else {
                // Two-byte LEB128 varint: low 7 bits with continuation bit
                // set, then the high bits.
                #[allow(clippy::cast_possible_truncation)]
                {
                    buf.push((pad_len as u8 & 0x7f) | 0x80);
                    buf.push((pad_len >> 7) as u8);
                }
            }
            // Zero padding fills the declared field-31 payload.
            buf.resize(buf.len() + pad_len, 0x00);
        }
        assert_eq!(buf.len(), target_size, "block size mismatch");
        PrimitiveBlock::new(Bytes::from(buf)).expect("valid minimal protobuf")
    }
    /// Count limit only: 10 blocks, max 4 per batch -> batches of 4, 4, 2.
    #[test]
    fn budgeted_batch_flushes_on_max_blocks() {
        let blocks: Vec<std::result::Result<PrimitiveBlock, crate::Error>> =
            (0..10).map(|_| Ok(make_block(100))).collect();
        let mut batch_sizes = Vec::new();
        for_each_primitive_block_batch_budgeted(blocks, 4, None, &mut |batch| {
            batch_sizes.push(batch.len());
            Ok(())
        })
        .expect("should not fail");
        assert_eq!(batch_sizes, vec![4, 4, 2]);
    }
    /// Byte limit only: 2000-byte blocks with a 4500-byte budget flush after
    /// the third block (6000 >= 4500) — the budget may be overshot by one
    /// block because blocks are admitted before the check.
    #[test]
    fn budgeted_batch_flushes_on_max_bytes() {
        let blocks: Vec<std::result::Result<PrimitiveBlock, crate::Error>> =
            (0..5).map(|_| Ok(make_block(2000))).collect();
        let mut batch_sizes = Vec::new();
        for_each_primitive_block_batch_budgeted(blocks, 64, Some(4500), &mut |batch| {
            batch_sizes.push(batch.len());
            Ok(())
        })
        .expect("should not fail");
        assert_eq!(batch_sizes, vec![3, 2]);
    }
    /// Both limits set: the count limit (3) trips before the byte budget
    /// (5000 vs 3 * 2000 = 6000 — count check fires first here).
    #[test]
    fn budgeted_batch_both_limits_active() {
        let blocks: Vec<std::result::Result<PrimitiveBlock, crate::Error>> =
            (0..7).map(|_| Ok(make_block(2000))).collect();
        let mut batch_sizes = Vec::new();
        for_each_primitive_block_batch_budgeted(blocks, 3, Some(5000), &mut |batch| {
            batch_sizes.push(batch.len());
            Ok(())
        })
        .expect("should not fail");
        assert_eq!(batch_sizes, vec![3, 3, 1]);
    }
    /// Byte budget smaller than a single block degrades to one block per
    /// batch rather than stalling.
    #[test]
    fn budgeted_batch_byte_limit_smaller_than_one_block() {
        let blocks: Vec<std::result::Result<PrimitiveBlock, crate::Error>> =
            (0..3).map(|_| Ok(make_block(2000))).collect();
        let mut batch_sizes = Vec::new();
        for_each_primitive_block_batch_budgeted(blocks, 64, Some(50), &mut |batch| {
            batch_sizes.push(batch.len());
            Ok(())
        })
        .expect("should not fail");
        assert_eq!(batch_sizes, vec![1, 1, 1]);
    }
    /// Empty input: the callback must never run (no empty trailing batch).
    #[test]
    fn budgeted_batch_empty_input() {
        let blocks: Vec<std::result::Result<PrimitiveBlock, crate::Error>> = Vec::new();
        let mut called = false;
        for_each_primitive_block_batch_budgeted(blocks, 64, Some(1000), &mut |_batch| {
            called = true;
            Ok(())
        })
        .expect("should not fail");
        assert!(!called);
    }
}