use std::path::Path;
use crate::idset::IdSet;
use crate::BoxResult as Result;
use crate::owned::TypeFilter;
use crate::ElementReader;
/// Options controlling a [`verify_ids`] run.
pub struct VerifyIdsOptions<'a> {
/// Full mode: additionally detect duplicate IDs via an [`IdSet`]
/// (uses much more memory than the streaming pass).
pub full: bool,
/// Optional element-kind filter string, parsed with `TypeFilter::parse`;
/// `None` verifies nodes, ways, and relations.
pub type_filter: Option<&'a str>,
/// Maximum number of violation details to materialize in the report;
/// the total violation count keeps counting past this cap.
pub max_errors: usize,
/// Open the input with direct I/O (passed through to the readers).
pub direct_io: bool,
}
/// A single ID-integrity violation found during verification.
pub enum IdViolation {
/// An element's ID was not strictly greater than its predecessor's
/// within the same element kind (equal IDs also trigger this).
NonMonotonic {
elem_type: &'static str,
id: i64,
/// The preceding ID that `id` failed to exceed.
prev_id: i64,
},
/// The same ID was seen more than once for one element kind
/// (detected in full mode only).
Duplicate {
elem_type: &'static str,
id: i64,
},
/// Element kinds appear out of canonical file order
/// (nodes, then ways, then relations).
TypeOrder {
found: &'static str,
after: &'static str,
},
}
/// Result of an ID verification run; render via `to_json` or `print_human`.
pub struct VerifyIdsReport {
/// Whether the file header declares itself sorted.
pub header_sorted: bool,
/// Whether the file carries index data (enables the type-order check).
pub indexed: bool,
/// True when the full (duplicate-detecting) pass was run.
pub full: bool,
pub node_count: u64,
pub way_count: u64,
pub relation_count: u64,
/// Materialized violation details, capped at the configured `max_errors`.
pub violations: Vec<IdViolation>,
/// Total violations counted, including those beyond the cap.
pub total_violations: u64,
/// True iff `total_violations == 0`.
pub passed: bool,
}
impl VerifyIdsReport {
    /// Build the JSON representation of this report, tagged with `file_name`.
    pub fn to_json_value(&self, file_name: &str) -> serde_json::Value {
        let rendered: Vec<serde_json::Value> =
            self.violations.iter().map(violation_to_json).collect();
        let counts = serde_json::json!({
            "nodes": self.node_count,
            "ways": self.way_count,
            "relations": self.relation_count,
        });
        serde_json::json!({
            "file": file_name,
            "header_sorted": self.header_sorted,
            "indexed": self.indexed,
            "counts": counts,
            "passed": self.passed,
            "total_violations": self.total_violations,
            "violations": rendered,
        })
    }

    /// Pretty-printed JSON string form of [`Self::to_json_value`].
    ///
    /// # Errors
    /// Returns an error if JSON serialization fails.
    pub fn to_json(&self, file_name: &str) -> Result<String> {
        let value = self.to_json_value(file_name);
        Ok(serde_json::to_string_pretty(&value)?)
    }

