Skip to main content

fast_osmpbf/
parser.rs

1use crate::{
2    Blob, DenseNodeBlock, ElementBlock, NodeBlock, PrimitiveBlock, RelationBlock, WayBlock, get_element_filter, get_tags_filter,
3};
4use quick_protobuf::{BytesReader, MessageRead};
5use std::{borrow::Cow, io::Read, sync::Arc};
6
7pub(crate) struct OsmParser;
8impl OsmParser {
9    /// Deserialize blob_slices into a Blob.
10    /// Then decompresses the blob if its stored in a compressed state.
11    /// Then parses ElementBlocks inside the decompressed blob.
12    pub(crate) fn deserialize_blob(blob_slice: Arc<[u8]>) -> std::io::Result<Vec<ElementBlock>> {
13        // Deserialize blob
14        let mut reader = BytesReader::from_bytes(&blob_slice);
15        let blob =
16            Blob::from_reader(&mut reader, &blob_slice).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
17
18        // either take the raw_size if available or use 2 * compressed_size as heuristic
19        let size = match blob.raw_size {
20            Some(raw_size) => raw_size as usize,
21            None => blob_slice.len() * 2,
22        };
23        let mut decompressed_blob: Vec<u8> = Vec::with_capacity(size);
24        if let Some(raw) = &blob.raw {
25            decompressed_blob.extend_from_slice(raw);
26        } else if let Some(zlib) = &blob.zlib_data {
27            let mut decoder = flate2::read::ZlibDecoder::new(&zlib[..]);
28            decoder.read_to_end(&mut decompressed_blob)?;
29        } else if let Some(lzma) = &blob.lzma_data {
30            let mut decoder = xz2::read::XzDecoder::new(&lzma[..]);
31            decoder.read_to_end(&mut decompressed_blob)?;
32        } else {
33            return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Empty OSMData blob"));
34        };
35
36        Self::parse_blob(&decompressed_blob)
37    }
38    // Processes a blob in parallel using rayon (one task per PrimitiveGroup)
39    fn parse_blob(blob: &[u8]) -> std::io::Result<Vec<ElementBlock>> {
40        let mut reader = BytesReader::from_bytes(blob);
41        let block = PrimitiveBlock::from_reader(&mut reader, blob)
42            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
43        let table: Vec<Cow<'static, [u8]>> = block.stringtable.s.into_iter().map(|s| Cow::Owned(s.to_vec())).collect();
44        let stringtable = Arc::new(table);
45        let cached_tag_ids = match get_tags_filter() {
46            Some(_) => Self::get_tag_ids(&stringtable),
47            None => Arc::new(Vec::with_capacity(0)),
48        };
49
50        let element_filter = get_element_filter();
51
52        let element_count: usize = block
53            .primitivegroup
54            .iter()
55            .map(|g| g.nodes.len() + g.ways.len() + g.relations.len() + g.dense.as_ref().map_or(0, |d| d.id.len()))
56            .sum();
57        let mut elements: Vec<ElementBlock> = Vec::with_capacity(element_count);
58
59        for group in block.primitivegroup {
60            if let Some(dense_nodes) = group.dense {
61                if element_filter.as_ref().map_or(true, |f| f.nodes) {
62                    let table = Arc::clone(&stringtable);
63                    elements.push(ElementBlock::DenseNodeBlock(DenseNodeBlock {
64                        table,
65                        cached_tag_ids: Arc::clone(&cached_tag_ids),
66                        granularity: block.granularity,
67                        lat_offset: block.lat_offset,
68                        lon_offset: block.lon_offset,
69                        kv_offsets: Self::compute_offsets(&dense_nodes.keys_vals, dense_nodes.id.len()),
70                        nodes: Arc::from(dense_nodes),
71                    }));
72                }
73            }
74            if !group.nodes.is_empty() {
75                if element_filter.as_ref().map_or(true, |f| f.nodes) {
76                    let table = Arc::clone(&stringtable);
77                    elements.push(ElementBlock::NodeBlock(NodeBlock {
78                        nodes: Arc::from(group.nodes),
79                        cached_tag_ids: Arc::clone(&cached_tag_ids),
80                        table,
81                    }));
82                }
83            }
84
85            if !group.ways.is_empty() {
86                if element_filter.as_ref().map_or(true, |f| f.ways) {
87                    let table = Arc::clone(&stringtable);
88                    elements.push(ElementBlock::WayBlock(WayBlock {
89                        ways: Arc::from(group.ways),
90                        cached_tag_ids: Arc::clone(&cached_tag_ids),
91                        table,
92                    }));
93                }
94            }
95
96            if !group.relations.is_empty() {
97                if element_filter.as_ref().map_or(true, |f| f.relations) {
98                    let table = Arc::clone(&stringtable);
99                    elements.push(ElementBlock::RelationBlock(RelationBlock {
100                        relations: Arc::from(group.relations),
101                        cached_tag_ids: Arc::clone(&cached_tag_ids),
102                        table,
103                    }));
104                }
105            }
106        }
107
108        Ok(elements)
109    }
110
111    // Gets tag ids from stringtable if corresponding value is in TAG_KEYS_CACHE
112    fn get_tag_ids(table: &[Cow<'_, [u8]>]) -> Arc<Vec<u32>> {
113        Arc::new(
114            table
115                .iter()
116                .enumerate()
117                .filter_map(|(i, s)| {
118                    let key = unsafe { std::str::from_utf8_unchecked(s) };
119                    let cache = get_tags_filter().unwrap();
120
121                    // Branchless linear scan for ≤8 elements
122                    if cache.contains(&key) { Some(i as u32) } else { None }
123                })
124                .collect::<Vec<u32>>(),
125        )
126    }
127
128    // Computes offsets for keys_vals in DenseNodes
129    // key_vals looks like [k, v, k, v, k, v, ..., 0, k, v, k, v ... 0 ...]
130    fn compute_offsets(keys_vals: &[i32], node_count: usize) -> Vec<usize> {
131        let mut offsets = Vec::with_capacity(node_count + 1);
132        offsets.push(0);
133
134        let mut idx = 0;
135
136        for _ in 0..node_count {
137            while idx < keys_vals.len() && keys_vals[idx] != 0 {
138                idx += 2; // skip k, v pair
139            }
140
141            if idx >= keys_vals.len() {
142                // malformed, but avoid UB
143                offsets.push(idx);
144                continue;
145            }
146
147            idx += 1; // skip terminating zero
148            offsets.push(idx);
149        }
150
151        offsets
152    }
153}