osm_io/osm/pbf/
reader.rs

1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use anyhow::anyhow;
7use command_executor::shutdown_mode::ShutdownMode;
8use command_executor::thread_pool_builder::ThreadPoolBuilder;
9
10use crate::osm::model::element::Element;
11use crate::osm::pbf::blob_iterator::BlobIterator;
12use crate::osm::pbf::element_iterator::ElementIterator;
13use crate::osm::pbf::file_block_iterator::FileBlockIterator;
14use crate::osm::pbf::file_info::FileInfo;
15use crate::osm::pbf::parallel_element_iteration_command::ParallelElementIterationCommand;
16
17#[derive(Debug, Clone)]
18pub struct Reader {
19    supported_features: Vec<String>,
20    path: PathBuf,
21    info: FileInfo,
22}
23
/// `*.osm.pbf` file reader.
///
/// Prepares the `*.osm.pbf` file for reading. The actual reading is performed by the associated
/// iterators.
27impl Reader {
28    /// Create a new Reader
29    ///
30    /// * path - a path to a valid *.osm.pbf file
31    ///
32    ///   Example:
33    /// ```
34    /// use std::path::PathBuf;
35    /// use osm_io::osm::pbf::reader::Reader;
36    /// let input_path = PathBuf::from("./planet.osm.pbf");
37    /// let reader = Reader::new(&input_path);
38    /// ```
39    pub fn new(path: &Path) -> Result<Reader, anyhow::Error> {
40        let supported_features = vec![
41            "OsmSchema-V0.6".to_string(),
42            "DenseNodes".to_string(),
43            "HistoricalInformation".to_string(),
44            "Sort.Type_then_ID".to_string(),
45        ];
46
47        let mut reader = Reader {
48            supported_features,
49            path: path.to_path_buf(),
50            info: Default::default(),
51        };
52        let mut block_iterator = reader.clone().blocks()?;
53        let file_block = block_iterator.next().ok_or(
54            anyhow!("Failed to parse file header, path: {}", path.display())
55        )?;
56        let osm_header = file_block.as_osm_header()?;
57        reader.info = osm_header.info().clone();
58
59        Self::verify_supported_features(
60            &reader.supported_features,
61            reader.info().required_features(),
62        )?;
63
64        Ok(
65            reader
66        )
67    }
68
69    pub(crate) fn blobs(&self) -> Result<BlobIterator, anyhow::Error> {
70        BlobIterator::new(self.path.clone())
71    }
72
73    /// Low level [FileBlockIterator] used to access the sequence of underlying PBF blocks
74    pub fn blocks(&self) -> Result<FileBlockIterator, anyhow::Error> {
75        match self.blobs() {
76            Ok(blob_iterator) => {
77                Ok(
78                    FileBlockIterator::new(blob_iterator)
79                )
80            }
81            Err(e) => {
82                Err(e)
83            }
84        }
85    }
86
87    /// Iterator used to iterate over elements.
88    /// Example:
89    /// ```
90    /// use std::path::PathBuf;
91    /// use osm_io::osm::model::element::Element;
92    /// use osm_io::osm::pbf;
93    /// fn example() -> Result<(), anyhow::Error> {
94    ///     let input_path = PathBuf::from("./tests/fixtures/malta-230109.osm.pbf");
95    ///     let reader = pbf::reader::Reader::new(&input_path)?;
96    ///
97    ///     let mut nodes = 0usize;
98    ///     let mut ways = 0usize;
99    ///     let mut relations = 0usize;
100    ///
101    ///     for element in reader.elements()? {
102    ///         match element {
103    ///             Element::Node { node } => {
104    ///                 nodes += 1;
105    ///             }
106    ///             Element::Way { way } => {
107    ///                 ways += 1;
108    ///             }
109    ///             Element::Relation { relation } => {
110    ///                 relations += 1;
111    ///             }
112    ///             Element::Sentinel => {
113    ///             }
114    ///         }
115    ///     }
116    ///
117    ///     println!("nodes: {}", nodes);
118    ///     println!("ways: {}", ways);
119    ///     println!("relations: {}", relations);
120    ///
121    ///     Ok(())
122    /// }
123    /// ```
124    pub fn elements(&self) -> Result<ElementIterator, anyhow::Error> {
125        match self.blocks() {
126            Ok(file_block_iterator) => {
127                Ok(
128                    ElementIterator::new(file_block_iterator)
129                )
130            }
131            Err(e) => {
132                Err(e)
133            }
134        }
135    }
136
137    /// Parallel iteration over elements in a *.osm.pbf file
138    ///
139    /// Note that because of the parallel access the order of elements enforced by *.osm.pbf format
140    /// is lost.
141    /// Example:
142    /// ```
143    /// use std::path::PathBuf;
144    /// use std::sync::Arc;
145    /// use std::sync::atomic::{AtomicUsize, Ordering};
146    /// use osm_io::osm::model::element::Element;
147    /// use osm_io::osm::pbf;
148    /// fn example() -> Result<(), anyhow::Error> {
149    ///     let input_path = PathBuf::from("./tests/fixtures/malta-230109.osm.pbf");
150    ///     let reader = pbf::reader::Reader::new(&input_path)?;
151    ///
152    ///     let nodes = Arc::new(AtomicUsize::new(0));
153    ///     let ways = Arc::new(AtomicUsize::new(0));
154    ///     let relations = Arc::new(AtomicUsize::new(0));
155    ///
156    ///     let nodes_clone = nodes.clone();
157    ///     let ways_clone = ways.clone();
158    ///     let relations_clone = relations.clone();
159    ///
160    ///     reader.parallel_for_each(4, move |element| {
161    ///         match element {
162    ///             Element::Node { node: _ } => {
163    ///                 nodes.fetch_add(1, Ordering::SeqCst);
164    ///             }
165    ///             Element::Way { .. } => {
166    ///                 ways.fetch_add(1, Ordering::SeqCst);
167    ///             }
168    ///             Element::Relation { .. } => {
169    ///                 relations.fetch_add(1, Ordering::SeqCst);
170    ///             }
171    ///             Element::Sentinel => {}
172    ///             }
173    ///             Ok(())
174    ///         },
175    ///     )?;
176    ///
177    ///     println!("nodes: {}", nodes_clone.load(Ordering::SeqCst));
178    ///     println!("ways: {}", ways_clone.load(Ordering::SeqCst));
179    ///     println!("relations: {}", relations_clone.load(Ordering::SeqCst));
180    ///     Ok(())
181    /// }
182    /// ```
183    pub fn parallel_for_each(&self, tasks: usize, f: impl Fn(Element) -> Result<(), anyhow::Error> + Send + Sync + 'static) -> Result<(), anyhow::Error> {
184        let mut iteration_pool = ThreadPoolBuilder::new()
185            .with_tasks(tasks)
186            .with_queue_size(1024)
187            .with_shutdown_mode(ShutdownMode::CompletePending)
188            .with_name_str("parallel-element-iterator")
189            .build()?;
190
191        let f_wrapper = Arc::new(f);
192        for blob_desc in self.blobs()? {
193            let f_wrapper_clone = f_wrapper.clone();
194            iteration_pool.submit(
195                Box::new(
196                    ParallelElementIterationCommand::new(blob_desc, f_wrapper_clone)
197                )
198            );
199        }
200
201        iteration_pool.shutdown();
202        iteration_pool.join()?;
203        Ok(())
204    }
205
206    fn find_missing_features(supported_features: &[String], required_features: &[String]) -> Vec<String> {
207        let supported: HashSet<&String> = supported_features.iter().collect::<HashSet<&String>>();
208        let required: HashSet<&String> = required_features.iter().collect::<HashSet<&String>>();
209        if required.is_subset(&supported) {
210            vec![]
211        } else {
212            required.difference(&supported)
213                .map(|e| (*e).clone())
214                .collect::<Vec<String>>()
215        }
216    }
217
218    fn verify_supported_features(supported_features: &[String], required_features: &[String]) -> Result<(), anyhow::Error> {
219        let missing_features = Self::find_missing_features(supported_features, required_features);
220        if missing_features.is_empty() {
221            Ok(())
222        } else {
223            Err(
224                anyhow!("Unsupported features: {missing_features:?}")
225            )
226        }
227    }
228
229    /// List the features supported by this [Reader]
230    pub fn supported_features(&self) -> &Vec<String> {
231        &self.supported_features
232    }
233
234    pub fn info(&self) -> &FileInfo {
235        &self.info
236    }
237
238    pub fn count_objects(&self) -> Result<(i64, i64, i64), anyhow::Error> {
239        let nodes = Arc::new(AtomicUsize::new(0));
240        let ways = Arc::new(AtomicUsize::new(0));
241        let relations = Arc::new(AtomicUsize::new(0));
242
243        let nodes_clone = nodes.clone();
244        let ways_clone = ways.clone();
245        let relations_clone = relations.clone();
246
247        self.parallel_for_each(num_cpus::get(), move |element| {
248            match element {
249                Element::Node { node: _ } => {
250                    nodes.fetch_add(1, Ordering::SeqCst);
251                }
252                Element::Way { .. } => {
253                    ways.fetch_add(1, Ordering::SeqCst);
254                }
255                Element::Relation { .. } => {
256                    relations.fetch_add(1, Ordering::SeqCst);
257                }
258                Element::Sentinel => {}
259            }
260            Ok(())
261        },
262        )?;
263
264        Ok(
265            (
266                nodes_clone.load(Ordering::SeqCst) as i64,
267                ways_clone.load(Ordering::SeqCst) as i64,
268                relations_clone.load(Ordering::SeqCst) as i64
269            )
270        )
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned feature list from string literals.
    fn features(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    #[test]
    fn test_find_missing_features() {
        // Supported and required are identical: nothing is missing.
        let all_supported =
            Reader::find_missing_features(&features(&["a", "b"]), &features(&["a", "b"]));
        assert!(all_supported.is_empty());

        // One required feature is not supported: it is reported.
        let one_missing =
            Reader::find_missing_features(&features(&["a"]), &features(&["a", "b"]));
        assert_eq!(one_missing, vec!["b".to_string()]);

        // More features supported than required: nothing is missing.
        let extra_supported =
            Reader::find_missing_features(&features(&["a", "b"]), &features(&["a"]));
        assert!(extra_supported.is_empty());
    }
}