1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use anyhow::anyhow;
7use command_executor::shutdown_mode::ShutdownMode;
8use command_executor::thread_pool_builder::ThreadPoolBuilder;
9
10use crate::osm::model::element::Element;
11use crate::osm::pbf::blob_iterator::BlobIterator;
12use crate::osm::pbf::element_iterator::ElementIterator;
13use crate::osm::pbf::file_block_iterator::FileBlockIterator;
14use crate::osm::pbf::file_info::FileInfo;
15use crate::osm::pbf::parallel_element_iteration_command::ParallelElementIterationCommand;
16
17#[derive(Debug, Clone)]
18pub struct Reader {
19 supported_features: Vec<String>,
20 path: PathBuf,
21 info: FileInfo,
22}
23
24impl Reader {
28 pub fn new(path: &Path) -> Result<Reader, anyhow::Error> {
40 let supported_features = vec![
41 "OsmSchema-V0.6".to_string(),
42 "DenseNodes".to_string(),
43 "HistoricalInformation".to_string(),
44 "Sort.Type_then_ID".to_string(),
45 ];
46
47 let mut reader = Reader {
48 supported_features,
49 path: path.to_path_buf(),
50 info: Default::default(),
51 };
52 let mut block_iterator = reader.clone().blocks()?;
53 let file_block = block_iterator.next().ok_or(
54 anyhow!("Failed to parse file header, path: {}", path.display())
55 )?;
56 let osm_header = file_block.as_osm_header()?;
57 reader.info = osm_header.info().clone();
58
59 Self::verify_supported_features(
60 &reader.supported_features,
61 reader.info().required_features(),
62 )?;
63
64 Ok(
65 reader
66 )
67 }
68
69 pub(crate) fn blobs(&self) -> Result<BlobIterator, anyhow::Error> {
70 BlobIterator::new(self.path.clone())
71 }
72
73 pub fn blocks(&self) -> Result<FileBlockIterator, anyhow::Error> {
75 match self.blobs() {
76 Ok(blob_iterator) => {
77 Ok(
78 FileBlockIterator::new(blob_iterator)
79 )
80 }
81 Err(e) => {
82 Err(e)
83 }
84 }
85 }
86
87 pub fn elements(&self) -> Result<ElementIterator, anyhow::Error> {
125 match self.blocks() {
126 Ok(file_block_iterator) => {
127 Ok(
128 ElementIterator::new(file_block_iterator)
129 )
130 }
131 Err(e) => {
132 Err(e)
133 }
134 }
135 }
136
137 pub fn parallel_for_each(&self, tasks: usize, f: impl Fn(Element) -> Result<(), anyhow::Error> + Send + Sync + 'static) -> Result<(), anyhow::Error> {
184 let mut iteration_pool = ThreadPoolBuilder::new()
185 .with_tasks(tasks)
186 .with_queue_size(1024)
187 .with_shutdown_mode(ShutdownMode::CompletePending)
188 .with_name_str("parallel-element-iterator")
189 .build()?;
190
191 let f_wrapper = Arc::new(f);
192 for blob_desc in self.blobs()? {
193 let f_wrapper_clone = f_wrapper.clone();
194 iteration_pool.submit(
195 Box::new(
196 ParallelElementIterationCommand::new(blob_desc, f_wrapper_clone)
197 )
198 );
199 }
200
201 iteration_pool.shutdown();
202 iteration_pool.join()?;
203 Ok(())
204 }
205
206 fn find_missing_features(supported_features: &[String], required_features: &[String]) -> Vec<String> {
207 let supported: HashSet<&String> = supported_features.iter().collect::<HashSet<&String>>();
208 let required: HashSet<&String> = required_features.iter().collect::<HashSet<&String>>();
209 if required.is_subset(&supported) {
210 vec![]
211 } else {
212 required.difference(&supported)
213 .map(|e| (*e).clone())
214 .collect::<Vec<String>>()
215 }
216 }
217
218 fn verify_supported_features(supported_features: &[String], required_features: &[String]) -> Result<(), anyhow::Error> {
219 let missing_features = Self::find_missing_features(supported_features, required_features);
220 if missing_features.is_empty() {
221 Ok(())
222 } else {
223 Err(
224 anyhow!("Unsupported features: {missing_features:?}")
225 )
226 }
227 }
228
229 pub fn supported_features(&self) -> &Vec<String> {
231 &self.supported_features
232 }
233
234 pub fn info(&self) -> &FileInfo {
235 &self.info
236 }
237
238 pub fn count_objects(&self) -> Result<(i64, i64, i64), anyhow::Error> {
239 let nodes = Arc::new(AtomicUsize::new(0));
240 let ways = Arc::new(AtomicUsize::new(0));
241 let relations = Arc::new(AtomicUsize::new(0));
242
243 let nodes_clone = nodes.clone();
244 let ways_clone = ways.clone();
245 let relations_clone = relations.clone();
246
247 self.parallel_for_each(num_cpus::get(), move |element| {
248 match element {
249 Element::Node { node: _ } => {
250 nodes.fetch_add(1, Ordering::SeqCst);
251 }
252 Element::Way { .. } => {
253 ways.fetch_add(1, Ordering::SeqCst);
254 }
255 Element::Relation { .. } => {
256 relations.fetch_add(1, Ordering::SeqCst);
257 }
258 Element::Sentinel => {}
259 }
260 Ok(())
261 },
262 )?;
263
264 Ok(
265 (
266 nodes_clone.load(Ordering::SeqCst) as i64,
267 ways_clone.load(Ordering::SeqCst) as i64,
268 relations_clone.load(Ordering::SeqCst) as i64
269 )
270 )
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277
278 #[test]
279 fn test_find_missing_features() {
280 let mut supported = vec!["a".to_string(), "b".to_string()];
281 let mut required = vec!["a".to_string(), "b".to_string()];
282 let mut missing = Reader::find_missing_features(&supported, &required);
283 assert!(missing.is_empty());
284
285 supported = vec!["a".to_string()];
286 required = vec!["a".to_string(), "b".to_string()];
287 missing = Reader::find_missing_features(&supported, &required);
288 assert_eq!(missing, vec!["b".to_string()]);
289
290
291 supported = vec!["a".to_string(), "b".to_string()];
292 required = vec!["a".to_string()];
293 missing = Reader::find_missing_features(&supported, &required);
294 assert!(missing.is_empty());
295 }
296}