use std::path::Path;
use crate::block_builder::{BlockBuilder, MemberData, OwnedBlock};
use crate::cat::CleanAttrs;
use crate::writer::Compression;
use crate::{Element, MemberId, PrimitiveBlock};
use super::super::{Result, writer_from_header, HeaderOverrides,
ensure_node_capacity_local, ensure_way_capacity_local, ensure_relation_capacity_local,
};
use crate::idset::IdSet;
use super::common::{
BboxInt, BlobDesc, pread_write_pass, pread_write_pass_with_schedule,
relation_has_matched_member, spatial_blob_filter,
};
use super::{ExtractStats, Region};
/// Output of the pass-1 classification performed by `collect_pass1_generic`.
///
/// The ID sets describe which elements matched the region; the two schedules
/// are only populated when the input was scanned via the sorted-file path and
/// are left empty otherwise.
pub(super) struct Pass1Result {
/// IDs of nodes whose coordinates fall inside the extract region.
pub(super) bbox_node_ids: IdSet,
/// IDs of ways that reference at least one node in `bbox_node_ids`.
pub(super) matched_way_ids: IdSet,
/// Every node ID referenced by a way in `matched_way_ids`.
pub(super) all_way_node_ids: IdSet,
/// IDs of relations for which `relation_has_matched_member` returned true.
pub(super) matched_relation_ids: IdSet,
/// `(sequence number, data offset, data size)` for every blob that may
/// contain ways (way-kind blobs plus blobs without index data), regardless
/// of the spatial filter. Empty when the input was not sorted.
pub(super) way_schedule: Vec<(usize, u64, usize)>,
/// One descriptor per OsmData blob, in file order, for the write pass.
/// Empty when the input was not sorted.
pub(super) pass3_blob_schedule: Vec<BlobDesc>,
}
/// Strategy hook that decides what pass 1 does with relations that matched
/// the region.
pub(super) trait RelationHandler {
/// When true, the sorted-file relation phase also gathers the way and node
/// member IDs of matched "smart" relations (see `is_smart_relation`).
const COLLECT_MEMBER_IDS: bool;
/// Called once per matched relation on the sequential (unsorted input) path.
fn handle_relation(&mut self, r: &crate::Relation);
/// Called from the merge step of the sorted-file relation phase with the
/// member-ID sets accumulated by that phase.
fn merge_worker_extras(&mut self, extra_way_ids: IdSet, extra_node_ids: IdSet);
}
/// Relation handler that records nothing: matched relations are kept as-is
/// and no additional member IDs are collected.
pub(super) struct CompleteRelationHandler;
impl RelationHandler for CompleteRelationHandler {
// Member IDs are never needed, so the relation phase skips collecting them.
const COLLECT_MEMBER_IDS: bool = false;
// Intentionally a no-op.
fn handle_relation(&mut self, _r: &crate::Relation) {}
// Intentionally a no-op: the passed-in sets are simply dropped.
fn merge_worker_extras(&mut self, _extra_way_ids: IdSet, _extra_node_ids: IdSet) {}
}
/// Relation handler for the smart strategy: remembers the way and node
/// members of matched multipolygon/boundary relations so later passes can
/// pull them into the extract.
struct SmartRelationHandler {
/// Way IDs that are members of matched smart relations.
extra_way_ids: IdSet,
/// Node IDs that are members of matched smart relations.
extra_node_ids: IdSet,
}
impl SmartRelationHandler {
    /// Builds a handler whose way and node member sets both start out empty.
    fn new() -> Self {
        let extra_way_ids = IdSet::new();
        let extra_node_ids = IdSet::new();
        Self { extra_way_ids, extra_node_ids }
    }
}
impl RelationHandler for SmartRelationHandler {
    // The sorted-file relation phase must gather member IDs for this handler.
    const COLLECT_MEMBER_IDS: bool = true;

    /// Records the way and node members of `r` when it is a smart relation
    /// (see `is_smart_relation`); other relations are ignored.
    fn handle_relation(&mut self, r: &crate::Relation) {
        if !is_smart_relation(r) {
            return;
        }
        for member in r.members() {
            match member.id {
                MemberId::Way(way_id) => self.extra_way_ids.set(way_id),
                MemberId::Node(node_id) => self.extra_node_ids.set(node_id),
                // Nested relations and unknown member kinds are not followed.
                MemberId::Relation(_) | MemberId::Unknown(_, _) => {}
            }
        }
    }

