#![allow(clippy::unwrap_used, clippy::cognitive_complexity, clippy::too_many_lines)]
mod common;
use std::io::SeekFrom;
use std::path::Path;
use pbfhogg::block_builder::{self, BlockBuilder, MemberData};
use pbfhogg::writer::{Compression, PbfWriter};
use pbfhogg::{
BlobFilter, BlobReader, BlobType, ByteOffset, Element, ElementReader, IndexedReader, MemberId,
};
use tempfile::TempDir;
/// Writes a small unsorted fixture PBF: three dense nodes, two ways and one
/// relation, spread across three separate primitive blocks.
fn write_test_pbf(path: &Path) {
    let file = std::fs::File::create(path).unwrap();
    let mut writer = PbfWriter::new(file, Compression::default());

    // Header with a bounding box; the `sorted` flag is deliberately NOT set.
    let header = block_builder::HeaderBuilder::new()
        .bbox(9.0, 54.0, 13.0, 58.0)
        .build()
        .unwrap();
    writer.write_header(&header).unwrap();

    let mut builder = BlockBuilder::new();

    // Block 1: dense nodes (two tagged, one untagged).
    builder.add_node(100, 550_000_000, 120_000_000, [("name", "A")], None);
    builder.add_node(200, 560_000_000, 130_000_000, [("name", "B")], None);
    builder.add_node(300, -330_000_000, -580_000_000, std::iter::empty::<(&str, &str)>(), None);
    writer.write_primitive_block(builder.take().unwrap().unwrap()).unwrap();

    // Block 2: ways referencing the nodes above.
    builder.add_way(1000, [("highway", "primary")], &[100, 200, 300], None);
    builder.add_way(2000, [("building", "yes")], &[200, 300, 200], None);
    writer.write_primitive_block(builder.take().unwrap().unwrap()).unwrap();

    // Block 3: a single relation pointing at the first way.
    builder.add_relation(
        5000,
        [("type", "multipolygon")],
        &[MemberData {
            id: MemberId::Way(1000),
            role: "outer",
        }],
        None,
    );
    writer.write_primitive_block(builder.take().unwrap().unwrap()).unwrap();

    writer.flush().unwrap();
}
/// Collapses an element into a `(kind tag, id)` pair for easy comparison:
/// 'n' for nodes (plain or dense), 'w' for ways, 'r' for relations.
fn element_id(element: &Element<'_>) -> (char, i64) {
    match element {
        Element::Node(node) => ('n', node.id()),
        Element::DenseNode(dense) => ('n', dense.id()),
        Element::Way(way) => ('w', way.id()),
        Element::Relation(relation) => ('r', relation.id()),
        // Catch-all kept for forward compatibility with new element kinds.
        _ => ('?', 0),
    }
}
/// Reads every element through the plain sequential reader and returns their
/// `(kind, id)` pairs in stream order. Used as the baseline in several tests.
fn collect_sequential(path: &Path) -> Vec<(char, i64)> {
    let mut ids = Vec::new();
    ElementReader::from_path(path)
        .unwrap()
        .for_each(|element| ids.push(element_id(&element)))
        .unwrap();
    ids
}
/// The pipelined element iterator must yield exactly the sequential order.
#[test]
fn pipelined_matches_sequential() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let sequential = collect_sequential(&path);

    let mut pipelined = Vec::new();
    ElementReader::from_path(&path)
        .unwrap()
        .for_each_pipelined(|element| pipelined.push(element_id(&element)))
        .unwrap();

    assert_eq!(sequential, pipelined);
}
/// Walking the pipelined block iterator by hand must visit the same elements
/// as plain sequential iteration.
#[test]
fn block_iterator_matches_pipelined() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let expected = collect_sequential(&path);

    let reader = ElementReader::from_path(&path).unwrap();
    let mut from_iter = Vec::new();
    for block_result in reader.into_blocks_pipelined() {
        let block = block_result.unwrap();
        from_iter.extend(block.elements().map(|element| element_id(&element)));
    }

    assert_eq!(expected, from_iter);
}
/// Dropping the pipelined block iterator after a single item must shut the
/// pipeline down cleanly (no hang, no panic).
#[test]
fn block_iterator_early_drop() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let mut blocks = ElementReader::from_path(&path).unwrap().into_blocks_pipelined();
    let _first = blocks.next();
    // Remaining blocks are discarded unread.
    drop(blocks);
}
/// The fixture writes one dense-node block, one way block and one relation
/// block, in that order; block classification must reflect this.
#[test]
fn block_type_classification() {
    use pbfhogg::BlockType;
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let types: Vec<_> = ElementReader::from_path(&path)
        .unwrap()
        .into_blocks_pipelined()
        .map(|block_result| block_result.unwrap().block_type())
        .collect();
    assert_eq!(types, vec![BlockType::DenseNodes, BlockType::Ways, BlockType::Relations]);

    // Spot-check the classification predicates.
    assert!(BlockType::DenseNodes.is_nodes());
    assert!(BlockType::Nodes.is_nodes());
    assert!(!BlockType::Ways.is_nodes());
    assert!(BlockType::Ways.is_ways());
    assert!(BlockType::Relations.is_relations());
    assert!(!BlockType::Mixed.is_nodes());
}
/// Parallel map/reduce element counts must equal the sequential counts.
#[test]
fn par_map_reduce_count() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    // Derive the expected per-type totals from the sequential baseline.
    let sequential = collect_sequential(&path);
    let count_of = |tag: char| sequential.iter().filter(|(t, _)| *t == tag).count() as u64;
    let expected_nodes = count_of('n');
    let expected_ways = count_of('w');
    let expected_relations = count_of('r');

    let (nodes, ways, relations) = ElementReader::from_path(&path)
        .unwrap()
        .par_map_reduce(
            // Map each element to a one-hot (nodes, ways, relations) triple.
            |element| match element {
                Element::Node(_) | Element::DenseNode(_) => (1u64, 0u64, 0u64),
                Element::Way(_) => (0, 1, 0),
                Element::Relation(_) => (0, 0, 1),
                _ => (0, 0, 0),
            },
            || (0, 0, 0),
            |a, b| (a.0 + b.0, a.1 + b.1, a.2 + b.2),
        )
        .unwrap();

    assert_eq!(nodes, expected_nodes);
    assert_eq!(ways, expected_ways);
    assert_eq!(relations, expected_relations);
}
/// Parallel reduction order is nondeterministic, so compare sorted id sets.
#[test]
fn par_map_reduce_collect_ids() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let mut expected = collect_sequential(&path);
    expected.sort();

    let mut actual: Vec<(char, i64)> = ElementReader::from_path(&path)
        .unwrap()
        .par_map_reduce(
            |element| vec![element_id(&element)],
            Vec::new,
            |mut acc, chunk| {
                acc.extend(chunk);
                acc
            },
        )
        .unwrap();
    actual.sort();

    assert_eq!(expected, actual);
}
/// Seeking back to offset 0 must replay the OSMHeader blob.
#[test]
fn blobreader_seek_to_start() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let mut reader = BlobReader::seekable_from_path(&path).unwrap();

    // The first blob of any PBF is the OSMHeader at byte offset 0.
    let first = reader.next().unwrap().unwrap();
    assert_eq!(first.get_type(), BlobType::OsmHeader);
    assert_eq!(first.offset(), Some(ByteOffset(0)));

    // Rewind and verify the very same blob comes back.
    reader.seek(ByteOffset(0)).unwrap();
    let replayed = reader.next().unwrap().unwrap();
    assert_eq!(replayed.get_type(), BlobType::OsmHeader);
    assert_eq!(replayed.offset(), Some(ByteOffset(0)));
}
/// Every offset recorded during a linear scan must be random-accessible via
/// `blob_from_offset`, yielding the same blob type and offset.
#[test]
fn blobreader_blob_from_offset() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let mut reader = BlobReader::seekable_from_path(&path).unwrap();

    // Pass 1: record (type string, byte offset) for every blob.
    let mut catalogue: Vec<(String, ByteOffset)> = Vec::new();
    for item in reader.by_ref() {
        let blob = item.unwrap();
        catalogue.push((blob.get_type().as_str().to_string(), blob.offset().unwrap()));
    }

    // Pass 2: re-fetch each blob directly by its recorded offset.
    for (expected_type, offset) in &catalogue {
        let blob = reader.blob_from_offset(*offset).unwrap();
        assert_eq!(blob.get_type().as_str(), expected_type.as_str());
        assert_eq!(blob.offset(), Some(*offset));
    }
}
/// Raw byte-level seeking: rewinding replays the stream, seeking to the end
/// exhausts it.
#[test]
fn blobreader_seek_raw() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let mut reader = BlobReader::seekable_from_path(&path).unwrap();
    let _ = reader.next().unwrap().unwrap();

    // Rewind to byte 0 and re-read the header blob.
    assert_eq!(reader.seek_raw(SeekFrom::Start(0)).unwrap(), 0);
    let blob = reader.next().unwrap().unwrap();
    assert_eq!(blob.get_type(), BlobType::OsmHeader);

    // Jumping to the end leaves nothing to read.
    let end_pos = reader.seek_raw(SeekFrom::End(0)).unwrap();
    assert!(end_pos > 0);
    assert!(reader.next().is_none());
}
/// A decode failure poisons the reader; a raw seek past the bad bytes must
/// revive it.
#[test]
fn blobreader_seek_raw_clears_error_state() {
    let dir = TempDir::new().unwrap();
    let good_path = dir.path().join("good.osm.pbf");
    write_test_pbf(&good_path);

    // Prepend four garbage bytes so the first frame fails to parse.
    let good_bytes = std::fs::read(&good_path).unwrap();
    let mut bytes = vec![0xFFu8, 0xFF, 0xFF, 0xFF];
    bytes.extend_from_slice(&good_bytes);

    let mut reader = BlobReader::new(std::io::Cursor::new(bytes));
    assert!(reader.next().unwrap().is_err());
    assert!(reader.next().is_none(), "reader must stay dead until seek");

    // Seeking past the garbage lands on the real stream, which parses again.
    reader.seek_raw(SeekFrom::Start(4)).unwrap();
    let blob = reader.next().unwrap().unwrap();
    assert_eq!(blob.get_type(), BlobType::OsmHeader);
}
/// Header-only iteration (`next_header_skip_blob`) must see the same
/// (type, offset) sequence as full blob iteration.
#[test]
fn blobreader_next_header_skip_blob() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    // Baseline: full iteration records every blob's type and offset.
    let mut expected: Vec<(String, Option<ByteOffset>)> = Vec::new();
    for item in BlobReader::from_path(&path).unwrap() {
        let blob = item.unwrap();
        expected.push((blob.get_type().as_str().to_string(), blob.offset()));
    }

    // Header-skipping iteration over the same file.
    let mut reader = BlobReader::seekable_from_path(&path).unwrap();
    let mut actual: Vec<(String, Option<ByteOffset>)> = Vec::new();
    while let Some(result) = reader.next_header_skip_blob() {
        let (header, offset) = result.unwrap();
        actual.push((header.blob_type().as_str().to_string(), offset));
    }

    assert_eq!(expected.len(), actual.len());
    for (e, a) in expected.iter().zip(actual.iter()) {
        assert_eq!(e.0, a.0, "blob types must match");
        assert_eq!(e.1, a.1, "offsets must match");
    }
}
/// Writes a minimal fixture whose header claims sorted order and whose three
/// untagged nodes genuinely are sorted by id.
fn write_sorted_pbf(path: &Path) {
    let file = std::fs::File::create(path).unwrap();
    let mut writer = PbfWriter::new(file, Compression::default());

    let header = block_builder::HeaderBuilder::new()
        .bbox(9.0, 54.0, 13.0, 58.0)
        .sorted()
        .build()
        .unwrap();
    writer.write_header(&header).unwrap();

    // Three tag-less nodes with strictly increasing ids.
    let mut builder = BlockBuilder::new();
    builder.add_node(1, 550_000_000, 120_000_000, std::iter::empty::<(&str, &str)>(), None);
    builder.add_node(2, 560_000_000, 130_000_000, std::iter::empty::<(&str, &str)>(), None);
    builder.add_node(3, 570_000_000, 140_000_000, std::iter::empty::<(&str, &str)>(), None);
    writer.write_primitive_block(builder.take().unwrap().unwrap()).unwrap();

    writer.flush().unwrap();
}
/// Header accessors must round-trip the bbox and writing program.
#[test]
fn header_accessor() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_test_pbf(&path);

    let reader = ElementReader::from_path(&path).unwrap();
    let header = reader.header();

    // Bounding box survives the write/read round trip within float tolerance.
    let bbox = header.bbox().unwrap();
    assert!((bbox.left - 9.0).abs() < 1e-6);
    assert!((bbox.bottom - 54.0).abs() < 1e-6);
    assert_eq!(header.writing_program(), Some("pbfhogg"));
}
/// A file written with the sorted flag must report `is_sorted() == true`.
#[test]
fn header_is_sorted_true() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("sorted.osm.pbf");
    write_sorted_pbf(&path);

    let reader = ElementReader::from_path(&path).unwrap();
    assert!(reader.header().is_sorted());
}
/// A file written without the sorted flag must report `is_sorted() == false`.
#[test]
fn header_is_sorted_false() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("unsorted.osm.pbf");
    write_test_pbf(&path);

    let reader = ElementReader::from_path(&path).unwrap();
    assert!(!reader.header().is_sorted());
}
/// Inspecting the header first must not consume the stream: all elements
/// still iterate afterwards.
#[test]
fn header_consumed_elements_still_work() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("test.osm.pbf");
    write_sorted_pbf(&path);

    let reader = ElementReader::from_path(&path).unwrap();
    assert!(reader.header().is_sorted());

    // The sorted fixture contains exactly three nodes.
    let mut count = 0u64;
    reader.for_each(|_element| count += 1).unwrap();
    assert_eq!(count, 3);
}
/// A genuinely sorted file must pass both the sequential and the pipelined
/// readers without tripping any internal sort assertion.
#[test]
fn sorted_pbf_no_assertion_failure() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("sorted.osm.pbf");
    write_sorted_pbf(&path);

    ElementReader::from_path(&path).unwrap().for_each(|_| {}).unwrap();
    ElementReader::from_path(&path)
        .unwrap()
        .for_each_pipelined(|_| {})
        .unwrap();
}
/// A header that lies about sorting must trip the debug-only sort assertion.
#[cfg(debug_assertions)]
#[test]
#[should_panic(expected = "Sort.Type_then_ID violated")]
fn sorted_flag_but_unsorted_nodes_panics() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("liar.osm.pbf");
    let file = std::fs::File::create(&path).unwrap();
    let mut writer = PbfWriter::new(file, Compression::default());

    // Header claims Sort.Type_then_ID...
    let header = block_builder::HeaderBuilder::new().sorted().build().unwrap();
    writer.write_header(&header).unwrap();

    // ...but the node ids go 100 then 50, violating the claimed order.
    let mut builder = BlockBuilder::new();
    builder.add_node(100, 550_000_000, 120_000_000, std::iter::empty::<(&str, &str)>(), None);
    builder.add_node(50, 560_000_000, 130_000_000, std::iter::empty::<(&str, &str)>(), None);
    writer.write_primitive_block(builder.take().unwrap().unwrap()).unwrap();
    writer.flush().unwrap();

    ElementReader::from_path(&path).unwrap().for_each(|_| {}).unwrap();
}
/// On an indexed file, the `only_ways` blob filter can skip node blobs
/// entirely, so no node elements may be delivered.
#[test]
fn blobfilter_only_ways_skips_node_blobs_on_indexed_input() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("indexed.osm.pbf");
    common::write_test_pbf_sorted(
        &path,
        &common::generate_nodes(10, 1),
        &common::generate_ways(5, 1_000, 2, 1),
        &[],
    );
    common::assert_indexed(&path);

    let reader = ElementReader::from_path(&path)
        .unwrap()
        .with_blob_filter(BlobFilter::only_ways());

    let mut saw_nodes = 0u64;
    let mut saw_ways = 0u64;
    reader
        .for_each_pipelined(|element| match element {
            Element::Node(_) | Element::DenseNode(_) => saw_nodes += 1,
            Element::Way(_) => saw_ways += 1,
            _ => {}
        })
        .unwrap();

    assert_eq!(saw_nodes, 0, "only_ways filter must skip node blobs on indexed input");
    assert_eq!(saw_ways, 5, "only_ways filter must deliver all ways on indexed input");
}
/// Without an index there is no safe way to skip blobs, so the filter must
/// degrade to a pass-through and deliver every element.
#[test]
fn blobfilter_only_ways_passes_through_on_non_indexed_input() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("non_indexed.osm.pbf");
    common::write_test_pbf_non_indexed(
        &path,
        &common::generate_nodes(10, 1),
        &common::generate_ways(5, 1_000, 2, 1),
        &[],
    );
    common::assert_non_indexed(&path);

    let reader = ElementReader::from_path(&path)
        .unwrap()
        .with_blob_filter(BlobFilter::only_ways());

    let mut saw_nodes = 0u64;
    let mut saw_ways = 0u64;
    reader
        .for_each_pipelined(|element| match element {
            Element::Node(_) | Element::DenseNode(_) => saw_nodes += 1,
            Element::Way(_) => saw_ways += 1,
            _ => {}
        })
        .unwrap();

    assert_eq!(
        saw_nodes, 10,
        "BlobFilter on non-indexed input must NOT drop node blobs - callers get every element"
    );
    assert_eq!(saw_ways, 5, "ways still delivered");
}
/// `IndexedReader` must produce identical way/node-dependency sets whether or
/// not the underlying file carries an index.
#[test]
fn indexed_reader_output_matches_on_indexed_and_non_indexed_twins() {
    let dir = TempDir::new().unwrap();
    let indexed = dir.path().join("indexed.osm.pbf");
    let non_indexed = dir.path().join("non_indexed.osm.pbf");

    // Identical content in both files; tag every other way as a building so
    // the predicate below matches a strict, non-empty subset.
    let nodes = common::generate_nodes(8, 1);
    let mut ways = common::generate_ways(4, 1_000, 2, 1);
    for (i, w) in ways.iter_mut().enumerate() {
        if i % 2 == 0 {
            w.tags = vec![("building", "yes")];
        }
    }
    common::write_test_pbf_sorted(&indexed, &nodes, &ways, &[]);
    common::write_test_pbf_non_indexed(&non_indexed, &nodes, &ways, &[]);
    common::assert_indexed(&indexed);
    common::assert_non_indexed(&non_indexed);

    // Collects (sorted way ids, sorted dependency node ids) for building ways.
    let collect = |path: &Path| -> (Vec<i64>, Vec<i64>) {
        let mut reader = IndexedReader::from_path(path).unwrap();
        let mut way_ids = Vec::new();
        let mut node_ids = Vec::new();
        reader
            .read_ways_and_deps(
                |w| w.tags().any(|(k, v)| k == "building" && v == "yes"),
                |element| match element {
                    Element::Way(w) => way_ids.push(w.id()),
                    Element::Node(n) => node_ids.push(n.id()),
                    Element::DenseNode(n) => node_ids.push(n.id()),
                    _ => {}
                },
            )
            .unwrap();
        way_ids.sort_unstable();
        node_ids.sort_unstable();
        (way_ids, node_ids)
    };

    let (ways_idx, nodes_idx) = collect(&indexed);
    let (ways_non, nodes_non) = collect(&non_indexed);
    assert_eq!(ways_idx, ways_non, "way set diverges on non-indexed input");
    assert_eq!(nodes_idx, nodes_non, "node dep set diverges on non-indexed input");
    assert!(!ways_idx.is_empty(), "filter must match at least one way");
}
/// 0..=3 trailing junk bytes cannot form a complete 4-byte length prefix, so
/// the reader must treat them as benign truncation: all real blobs come back
/// Ok, then iteration ends with None (never Err).
#[test]
fn trailing_partial_length_prefix_returns_ok_none() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("good.osm.pbf");
    write_test_pbf(&path);
    let good_bytes = std::fs::read(&path).unwrap();

    for tail in 0..=3 {
        let mut bytes = good_bytes.clone();
        bytes.extend(std::iter::repeat_n(0xAAu8, tail));

        let reader = BlobReader::new(std::io::Cursor::new(bytes));
        let mut blob_count = 0;
        for result in reader {
            match result {
                Ok(_) => blob_count += 1,
                Err(e) => panic!(
                    "reader must tolerate {tail} trailing bytes per the truncation reference doc; got Err: {e:?}"
                ),
            }
        }
        assert!(
            blob_count >= 1,
            "fixture must contain at least one blob (got {blob_count})"
        );
    }
}
/// Exactly 4 trailing bytes form a complete big-endian length prefix. Here it
/// declares a 16-byte BlobHeader that is not actually present — a committed
/// but truncated frame — which must surface as a hard error, not silent EOF.
#[test]
fn trailing_partial_length_prefix_4_bytes_is_committed_frame() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("good.osm.pbf");
    write_test_pbf(&path);

    // `read` already yields an owned Vec; the original cloned it redundantly.
    let mut bytes = std::fs::read(&path).unwrap();
    // 0x00000010 = prefix claiming 16 header bytes that never follow.
    bytes.extend_from_slice(&[0x00, 0x00, 0x00, 0x10]);

    let mut reader = BlobReader::new(std::io::Cursor::new(bytes));
    // `any` short-circuits on the first Err, mirroring the original loop+break.
    let errored = reader.any(|result| result.is_err());
    assert!(
        errored,
        "4 trailing bytes (a complete length prefix declaring N>0 header bytes that don't follow) is shape 2, must hard-error"
    );
}