    /// Print a human-readable summary of the report to stdout.
    pub fn print_human(&self, file_name: &str) {
        println!("Verify IDs: {file_name}");
        println!(" Header sorted: {}", yes_no(self.header_sorted));
        println!(" Indexed: {}", yes_no(self.indexed));
        print_mode_line(self.full);
        println!();
        println!(
            "Scanned {} nodes, {} ways, {} relations",
            fmt_count(self.node_count),
            fmt_count(self.way_count),
            fmt_count(self.relation_count),
        );
        println!();
        match self.passed {
            true => println!("ID integrity: OK"),
            false => print_violations(self),
        }
    }
}
/// Render a boolean as "yes"/"no" for human-readable output.
fn yes_no(b: bool) -> &'static str {
    match b {
        true => "yes",
        false => "no",
    }
}
/// Print which verification mode is active.
fn print_mode_line(full: bool) {
    let line = if full {
        " Mode: full (duplicate detection)"
    } else {
        " Mode: streaming"
    };
    println!("{line}");
}
fn print_violations(report: &VerifyIdsReport) {
let showing = report.violations.len();
let total = report.total_violations;
println!("{total} violations (showing first {showing} of {total}):");
for v in &report.violations {
print_single_violation(v);
}
println!();
println!("ID integrity: FAILED");
}
/// Print one violation as an indented human-readable line.
fn print_single_violation(v: &IdViolation) {
    // All fields are `Copy` (`i64` / `&'static str`), so matching on `*v`
    // moves nothing out of the borrow.
    match *v {
        IdViolation::NonMonotonic { elem_type, id, prev_id } => {
            println!(" {elem_type} {id}: non-monotonic (previous: {prev_id})");
        }
        IdViolation::Duplicate { elem_type, id } => {
            println!(" {elem_type} {id}: duplicate");
        }
        IdViolation::TypeOrder { found, after } => {
            println!(" type order: {found} after {after}");
        }
    }
}
/// Format an integer with thousands separators, e.g. 1234567 -> "1,234,567".
fn fmt_count(n: u64) -> String {
    let digits = n.to_string();
    // Group the decimal digits in threes from the right, then join the
    // groups left-to-right with commas.
    let groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(|g| std::str::from_utf8(g).expect("decimal digits are ASCII"))
        .collect();
    groups.join(",")
}
fn violation_to_json(v: &IdViolation) -> serde_json::Value {
match v {
IdViolation::NonMonotonic {
elem_type,
id,
prev_id,
} => serde_json::json!({
"type": "non_monotonic",
"elem_type": elem_type,
"id": id,
"prev_id": prev_id,
}),
IdViolation::Duplicate { elem_type, id } => serde_json::json!({
"type": "duplicate",
"elem_type": elem_type,
"id": id,
}),
IdViolation::TypeOrder { found, after } => serde_json::json!({
"type": "type_order",
"found": found,
"after": after,
}),
}
}
/// Verify element-ID integrity for the file at `path`.
///
/// Dispatches to the full duplicate-detecting pass when `opts.full` is set,
/// otherwise to the cheaper streaming monotonicity check.
///
/// # Errors
/// Propagates any I/O or decode error from the underlying scan.
#[hotpath::measure]
pub fn verify_ids(path: &Path, opts: &VerifyIdsOptions<'_>) -> Result<VerifyIdsReport> {
    match opts.full {
        true => verify_ids_full_parallel(path, opts),
        false => verify_ids_streaming_parallel(path, opts),
    }
}
/// Streaming ID verification: one parallel pass per element kind that checks
/// IDs are strictly increasing, without remembering every ID seen.
///
/// Work is sharded by blob via `parallel_classify_phase`; each blob reports
/// its first/last ID plus within-blob order violations, and the helper's
/// sequential merge checks ordering across blob boundaries.
#[allow(clippy::too_many_lines)]
fn verify_ids_streaming_parallel(path: &Path, opts: &VerifyIdsOptions<'_>) -> Result<VerifyIdsReport> {
use crate::Element;
crate::debug::emit_marker("VERIFYIDS_SCAN_START");
#[cfg(target_os = "linux")]
// Cap glibc malloc arenas -- presumably to bound RSS growth during the
// multi-threaded scan; TODO confirm the motivation.
// SAFETY: mallopt is a plain FFI call with constant arguments; no memory
// invariants are involved.
unsafe {
libc::mallopt(libc::M_ARENA_MAX, 2);
}
let header_sorted = ElementReader::open(path, opts.direct_io)?.header().is_sorted();
let indexed = crate::commands::has_indexdata(path, opts.direct_io)?;
// With no explicit filter, verify all three element kinds.
let type_filter = opts.type_filter.map_or_else(TypeFilter::all, TypeFilter::parse);
let (node_schedule, way_schedule, rel_schedule, shared_file) =
crate::scan::classify::build_classify_schedules_split(path)?;
let mut node_count: u64 = 0;
let mut way_count: u64 = 0;
let mut relation_count: u64 = 0;
let mut violations: Vec<IdViolation> = Vec::new();
let mut total_violations: u64 = 0;
// The type-order check compares per-kind blob offsets, which are only
// available when the file carries index data.
if indexed {
check_type_order(&node_schedule, &way_schedule, &rel_schedule, &mut violations, &mut total_violations, opts.max_errors);
}
if type_filter.nodes {
crate::debug::emit_marker("VERIFYIDS_NODES_START");
// The per-phase detail budget shrinks as earlier phases store
// violations; totals are still counted past the cap.
let (count, phase_violations, phase_total) = verify_single_kind_streaming(
&shared_file,
&node_schedule,
"node",
opts.max_errors.saturating_sub(violations.len()),
|el| match el {
Element::DenseNode(dn) => Some(dn.id()),
Element::Node(n) => Some(n.id()),
_ => None,
},
)?;
node_count = count;
total_violations += phase_total;
violations.extend(phase_violations);
crate::debug::emit_marker("VERIFYIDS_NODES_END");
}
if type_filter.ways {
crate::debug::emit_marker("VERIFYIDS_WAYS_START");
let (count, phase_violations, phase_total) = verify_single_kind_streaming(
&shared_file,
&way_schedule,
"way",
opts.max_errors.saturating_sub(violations.len()),
|el| match el {
Element::Way(w) => Some(w.id()),
_ => None,
},
)?;
way_count = count;
total_violations += phase_total;
violations.extend(phase_violations);
crate::debug::emit_marker("VERIFYIDS_WAYS_END");
}
if type_filter.relations {
crate::debug::emit_marker("VERIFYIDS_RELATIONS_START");
let (count, phase_violations, phase_total) = verify_single_kind_streaming(
&shared_file,
&rel_schedule,
"relation",
opts.max_errors.saturating_sub(violations.len()),
|el| match el {
Element::Relation(r) => Some(r.id()),
_ => None,
},
)?;
relation_count = count;
total_violations += phase_total;
violations.extend(phase_violations);
crate::debug::emit_marker("VERIFYIDS_RELATIONS_END");
}
crate::debug::emit_marker("VERIFYIDS_SCAN_END");
Ok(VerifyIdsReport {
header_sorted,
indexed,
full: false,
node_count,
way_count,
relation_count,
passed: total_violations == 0,
total_violations,
violations,
})
}
/// Scan one element kind's blobs in parallel and check ID monotonicity.
///
/// Returns `(element_count, stored_violations, total_violation_count)`;
/// at most `max_errors_remaining` violations are materialized while the
/// total keeps counting.
///
/// `schedule` entries are `(seq, offset, len)` tuples -- presumably blob
/// sequence number, file offset, and length; confirm against
/// `build_classify_schedules_split`.
#[allow(clippy::type_complexity)]
fn verify_single_kind_streaming(
shared_file: &std::sync::Arc<std::fs::File>,
schedule: &[(usize, u64, usize)],
elem_type: &'static str,
max_errors_remaining: usize,
extract_id: impl Fn(&crate::Element) -> Option<i64> + Send + Sync,
) -> Result<(u64, Vec<IdViolation>, u64)> {
if schedule.is_empty() {
return Ok((0, Vec::new(), 0));
}
// Results are slotted by blob sequence number so the merge below can run
// in file order even though workers finish out of order.
let mut per_blob: Vec<Option<BlobVerifyResult>> = (0..schedule.len()).map(|_| None).collect();
let extract_ref = &extract_id;
crate::scan::classify::parallel_classify_phase(
shared_file,
schedule,
None,
|| (),
|block, _state| -> BlobVerifyResult {
let mut r = BlobVerifyResult::empty();
let mut prev: Option<i64> = None;
for el in block.elements_skip_metadata() {
if let Some(id) = extract_ref(&el) {
r.count += 1;
if r.first_id.is_none() {
r.first_id = Some(id);
}
r.last_id = Some(id);
// `id <= p` flags an immediate repeat as well as a decrease.
if let Some(p) = prev
&& id <= p
{
r.within_violations.push(IdViolation::NonMonotonic {
elem_type,
id,
prev_id: p,
});
}
prev = Some(id);
}
}
r
},
|seq, r| {
per_blob[seq] = Some(r);
},
)?;
// Sequential merge: accumulate counts and check ordering across blob
// boundaries (last ID of the previous non-empty blob vs. first of this one).
let mut count: u64 = 0;
let mut violations: Vec<IdViolation> = Vec::new();
let mut total_violations: u64 = 0;
let mut prev_last: Option<i64> = None;
for slot in per_blob {
let r = slot.expect("parallel_classify_phase must deliver every blob");
count += r.count;
for v in r.within_violations {
total_violations += 1;
// Store details only while under the caller's remaining budget.
if violations.len() < max_errors_remaining {
violations.push(v);
}
}
if let (Some(pl), Some(fi)) = (prev_last, r.first_id)
&& fi <= pl
{
total_violations += 1;
if violations.len() < max_errors_remaining {
violations.push(IdViolation::NonMonotonic {
elem_type,
id: fi,
prev_id: pl,
});
}
}
// Only advance past non-empty blobs so the boundary check spans
// blobs that contained no matching elements.
if r.last_id.is_some() {
prev_last = r.last_id;
}
}
Ok((count, violations, total_violations))
}
/// Per-blob scan result, merged sequentially in blob order afterwards.
struct BlobVerifyResult {
/// First matching element ID in the blob, if any.
first_id: Option<i64>,
/// Last matching element ID in the blob, if any.
last_id: Option<i64>,
/// Number of matching elements in the blob.
count: u64,
/// Order violations detected entirely within this blob.
within_violations: Vec<IdViolation>,
/// IDs already present in the shared `IdSet` (populated by the full
/// pass only; the streaming pass leaves this empty).
duplicate_ids: Vec<i64>,
}
impl BlobVerifyResult {
    /// A zeroed result: no elements seen, nothing recorded yet.
    fn empty() -> Self {
        Self {
            count: 0,
            first_id: None,
            last_id: None,
            within_violations: Vec::default(),
            duplicate_ids: Vec::default(),
        }
    }
}
/// Full ID verification: like the streaming pass, but additionally records
/// every ID seen in a shared [`IdSet`] per kind to detect duplicates
/// anywhere in the file, at the cost of large pre-allocated bitsets.
#[allow(clippy::too_many_lines)]
fn verify_ids_full_parallel(path: &Path, opts: &VerifyIdsOptions<'_>) -> Result<VerifyIdsReport> {
use crate::Element;
crate::debug::emit_marker("VERIFYIDS_SCAN_START");
#[cfg(target_os = "linux")]
// Cap glibc malloc arenas -- presumably to bound RSS growth during the
// multi-threaded scan; TODO confirm the motivation.
// SAFETY: mallopt is a plain FFI call with constant arguments; no memory
// invariants are involved.
unsafe {
libc::mallopt(libc::M_ARENA_MAX, 2);
}
let header_sorted = ElementReader::open(path, opts.direct_io)?.header().is_sorted();
let indexed = crate::commands::has_indexdata(path, opts.direct_io)?;
let type_filter = opts.type_filter.map_or_else(TypeFilter::all, TypeFilter::parse);
let mut node_ids = IdSet::new();
let mut way_ids = IdSet::new();
let mut relation_ids = IdSet::new();
// Pre-size each set only for the kinds being verified. The constants look
// sized for planet-scale OSM ID ranges (~14 G nodes, ~1.5 G ways,
// ~25 M relations) -- TODO confirm intent.
if type_filter.nodes {
node_ids.pre_allocate(14_000_000_000);
}
if type_filter.ways {
way_ids.pre_allocate(1_500_000_000);
}
if type_filter.relations {
relation_ids.pre_allocate(25_000_000);
}
let (node_schedule, way_schedule, rel_schedule, shared_file) =
crate::scan::classify::build_classify_schedules_split(path)?;
let mut node_count: u64 = 0;
let mut way_count: u64 = 0;
let mut relation_count: u64 = 0;
let mut violations: Vec<IdViolation> = Vec::new();
let mut total_violations: u64 = 0;
// The type-order check compares per-kind blob offsets, which are only
// available when the file carries index data.
if indexed {
check_type_order(&node_schedule, &way_schedule, &rel_schedule, &mut violations, &mut total_violations, opts.max_errors);
}
if type_filter.nodes {
crate::debug::emit_marker("VERIFYIDS_NODES_START");
let node_ids_ref = &node_ids;
// The per-phase detail budget shrinks as earlier phases store
// violations; totals are still counted past the cap.
let (count, phase_violations, phase_total) = verify_single_kind_parallel(
&shared_file,
&node_schedule,
node_ids_ref,
"node",
opts.max_errors.saturating_sub(violations.len()),
|el| match el {
Element::DenseNode(dn) => Some(dn.id()),
Element::Node(n) => Some(n.id()),
_ => None,
},
)?;
node_count = count;
total_violations += phase_total;
violations.extend(phase_violations);
crate::debug::emit_marker("VERIFYIDS_NODES_END");
}
if type_filter.ways {
crate::debug::emit_marker("VERIFYIDS_WAYS_START");
let way_ids_ref = &way_ids;
let (count, phase_violations, phase_total) = verify_single_kind_parallel(
&shared_file,
&way_schedule,
way_ids_ref,
"way",
opts.max_errors.saturating_sub(violations.len()),
|el| match el {
Element::Way(w) => Some(w.id()),
_ => None,
},
)?;
way_count = count;
total_violations += phase_total;
violations.extend(phase_violations);
crate::debug::emit_marker("VERIFYIDS_WAYS_END");
}
if type_filter.relations {
crate::debug::emit_marker("VERIFYIDS_RELATIONS_START");
let relation_ids_ref = &relation_ids;
let (count, phase_violations, phase_total) = verify_single_kind_parallel(
&shared_file,
&rel_schedule,
relation_ids_ref,
"relation",
opts.max_errors.saturating_sub(violations.len()),
|el| match el {
Element::Relation(r) => Some(r.id()),
_ => None,
},
)?;
relation_count = count;
total_violations += phase_total;
violations.extend(phase_violations);
crate::debug::emit_marker("VERIFYIDS_RELATIONS_END");
}
crate::debug::emit_marker("VERIFYIDS_SCAN_END");
Ok(VerifyIdsReport {
header_sorted,
indexed,
full: true,
node_count,
way_count,
relation_count,
passed: total_violations == 0,
total_violations,
violations,
})
}
/// Scan one element kind's blobs in parallel, checking ID monotonicity and
/// (via the shared `id_set`) duplicate IDs anywhere in the file.
///
/// Returns `(element_count, stored_violations, total_violation_count)`;
/// at most `max_errors_remaining` violations are materialized while the
/// total keeps counting.
///
/// `schedule` entries are `(seq, offset, len)` tuples -- presumably blob
/// sequence number, file offset, and length; confirm against
/// `build_classify_schedules_split`.
#[allow(clippy::type_complexity)]
fn verify_single_kind_parallel(
shared_file: &std::sync::Arc<std::fs::File>,
schedule: &[(usize, u64, usize)],
id_set: &IdSet,
elem_type: &'static str,
max_errors_remaining: usize,
extract_id: impl Fn(&crate::Element) -> Option<i64> + Send + Sync,
) -> Result<(u64, Vec<IdViolation>, u64)> {
if schedule.is_empty() {
return Ok((0, Vec::new(), 0));
}
// Results are slotted by blob sequence number so the merge below can run
// in file order even though workers finish out of order.
let mut per_blob: Vec<Option<BlobVerifyResult>> = (0..schedule.len()).map(|_| None).collect();
let extract_ref = &extract_id;
crate::scan::classify::parallel_classify_phase(
shared_file,
schedule,
None,
|| (),
|block, _state| -> BlobVerifyResult {
let mut r = BlobVerifyResult::empty();
let mut prev: Option<i64> = None;
for el in block.elements_skip_metadata() {
if let Some(id) = extract_ref(&el) {
r.count += 1;
if r.first_id.is_none() {
r.first_id = Some(id);
}
r.last_id = Some(id);
// `id <= p` flags an immediate repeat as well as a decrease.
if let Some(p) = prev
&& id <= p
{
r.within_violations.push(IdViolation::NonMonotonic {
elem_type,
id,
prev_id: p,
});
}
prev = Some(id);
// Atomically mark the ID in the shared set; a `false` return
// means some blob (possibly this one) already recorded it.
if !id_set.set_atomic_if_new(id) {
r.duplicate_ids.push(id);
}
}
}
r
},
|seq, r| {
per_blob[seq] = Some(r);
},
)?;
// Sequential merge: accumulate counts, check ordering across blob
// boundaries, and fold in the duplicates each blob observed.
let mut count: u64 = 0;
let mut violations: Vec<IdViolation> = Vec::new();
let mut total_violations: u64 = 0;
let mut prev_last: Option<i64> = None;
for slot in per_blob {
let r = slot.expect("parallel_classify_phase must deliver every blob");
count += r.count;
for v in r.within_violations {
total_violations += 1;
// Store details only while under the caller's remaining budget.
if violations.len() < max_errors_remaining {
violations.push(v);
}
}
if let (Some(pl), Some(fi)) = (prev_last, r.first_id)
&& fi <= pl
{
total_violations += 1;
if violations.len() < max_errors_remaining {
violations.push(IdViolation::NonMonotonic {
elem_type,
id: fi,
prev_id: pl,
});
}
}
for id in r.duplicate_ids {
total_violations += 1;
if violations.len() < max_errors_remaining {
violations.push(IdViolation::Duplicate { elem_type, id });
}
}
// Only advance past non-empty blobs so the boundary check spans
// blobs that contained no matching elements.
if r.last_id.is_some() {
prev_last = r.last_id;
}
}
Ok((count, violations, total_violations))
}
/// Check that element kinds appear in canonical file order (nodes, then
/// ways, then relations) by comparing the per-kind blob offset extremes.
///
/// Each schedule entry's middle field is the blob's file offset; a node
/// blob starting after some way blob (or a way blob after some relation
/// blob) is a type-order violation.
///
/// Bug fix: the `record` closure previously took its arguments as
/// `(after, found)` while call sites passed them in natural
/// `(first_kind, second_kind)` order, so the emitted violation read
/// "way after node" / "relation after way" -- the *canonical* order --
/// instead of describing the defect. The closure now takes
/// `(found, after)`, producing "node after way" and "way after relation".
fn check_type_order(
    node_sched: &[(usize, u64, usize)],
    way_sched: &[(usize, u64, usize)],
    rel_sched: &[(usize, u64, usize)],
    violations: &mut Vec<IdViolation>,
    total_violations: &mut u64,
    max_errors: usize,
) {
    // Record one violation; details are stored only while under the
    // `max_errors` cap, but the total always increments.
    let record = |found: &'static str,
                  after: &'static str,
                  violations: &mut Vec<IdViolation>,
                  total_violations: &mut u64| {
        *total_violations += 1;
        if violations.len() < max_errors {
            violations.push(IdViolation::TypeOrder { found, after });
        }
    };
    let max_node = node_sched.iter().map(|(_, o, _)| *o).max();
    let min_way = way_sched.iter().map(|(_, o, _)| *o).min();
    let max_way = way_sched.iter().map(|(_, o, _)| *o).max();
    let min_rel = rel_sched.iter().map(|(_, o, _)| *o).min();
    // Some node blob starts after a way blob: node data found after ways.
    if let (Some(mn), Some(mw)) = (max_node, min_way) {
        if mn > mw {
            record("node", "way", violations, total_violations);
        }
    }
    // Some way blob starts after a relation blob.
    if let (Some(mw), Some(mr)) = (max_way, min_rel) {
        if mw > mr {
            record("way", "relation", violations, total_violations);
        }
    }
}