    /// Folds the member-ID sets gathered by the relation phase into this
    /// handler's own sets.
    fn merge_worker_extras(&mut self, extra_way_ids: IdSet, extra_node_ids: IdSet) {
        let (incoming_ways, incoming_nodes) = (extra_way_ids, extra_node_ids);
        self.extra_way_ids.merge(incoming_ways);
        self.extra_node_ids.merge(incoming_nodes);
    }
}
/// Pass 1 of the extract: classify nodes, ways and relations against the region.
///
/// Two code paths exist, selected by the header's `is_sorted()` flag:
///
/// * **Unsorted input** – a single sequential read through every OsmData blob.
///   Nodes, ways and relations are classified as they are encountered and
///   `handler.handle_relation` is invoked for each matched relation. The
///   returned `way_schedule` and `pass3_blob_schedule` are empty.
/// * **Sorted input** – blob headers are walked once to build per-kind
///   schedules, then nodes, ways and relations are classified in three
///   successive phases through `crate::scan::classify`. Relation member IDs
///   are handed to `handler.merge_worker_extras`. The returned `way_schedule`
///   lists every blob that may hold ways (not spatially filtered) and
///   `pass3_blob_schedule` lists every OsmData blob.
///
/// # Parameters
/// * `input` – path of the PBF file to read.
/// * `region` – extract region used for the per-node containment test.
/// * `bbox_int` – integer bounding box used for blob-level filtering and the
///   columnar bbox test.
/// * `direct_io` – forwarded to `BlobReader::open`.
/// * `handler` – receives matched relations / collected member IDs.
///
/// # Errors
/// Returns an error when the file has no header blob, or when any read,
/// decode or classify step fails.
#[allow(clippy::cognitive_complexity, clippy::too_many_lines)]
#[hotpath::measure]
pub(super) fn collect_pass1_generic<H: RelationHandler>(
    input: &Path,
    region: &Region,
    bbox_int: &BboxInt,
    direct_io: bool,
    handler: &mut H,
) -> Result<Pass1Result> {
    let mut bbox_node_ids = IdSet::new();
    let mut matched_way_ids = IdSet::new();
    let mut all_way_node_ids = IdSet::new();
    let mut matched_relation_ids = IdSet::new();
    let mut blob_reader = crate::blob::BlobReader::open(input, direct_io)?;
    blob_reader.set_parse_indexdata(true);
    // The first blob must be the header; it tells us whether the file is sorted.
    let header_blob = blob_reader.next()
        .ok_or_else(|| crate::error::new_error(crate::error::ErrorKind::MissingHeader))??;
    let is_sorted = header_blob.to_headerblock()?.is_sorted();
    let filter = spatial_blob_filter(bbox_int);
    let mut decompress_buf: Vec<u8> = Vec::new();
    if !is_sorted {
        // Sequential path: classify everything in file order in one read.
        for blob_result in &mut blob_reader {
            let blob = blob_result?;
            if !matches!(blob.get_type(), crate::blob::BlobType::OsmData) { continue; }
            // Blobs carrying index data can be rejected without decompressing.
            if let Some(idx) = blob.index() {
                if !filter.wants_index(&idx) { continue; }
            }
            blob.decompress_into(&mut decompress_buf)?;
            let block = PrimitiveBlock::from_vec(std::mem::take(&mut decompress_buf))?;
            for element in block.elements_skip_metadata() {
                match &element {
                    Element::DenseNode(dn)
                        if region.contains_decimicro(bbox_int, dn.decimicro_lat(), dn.decimicro_lon()) =>
                    {
                        bbox_node_ids.set(dn.id());
                    }
                    Element::Node(n)
                        if region.contains_decimicro(bbox_int, n.decimicro_lat(), n.decimicro_lon()) =>
                    {
                        bbox_node_ids.set(n.id());
                    }
                    Element::Way(w)
                        if w.refs().any(|r| bbox_node_ids.get(r)) =>
                    {
                        matched_way_ids.set(w.id());
                        for r in w.refs() {
                            all_way_node_ids.set(r);
                        }
                    }
                    Element::Relation(r)
                        if relation_has_matched_member(r, &bbox_node_ids, &matched_way_ids) =>
                    {
                        matched_relation_ids.set(r.id());
                        handler.handle_relation(r);
                    }
                    _ => {}
                }
            }
        }
        return Ok(Pass1Result {
            bbox_node_ids, matched_way_ids, all_way_node_ids, matched_relation_ids,
            way_schedule: Vec::new(),
            pass3_blob_schedule: Vec::new(),
        });
    }
    // Sorted path: the sequential reader is no longer needed.
    drop(blob_reader);
    drop(decompress_buf);
    crate::debug::emit_marker("SMART_PASS1_SCHEDULE_SCAN_START");
    let mut walker = crate::read::header_walker::HeaderWalker::open(input)?;
    // Skip the header blob; only its presence matters here.
    let _ = walker
        .next_header()?
        .ok_or_else(|| crate::error::new_error(crate::error::ErrorKind::MissingHeader))?;
    let mut node_schedule: Vec<(usize, u64, usize)> = Vec::new();
    let mut way_schedule: Vec<(usize, u64, usize)> = Vec::new();
    let mut relation_schedule: Vec<(usize, u64, usize)> = Vec::new();
    let mut full_way_schedule: Vec<(usize, u64, usize)> = Vec::new();
    let mut pass3_blob_schedule: Vec<BlobDesc> = Vec::new();
    let mut seq: usize = 0;
    while let Some(meta) = walker.next_header()? {
        if !matches!(meta.blob_type, crate::blob::BlobKind::OsmData) { continue; }
        // Claim this blob's sequence number before any early exit below.
        // Advancing the counter only at the end of the loop body let a
        // spatially filtered way blob share its number with the following
        // blob, producing duplicate entries in `full_way_schedule`.
        let blob_seq = seq;
        seq += 1;
        let idx = meta.index.as_ref();
        let bbox = idx.and_then(|i| i.bbox);
        let count = idx.map_or(0, |i| i.count);
        let kind_for_blob = idx.map(|i| i.kind);
        // Every OsmData blob is recorded for the write pass, filtered or not.
        pass3_blob_schedule.push(BlobDesc {
            frame_offset: meta.frame_start,
            frame_size: meta.frame_size,
            offset: meta.data_offset,
            size: meta.data_size,
            kind: kind_for_blob,
            bbox,
            count,
            raw_passthrough: false,
        });
        if let Some(idx) = idx {
            // The full way schedule deliberately ignores the spatial filter.
            if matches!(idx.kind, crate::blob_meta::ElemKind::Way) {
                full_way_schedule.push((blob_seq, meta.data_offset, meta.data_size));
            }
            if !filter.wants_index(idx) { continue; }
            match idx.kind {
                crate::blob_meta::ElemKind::Node => node_schedule.push((blob_seq, meta.data_offset, meta.data_size)),
                crate::blob_meta::ElemKind::Way => way_schedule.push((blob_seq, meta.data_offset, meta.data_size)),
                crate::blob_meta::ElemKind::Relation => relation_schedule.push((blob_seq, meta.data_offset, meta.data_size)),
            }
        } else {
            // No index data: the blob's contents are unknown, so every phase
            // has to look at it.
            node_schedule.push((blob_seq, meta.data_offset, meta.data_size));
            way_schedule.push((blob_seq, meta.data_offset, meta.data_size));
            relation_schedule.push((blob_seq, meta.data_offset, meta.data_size));
            full_way_schedule.push((blob_seq, meta.data_offset, meta.data_size));
        }
    }
    crate::debug::emit_marker("SMART_PASS1_SCHEDULE_SCAN_END");
    #[allow(clippy::cast_possible_wrap)]
    {
        crate::debug::emit_counter("smart_pass1_node_blobs", node_schedule.len() as i64);
        crate::debug::emit_counter("smart_pass1_way_blobs", way_schedule.len() as i64);
        crate::debug::emit_counter("smart_pass1_relation_blobs", relation_schedule.len() as i64);
        crate::debug::emit_counter("smart_pass1_full_way_blobs", full_way_schedule.len() as i64);
        crate::debug::emit_counter("smart_pass1_pass3_blobs", pass3_blob_schedule.len() as i64);
    }
    // Keep the walker's file handle for the classify phases, then drop the walker.
    let shared_file = std::sync::Arc::clone(walker.shared_file());
    drop(walker);
    crate::debug::emit_marker("PASS1_NODE_CLASSIFY_START");
    // A plain bbox region can use the columnar bbox test; any other region
    // needs the per-element containment check.
    let use_columnar = matches!(region, Region::Bbox(_));
    crate::scan::classify::parallel_classify_phase(
        &shared_file,
        &node_schedule,
        None,
        || (crate::read::columnar::DenseNodeColumns::new(), Vec::<i64>::new()),
        |block, (columns, ids)| {
            ids.clear();
            if use_columnar {
                block.decode_dense_columns(columns);
                columns.collect_matching_ids_bbox(
                    bbox_int.min_lat, bbox_int.max_lat,
                    bbox_int.min_lon, bbox_int.max_lon,
                    ids,
                );
            } else {
                for element in block.elements_skip_metadata() {
                    match &element {
                        Element::DenseNode(dn)
                            if region.contains_decimicro(bbox_int, dn.decimicro_lat(), dn.decimicro_lon()) =>
                        {
                            ids.push(dn.id());
                        }
                        Element::Node(n)
                            if region.contains_decimicro(bbox_int, n.decimicro_lat(), n.decimicro_lon()) =>
                        {
                            ids.push(n.id());
                        }
                        _ => {}
                    }
                }
            }
            std::mem::take(ids)
        },
        |_seq, ids| {
            for id in ids { bbox_node_ids.set(id); }
        },
    )?;
    crate::debug::emit_marker("PASS1_NODE_CLASSIFY_END");
    crate::debug::emit_marker("PASS1_WAY_CLASSIFY_START");
    // Ways: a way matches when any of its refs is a node inside the region;
    // all refs of a matched way are remembered.
    crate::scan::classify::parallel_classify_phase(
        &shared_file,
        &way_schedule,
        None,
        || (),
        |block, _s| {
            let mut way_ids = Vec::new();
            let mut node_ids = Vec::new();
            for element in block.elements_skip_metadata() {
                if let Element::Way(w) = &element {
                    if w.refs().any(|r| bbox_node_ids.get(r)) {
                        way_ids.push(w.id());
                        node_ids.extend(w.refs());
                    }
                }
            }
            (way_ids, node_ids)
        },
        |_seq, (way_ids, node_ids)| {
            for id in way_ids { matched_way_ids.set(id); }
            for id in node_ids { all_way_node_ids.set(id); }
        },
    )?;
    crate::debug::emit_marker("PASS1_WAY_CLASSIFY_END");
    crate::debug::emit_marker("PASS1_RELATION_CLASSIFY_START");
    // Relations: member IDs are only gathered when the handler asks for them.
    let collect_member_ids = H::COLLECT_MEMBER_IDS;
    crate::scan::classify::parallel_classify_accumulate(
        &shared_file,
        &relation_schedule,
        None,
        || (IdSet::new(), IdSet::new(), IdSet::new()),
        |block, (rel_ids, extra_way_ids, extra_node_ids)| {
            for element in block.elements_skip_metadata() {
                if let Element::Relation(r) = &element {
                    if relation_has_matched_member(r, &bbox_node_ids, &matched_way_ids) {
                        rel_ids.set(r.id());
                        if collect_member_ids && is_smart_relation(r) {
                            for m in r.members() {
                                match m.id {
                                    MemberId::Way(id) => extra_way_ids.set(id),
                                    MemberId::Node(id) => extra_node_ids.set(id),
                                    MemberId::Relation(_) | MemberId::Unknown(_, _) => {}
                                }
                            }
                        }
                    }
                }
            }
        },
        |(worker_rel_ids, worker_extra_way_ids, worker_extra_node_ids)| {
            matched_relation_ids.merge(worker_rel_ids);
            handler.merge_worker_extras(worker_extra_way_ids, worker_extra_node_ids);
        },
    )?;
    crate::debug::emit_marker("PASS1_RELATION_CLASSIFY_END");
    Ok(Pass1Result {
        bbox_node_ids, matched_way_ids, all_way_node_ids, matched_relation_ids,
        way_schedule: full_way_schedule,
        pass3_blob_schedule,
    })
}
#[cfg_attr(feature = "hotpath", hotpath::measure)]
#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
pub(super) fn extract_smart(
input: &Path,
output: &Path,
region: &Region,
set_bounds: bool,
clean: &CleanAttrs,
compression: Compression,
direct_io: bool,
overrides: &HeaderOverrides,
) -> Result<ExtractStats> {
let mut stats = ExtractStats {
nodes_in_bbox: 0,
nodes_from_ways: 0,
nodes_from_relations: 0,
ways_written: 0,
ways_from_relations: 0,
relations_written: 0,
strategy: "smart",
};
crate::debug::emit_marker("SMART_PASS1_START");
let bbox_int = BboxInt::from_bbox(region.bbox());
let mut handler = SmartRelationHandler::new();
let mut result = collect_pass1_generic(input, region, &bbox_int, direct_io, &mut handler)?;
let mut extra_node_ids = handler.extra_node_ids;
crate::debug::emit_marker("SMART_PASS1_END");
crate::debug::emit_mallinfo2("MI_PASS1_END");
crate::debug::emit_marker("SMART_PASS2_START");
{
crate::debug::emit_marker("SMART_PASS2_SCHEDULE_START");
let pass1_way_schedule = std::mem::take(&mut result.way_schedule);
let (way_schedule, shared_file) = if pass1_way_schedule.is_empty() {
crate::scan::classify::build_classify_schedule(input, Some(crate::blob_meta::ElemKind::Way))?
} else {
let shared_file = std::sync::Arc::new(
std::fs::File::open(input)
.map_err(|e| format!("failed to open {}: {e}", input.display()))?
);
(pass1_way_schedule, shared_file)
};
crate::debug::emit_marker("SMART_PASS2_SCHEDULE_END");
let extra_way_ids_ref = &handler.extra_way_ids;
let matched_way_ids_ref = &result.matched_way_ids;
crate::debug::emit_marker("SMART_PASS2_CLASSIFY_START");
crate::scan::classify::parallel_classify_phase(
&shared_file,
&way_schedule,
None,
Vec::<i64>::new,
|block, scratch| {
scratch.clear();
for element in block.elements_skip_metadata() {
if let Element::Way(w) = &element {
let wid = w.id();
if extra_way_ids_ref.get(wid) && !matched_way_ids_ref.get(wid) {
for r in w.refs() { scratch.push(r); }
}
}
}
std::mem::take(scratch)
},
|_seq, refs| {
for id in refs { extra_node_ids.set(id); }
},
)?;
crate::debug::emit_marker("SMART_PASS2_CLASSIFY_END");
}
crate::debug::emit_marker("SMART_PASS2_END");
crate::debug::emit_marker("SMART_PASS3_START");
crate::debug::emit_marker("SMART_PASS3_SETUP_START");
let mut header_reader = crate::blob::BlobReader::open(input, direct_io)?;
let header_blob = header_reader.next()
.ok_or_else(|| crate::error::new_error(crate::error::ErrorKind::MissingHeader))??;
let header = header_blob.to_headerblock()?;
drop(header_reader);
super::super::warn_locations_on_ways_loss(&header);
let bbox = region.bbox();
let mut writer = writer_from_header(output, compression, &header, false, overrides, |hb| {
let hb = if set_bounds {
hb.bbox(bbox.min_lon, bbox.min_lat, bbox.max_lon, bbox.max_lat)
} else {
hb
};
hb.sorted()
}, direct_io, false)?;
let pass1_blob_schedule = std::mem::take(&mut result.pass3_blob_schedule);
let ids = ExtractPass3IdSets {
bbox_node_ids: &result.bbox_node_ids,
all_way_node_ids: &result.all_way_node_ids,
extra_node_ids: &extra_node_ids,
matched_way_ids: &result.matched_way_ids,
extra_way_ids: &handler.extra_way_ids,
matched_relation_ids: &result.matched_relation_ids,
};
crate::debug::emit_marker("SMART_PASS3_SETUP_END");
crate::debug::emit_marker("SMART_PASS3_WRITE_START");
if pass1_blob_schedule.is_empty() {
pread_write_pass(input, &mut writer, &mut stats, |block, bb, output_blocks| {
extract_block_pass3(block, &ids, clean, bb, output_blocks)
})?;
} else {
pread_write_pass_with_schedule(input, &pass1_blob_schedule, &mut writer, &mut stats, |block, bb, output_blocks| {
extract_block_pass3(block, &ids, clean, bb, output_blocks)
})?;
}
crate::debug::emit_marker("SMART_PASS3_WRITE_END");
crate::debug::emit_marker("SMART_PASS3_END");
Ok(stats)
}
/// Borrowed bundle of the ID sets that `extract_block_pass3` consults to
/// decide which elements to write and how to count them.
struct ExtractPass3IdSets<'a> {
/// Nodes inside the region.
bbox_node_ids: &'a IdSet,
/// Nodes referenced by matched ways.
all_way_node_ids: &'a IdSet,
/// Nodes pulled in through relations (direct members and refs of extra ways).
extra_node_ids: &'a IdSet,
/// Ways matched in pass 1.
matched_way_ids: &'a IdSet,
/// Ways pulled in as members of matched smart relations.
extra_way_ids: &'a IdSet,
/// Relations matched in pass 1.
matched_relation_ids: &'a IdSet,
}
use super::super::clean_metadata;
use crate::owned::{dense_node_metadata, element_metadata};
/// Copies the selected elements of one decoded block into `bb`.
///
/// A node is written when it is in the region, referenced by a matched way,
/// or pulled in through a relation; a way when it is matched or an extra
/// relation member; a relation when it is matched. Metadata is passed through
/// `clean_metadata` before writing. The returned `ExtractStats` counts only
/// this block's elements and carries an empty `strategy`.
///
/// # Errors
/// Propagates the error of the `ensure_*_capacity_local` helpers.
#[hotpath::measure]
fn extract_block_pass3(
    block: &PrimitiveBlock,
    ids: &ExtractPass3IdSets<'_>,
    clean: &CleanAttrs,
    bb: &mut BlockBuilder,
    output: &mut Vec<OwnedBlock>,
) -> std::result::Result<ExtractStats, String> {
    let mut tally = ExtractStats {
        nodes_in_bbox: 0,
        nodes_from_ways: 0,
        nodes_from_relations: 0,
        ways_written: 0,
        ways_from_relations: 0,
        relations_written: 0,
        strategy: "",
    };
    // Scratch buffers reused across elements of this block.
    let mut way_refs: Vec<i64> = Vec::new();
    let mut rel_members: Vec<MemberData<'_>> = Vec::new();

    for element in block.elements() {
        match &element {
            Element::DenseNode(dn) => {
                let node_id = dn.id();
                let in_bbox = ids.bbox_node_ids.get(node_id);
                let from_way = ids.all_way_node_ids.get(node_id);
                let from_rel = ids.extra_node_ids.get(node_id);
                if !(in_bbox || from_way || from_rel) {
                    continue;
                }
                ensure_node_capacity_local(bb, output)?;
                let meta = clean_metadata(dense_node_metadata(dn), clean);
                bb.add_node(dn.id(), dn.decimicro_lat(), dn.decimicro_lon(), dn.tags(), meta.as_ref());
                // Attribute the node to the first reason that applies.
                match (in_bbox, from_way) {
                    (true, _) => tally.nodes_in_bbox += 1,
                    (false, true) => tally.nodes_from_ways += 1,
                    (false, false) => tally.nodes_from_relations += 1,
                }
            }
            Element::Node(n) => {
                let node_id = n.id();
                let in_bbox = ids.bbox_node_ids.get(node_id);
                let from_way = ids.all_way_node_ids.get(node_id);
                let from_rel = ids.extra_node_ids.get(node_id);
                if !(in_bbox || from_way || from_rel) {
                    continue;
                }
                ensure_node_capacity_local(bb, output)?;
                let meta = clean_metadata(element_metadata(&n.info()), clean);
                bb.add_node(n.id(), n.decimicro_lat(), n.decimicro_lon(), n.tags(), meta.as_ref());
                match (in_bbox, from_way) {
                    (true, _) => tally.nodes_in_bbox += 1,
                    (false, true) => tally.nodes_from_ways += 1,
                    (false, false) => tally.nodes_from_relations += 1,
                }
            }
            Element::Way(w) => {
                let is_matched = ids.matched_way_ids.get(w.id());
                let is_extra = ids.extra_way_ids.get(w.id());
                if !(is_matched || is_extra) {
                    continue;
                }
                ensure_way_capacity_local(bb, output)?;
                way_refs.clear();
                way_refs.extend(w.refs());
                let meta = clean_metadata(element_metadata(&w.info()), clean);
                bb.add_way(w.id(), w.tags(), &way_refs, meta.as_ref());
                // A matched way counts as a regular way even if a relation
                // also references it.
                if is_matched {
                    tally.ways_written += 1;
                } else {
                    tally.ways_from_relations += 1;
                }
            }
            Element::Relation(r) => {
                if !ids.matched_relation_ids.get(r.id()) {
                    continue;
                }
                ensure_relation_capacity_local(bb, output)?;
                rel_members.clear();
                rel_members.extend(r.members().map(|m| MemberData {
                    id: m.id,
                    role: m.role().unwrap_or(""),
                }));
                let meta = clean_metadata(element_metadata(&r.info()), clean);
                bb.add_relation(r.id(), r.tags(), &rel_members, meta.as_ref());
                tally.relations_written += 1;
            }
        }
    }
    Ok(tally)
}
/// Returns true when `r` carries a `type` tag whose value is `multipolygon`
/// or `boundary`.
fn is_smart_relation(r: &crate::Relation) -> bool {
    for (key, value) in r.tags() {
        if key == "type" && (value == "multipolygon" || value == "boundary") {
            return true;
        }
    }
    false
}