1extern crate crossbeam;
2extern crate osm_pbf_iter;
3use std::io::{self, Write};
4pub mod parse_status;
5pub mod relation;
6
7use std::collections::{HashMap, HashSet};
8use std::fmt;
9use std::fs::File;
10use std::io::BufReader;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::mpsc::sync_channel;
13use std::sync::{Arc, RwLock};
14use std::thread;
15
16use osm_pbf_iter::{Blob, BlobReader, Primitive, PrimitiveBlock, RelationMemberType};
17
18use relation::{Area, Node, PublicTransport, Relation, Way};
19
/// Position and tags retained for a single node; stored in maps keyed by node id.
#[derive(Clone, Debug)]
struct NodeData {
    /// Latitude, copied from the parsed node's `lat` field.
    lat: f64,
    /// Longitude, copied from the parsed node's `lon` field.
    lon: f64,
    /// Tag key/value pairs copied from the node.
    tags: HashMap<String, String>,
}
27
/// Raw data retained for a single way before its nodes are resolved.
#[derive(Clone, Debug)]
struct WayData {
    /// Way id as read from the file.
    id: u64,
    /// Tag key/value pairs copied from the way.
    tags: HashMap<String, String>,
    /// Optional metadata (`version`, `timestamp`, `changeset`, `uid`, `user`,
    /// `visible`), each stored as a string when present. Left empty for ways
    /// that are collected only because a relation references them.
    info: HashMap<String, String>,
    /// Ids of the nodes referenced by the way, in the order the way lists them.
    nodes: Vec<u64>,
}
35
/// Raw data retained for a single relation before its members are resolved.
#[derive(Clone, Debug)]
struct RelationData {
    /// Relation id as read from the file.
    id: u64,
    /// Tag key/value pairs copied from the relation.
    tags: HashMap<String, String>,
    /// Optional metadata (`version`, `timestamp`, `changeset`, `uid`, `user`,
    /// `visible`), each stored as a string when present.
    info: HashMap<String, String>,
    /// Ids of the relation's way members, in member order.
    ways: Vec<u64>,
    /// Ids of the relation's node members, in member order.
    stops: Vec<u64>,
}
44
/// Set of way ids that a later pass over the file must collect.
type WayIdsSet = HashSet<u64>;
/// Set of node ids that a later pass over the file must collect.
type NodeIdsSet = HashSet<u64>;
47
/// Partial result sent back by one worker thread of the relations pass.
struct MessageRelations {
    /// Relations accepted by the filter that have at least one way member.
    relations: Vec<RelationData>,
    /// Ids of the node members of the accepted relations.
    stop_ids: NodeIdsSet,
    /// Ids of the way members of the accepted relations.
    way_ids: WayIdsSet,
}
53
/// Partial result sent back by one worker thread of the ways pass.
struct MessageWays {
    /// Ways accepted by the filter on their own (with at least one node ref).
    ways: Vec<WayData>,
    /// Ways referenced by a collected relation, keyed by way id.
    relations_ways: HashMap<u64, WayData>,
    /// Ids of every node referenced by the ways above.
    node_ids: NodeIdsSet,
}
59
/// Partial result sent back by one worker thread of the nodes pass.
struct MessageNodes {
    /// Requested nodes found by this worker, keyed by node id.
    nodes: HashMap<u64, NodeData>,
}
63
/// In-memory extract of a PBF file: the filtered relations and ways plus the
/// ways and nodes needed to resolve them.
#[derive(Clone)]
pub struct Parser {
    /// Relations accepted by the filter that have at least one way member.
    relations: Vec<RelationData>,
    /// Ways referenced by the collected relations, keyed by way id.
    relations_ways: HashMap<u64, WayData>,
    /// Ways accepted by the filter on their own.
    ways: Vec<WayData>,
    /// Nodes referenced by the collected ways and relations, keyed by node id.
    nodes: HashMap<u64, NodeData>,
    /// Number of worker threads used when reading the file and when mapping
    /// over the collected relations and ways.
    cpus: usize,
}
74
/// Owning iterator over the relations of a [`Parser`], resolved one at a time.
pub struct ParserRelationIterator {
    /// Index of the next relation to yield.
    index: usize,
    /// Parser whose relations are being iterated.
    data: Parser,
}
80
81impl Parser {
84 fn filter_relation(relation: &osm_pbf_iter::Relation, conditions: &String) -> bool {
92 for condition in conditions.split('&') {
93 let mut condition_split = condition.split('=');
94 let condition_key = condition_split.next().unwrap().to_string();
95 let condition_values = condition_split.next();
96 let tag = relation.tags().find(|&kv| kv.0 == condition_key);
97 if (condition_key.starts_with('!') && tag.is_some())
98 || (!condition_key.starts_with('!') && tag.is_none())
99 {
100 return false;
101 } else if condition_values.is_some() {
102 let tag_value = tag.unwrap().1;
103 let condition_values_string = condition_values.unwrap().to_string();
104 let condition_values_split = condition_values_string.split(',');
105 let mut found = false;
106 for condition_value in condition_values_split {
107 if (condition_value.starts_with('!') && tag_value != &condition_value[1..])
108 || (!condition_value.starts_with('!') && tag_value == condition_value)
109 {
110 found = true;
111 break;
112 }
113 }
114 if !found {
115 return false;
116 }
117 }
118 }
119 true
120 }
121
122 fn filter_way(way: &osm_pbf_iter::Way, conditions: &String) -> bool {
130 for condition in conditions.split('&') {
131 let mut condition_split = condition.split('=');
132 let condition_key = condition_split.next().unwrap().to_string();
133 let condition_values = condition_split.next();
134 let tag = way.tags().find(|&kv| kv.0 == condition_key);
135 if (condition_key.starts_with('!') && tag.is_some())
136 || (!condition_key.starts_with('!') && tag.is_none())
137 {
138 return false;
139 } else if condition_values.is_some() {
140 let tag_value = tag.unwrap().1;
141 let condition_values_string = condition_values.unwrap().to_string();
142 let condition_values_split = condition_values_string.split(',');
143 let mut found = false;
144 for condition_value in condition_values_split {
145 if (condition_value.starts_with('!') && tag_value != &condition_value[1..])
146 || (!condition_value.starts_with('!') && tag_value == condition_value)
147 {
148 found = true;
149 break;
150 }
151 }
152 if !found {
153 return false;
154 }
155 }
156 }
157 true
158 }
159
160 pub fn new_ptv2(pbf_filename: &str, cpus: usize) -> Self {
162 Self::new(
163 pbf_filename,
164 cpus,
165 "name&!route_master&route=bus,tram,train,subway,light_rail,monorail,trolleybus"
166 .to_string(),
167 )
168 }
169
170 pub fn new_aa(pbf_filename: &str, cpus: usize) -> Self {
172 Self::new(
173 pbf_filename,
174 cpus,
175 "name&admin_level&boundary=administrative".to_string(),
176 )
177 }
178
179 pub fn new(pbf_filename: &str, cpus: usize, filters: String) -> Self {
188 let mut relations = Vec::new() as Vec<RelationData>;
189 let mut ways = Vec::new() as Vec<WayData>;
190 let mut relations_ways = HashMap::default() as HashMap<u64, WayData>;
191 let mut nodes = HashMap::default() as HashMap<u64, NodeData>;
192 let way_ids = Arc::new(RwLock::new(HashSet::default() as WayIdsSet));
193 let node_ids = Arc::new(RwLock::new(HashSet::default() as NodeIdsSet));
194 let filters_arc = Arc::new(filters);
195 {
199 eprint!("START Relations map, ");
200 io::stderr().flush().unwrap();
201 let mut workers = Vec::with_capacity(cpus);
202 for _ in 0..cpus {
203 let (req_tx, req_rx) = sync_channel(2);
204 let (res_tx, res_rx) = sync_channel(0);
205 workers.push((req_tx, res_rx));
206 let filters_local = filters_arc.clone();
207 thread::spawn(move || {
208 let mut relations = Vec::new() as Vec<RelationData>;
209 let mut stop_ids = HashSet::default() as NodeIdsSet;
210 let mut way_ids = HashSet::default() as WayIdsSet;
211
212 while let Ok(blob) = req_rx.recv() {
213 let data = (blob as Blob).into_data();
214 let primitive_block = PrimitiveBlock::parse(&data);
215 for primitive in primitive_block.primitives() {
216 if let Primitive::Relation(relation) = primitive {
217 if Self::filter_relation(&relation, filters_local.as_ref()) {
218 let mut info: HashMap<String, String> = HashMap::new();
219 if let Some(info_data) = relation.info.clone() {
220 if let Some(version) = info_data.version {
221 info.insert("version".to_string(), version.to_string());
222 }
223 if let Some(timestamp) = info_data.timestamp {
224 info.insert(
225 "timestamp".to_string(),
226 timestamp.to_string(),
227 );
228 }
229 if let Some(changeset) = info_data.changeset {
230 info.insert(
231 "changeset".to_string(),
232 changeset.to_string(),
233 );
234 }
235 if let Some(uid) = info_data.uid {
236 info.insert("uid".to_string(), uid.to_string());
237 }
238 if let Some(user) = info_data.user {
239 info.insert("user".to_string(), user.to_string());
240 }
241 if let Some(visible) = info_data.visible {
242 info.insert("visible".to_string(), visible.to_string());
243 }
244 }
245 let mut rd = RelationData {
247 id: relation.id,
248 tags: relation
249 .tags()
250 .map(|t| (t.0.to_string(), t.1.to_string()))
251 .collect(),
252 info,
253 ways: Vec::new(),
254 stops: Vec::new(),
255 };
256 for member in relation.members() {
257 if member.2 == RelationMemberType::Way {
259 rd.ways.push(member.1);
260 way_ids.insert(member.1);
261 }
262 if member.2 == RelationMemberType::Node {
263 rd.stops.push(member.1);
264 stop_ids.insert(member.1);
265 }
266 }
267 if !rd.ways.is_empty() {
268 relations.push(rd);
269 } else {
270 }
272 }
273 }
274 }
275 }
276
277 res_tx
278 .send(MessageRelations {
279 relations,
280 stop_ids,
281 way_ids,
282 })
283 .unwrap();
284 });
285 }
286
287 let f = File::open(pbf_filename).unwrap();
288 let mut reader = BlobReader::new(BufReader::new(f));
289
290 let mut w = 0;
291 for blob in &mut reader {
292 let req_tx = &workers[w].0;
293 w = (w + 1) % cpus;
294 req_tx.send(blob).unwrap();
295 }
296
297 eprint!("reduce, ");
298 io::stderr().flush().unwrap();
299 {
301 let mut node_ids_write = node_ids.write().unwrap();
303 let mut way_ids_write = way_ids.write().unwrap();
304 for (req_tx, res_rx) in workers.into_iter() {
305 drop(req_tx);
306 let worker_data = res_rx.recv().unwrap();
307 relations.extend(worker_data.relations);
308 node_ids_write.extend(worker_data.stop_ids);
309 way_ids_write.extend(worker_data.way_ids);
310 }
311 } eprintln!("found {}", relations.len());
313 }
314
315 {
319 eprint!("START Ways map, ");
320 io::stderr().flush().unwrap();
321 let mut workers = Vec::with_capacity(cpus);
322 for _ in 0..cpus {
323 let (req_tx, req_rx) = sync_channel(2);
324 let (res_tx, res_rx) = sync_channel(0);
325 workers.push((req_tx, res_rx));
326 let way_ids_local = way_ids.clone();
327 let filters_local = filters_arc.clone();
328 thread::spawn(move || {
329 let mut ways = Vec::new() as Vec<WayData>;
330 let mut relations_ways = HashMap::default() as HashMap<u64, WayData>;
331 let mut node_ids = HashSet::default() as NodeIdsSet;
332 let way_ids_read = way_ids_local.read().unwrap();
333 while let Ok(blob) = req_rx.recv() {
334 let blob = (blob as Blob).into_data();
335 let primitive_block = PrimitiveBlock::parse(&blob);
336 for primitive in primitive_block.primitives() {
337 if let Primitive::Way(way) = primitive {
338 if way_ids_read.contains(&way.id) {
340 for node in way.refs() {
341 node_ids.insert(node as u64);
342 }
343 relations_ways.insert(
344 way.id,
345 WayData {
346 id: way.id,
347 tags: way
348 .tags()
349 .map(|t| (t.0.to_string(), t.1.to_string()))
350 .collect(),
351 info: HashMap::new(),
352 nodes: way.refs().map(|id| id as u64).collect(),
353 },
354 );
355 }
356 if Self::filter_way(&way, filters_local.as_ref()) {
358 let mut info: HashMap<String, String> = HashMap::new();
359 if let Some(info_data) = way.info.clone() {
360 if let Some(version) = info_data.version {
361 info.insert("version".to_string(), version.to_string());
362 }
363 if let Some(timestamp) = info_data.timestamp {
364 info.insert(
365 "timestamp".to_string(),
366 timestamp.to_string(),
367 );
368 }
369 if let Some(changeset) = info_data.changeset {
370 info.insert(
371 "changeset".to_string(),
372 changeset.to_string(),
373 );
374 }
375 if let Some(uid) = info_data.uid {
376 info.insert("uid".to_string(), uid.to_string());
377 }
378 if let Some(user) = info_data.user {
379 info.insert("user".to_string(), user.to_string());
380 }
381 if let Some(visible) = info_data.visible {
382 info.insert("visible".to_string(), visible.to_string());
383 }
384 }
385 let wd = WayData {
386 id: way.id,
387 tags: way
388 .tags()
389 .map(|t| (t.0.to_string(), t.1.to_string()))
390 .collect(),
391 info,
392 nodes: way.refs().map(|id| id as u64).collect(),
393 };
394 if !wd.nodes.is_empty() {
395 for node in way.refs() {
396 node_ids.insert(node as u64);
397 }
398 ways.push(wd);
399 } else {
400 }
402 }
403 }
404 }
405 }
406
407 res_tx
408 .send(MessageWays {
409 ways,
410 relations_ways,
411 node_ids,
412 })
413 .unwrap();
414 });
415 }
416
417 let f = File::open(pbf_filename).unwrap();
418 let mut reader = BlobReader::new(BufReader::new(f));
419
420 let mut w = 0;
421 for blob in &mut reader {
422 let req_tx = &workers[w].0;
423 w = (w + 1) % cpus;
424 req_tx.send(blob).unwrap();
425 }
426
427 eprint!("reduce, ");
428 io::stderr().flush().unwrap();
429 {
431 let mut node_ids_write = node_ids.write().unwrap();
432 for (req_tx, res_rx) in workers.into_iter() {
433 drop(req_tx);
434 let worker_data = res_rx.recv().unwrap();
435 ways.extend(worker_data.ways);
436 relations_ways.extend(worker_data.relations_ways);
437 node_ids_write.extend(worker_data.node_ids);
438 }
439 } eprintln!(
441 "found {} for relations +{} new",
442 relations_ways.len(),
443 ways.len()
444 );
445 }
446
447 {
451 eprint!("START Nodes map, ");
452 io::stderr().flush().unwrap();
453 let mut workers = Vec::with_capacity(cpus);
454 for _ in 0..cpus {
455 let (req_tx, req_rx) = sync_channel(2);
456 let (res_tx, res_rx) = sync_channel(0);
457 workers.push((req_tx, res_rx));
458 let node_ids_local = node_ids.clone();
459 thread::spawn(move || {
460 let node_ids_read = node_ids_local.read().unwrap();
462 let mut nodes = HashMap::default() as HashMap<u64, NodeData>;
463 while let Ok(blob) = req_rx.recv() {
464 let blob = (blob as Blob).into_data();
465 let primitive_block = PrimitiveBlock::parse(&blob);
466 for primitive in primitive_block.primitives() {
467 if let Primitive::Node(node) = primitive {
468 if node_ids_read.contains(&node.id) {
469 nodes.insert(
470 node.id,
471 NodeData {
472 tags: node
474 .tags
475 .into_iter()
476 .map(|t| (t.0.to_string(), t.1.to_string()))
477 .collect(),
478 lat: node.lat,
479 lon: node.lon,
480 },
481 );
482 }
483 }
484 }
485 }
486
487 res_tx.send(MessageNodes { nodes }).unwrap();
488 });
489 }
490
491 let f = File::open(pbf_filename).unwrap();
492 let mut reader = BlobReader::new(BufReader::new(f));
493
494 let mut w = 0;
495 for blob in &mut reader {
496 let req_tx = &workers[w].0;
497 w = (w + 1) % cpus;
498 req_tx.send(blob).unwrap();
499 }
500
501 eprint!("reduce, ");
502 io::stderr().flush().unwrap();
503 {
505 for (req_tx, res_rx) in workers.into_iter() {
506 drop(req_tx);
507 let worker_data = res_rx.recv().unwrap();
508 nodes.extend(worker_data.nodes);
509 }
510 } } eprintln!("found {}", nodes.len());
513
514 Parser {
515 relations,
516 relations_ways,
517 ways,
518 nodes,
519 cpus,
520 }
521 }
522
523 pub fn get_public_transports(&self, gap: f64) -> Vec<PublicTransport> {
526 self.par_map(&move |r| {
527 let (f, s) = r.flatten_ways(gap, false).unwrap();
529 PublicTransport {
530 id: r.id,
531 tags: r.tags.clone(),
532 info: r.info.clone(),
533 stops: r.stops,
534 geometry: f
535 .iter()
536 .map(|v| v.iter().map(|n| (n.lon, n.lat)).collect())
537 .collect(),
538 parse_status: s,
539 }
540 })
541 }
542
543 pub fn par_map<R, F>(&self, func: &F) -> Vec<R>
547 where
548 F: Fn(Relation) -> R + Sync + Send + Clone + 'static,
549 R: Send + 'static,
550 {
551 let cpus = self.cpus;
552 let length = self.relations.len();
553 let mut workers = Vec::with_capacity(cpus);
554 let index = Arc::new(RwLock::new(AtomicUsize::new(0)));
555 crossbeam::scope(|s| {
556 for _ in 0..cpus {
557 let (res_tx, res_rx) = sync_channel(200);
558 workers.push(res_rx);
559 let index_local = index.clone();
560 s.spawn(move |_| loop {
561 let index;
562 {
563 let index_write = index_local.write().unwrap();
564 index = index_write.fetch_add(1, Ordering::SeqCst);
565 }
566 if index >= length {
567 break;
568 }
569 let relation = self.get_relation_at(index);
570 let processed = func(relation);
571 res_tx.send(processed).unwrap();
572 });
573 }
574
575 let mut relations = Vec::with_capacity(length);
577 let mut errors = 0;
578 while errors < cpus {
579 errors = 0;
580 for res_rx in workers.iter() {
581 match res_rx.recv() {
582 Ok(worker_data) => relations.push(worker_data),
583 Err(_) => errors += 1,
584 };
585 }
586 }
587 relations
588 })
589 .unwrap()
590 }
591
592 pub fn get_areas(&self, gap: f64) -> Vec<Area> {
595 let relations_ways = self.par_map(&move |r| {
596 let (f, s) = r.flatten_ways(gap, true).unwrap();
597 Area {
598 id: r.id,
599 id_type: 'r',
600 tags: r.tags,
601 info: r.info,
602 geometry: f
603 .iter()
604 .map(|v| v.iter().map(|n| (n.lon, n.lat)).collect())
605 .collect(),
606 parse_status: s,
607 }
608 });
609
610 let cpus = self.cpus;
612 let length = self.ways.len();
613 let mut workers = Vec::with_capacity(cpus);
614 let index = Arc::new(RwLock::new(AtomicUsize::new(0)));
615 let areas_ways = crossbeam::scope(|s| {
616 for _ in 0..cpus {
617 let (res_tx, res_rx) = sync_channel(200);
618 workers.push(res_rx);
619 let index_local = index.clone();
620 s.spawn(move |_| loop {
621 let index;
622 {
623 let index_write = index_local.write().unwrap();
624 index = index_write.fetch_add(1, Ordering::SeqCst);
625 }
626 if index >= length {
627 break;
628 }
629 let way = self.get_way_at(index);
630 let (f, s) = way.flatten_ways(gap, true).unwrap();
631 let area = Area {
632 id: way.id,
633 id_type: 'w',
634 tags: way.tags,
635 info: way.info,
636 geometry: f
637 .iter()
638 .map(|v| v.iter().map(|n| (n.lon, n.lat)).collect())
639 .collect(),
640 parse_status: s,
641 };
642 res_tx.send(area).unwrap();
644 });
645 }
646
647 let mut ways = Vec::with_capacity(length);
649 let mut errors = 0;
650 while errors < cpus {
651 errors = 0;
652 for res_rx in workers.iter() {
653 match res_rx.recv() {
654 Ok(worker_data) => ways.push(worker_data),
655 Err(_) => errors += 1,
656 };
657 }
658 }
659 ways
660 })
661 .unwrap();
662
663 relations_ways.into_iter().chain(areas_ways).collect()
665 }
666
667 pub fn get_relation_from_id(self, id: u64) -> Relation {
669 let relopt = self.relations.iter().find(|rel| rel.id == id);
670 let rel = relopt.as_ref().unwrap();
671 self.get_relation_from(rel)
672 }
673
674 fn get_relation_from(&self, relation_data: &RelationData) -> Relation {
676 Relation {
677 id: relation_data.id,
678 tags: relation_data.tags.clone(),
679 info: relation_data.info.clone(),
680 ways: relation_data
681 .ways
682 .iter()
683 .filter(|wid| self.relations_ways.contains_key(wid))
684 .map(|wid| Way {
685 id: *wid,
686 tags: self.relations_ways[wid].tags.clone(),
687 info: self.relations_ways[wid].info.clone(),
688 nodes: self.relations_ways[wid]
689 .nodes
690 .iter()
691 .filter(|nid| self.nodes.contains_key(nid))
692 .map(|nid| Node {
693 id: *nid,
694 tags: self.nodes[nid].tags.clone(),
695 lat: self.nodes[nid].lat,
696 lon: self.nodes[nid].lon,
697 })
698 .collect(),
699 })
700 .collect(),
701 stops: relation_data
702 .stops
703 .iter()
704 .filter(|nid| self.nodes.contains_key(nid))
705 .map(|nid| Node {
706 id: *nid,
707 tags: self.nodes[nid].tags.clone(),
708 lat: self.nodes[nid].lat,
709 lon: self.nodes[nid].lon,
710 })
711 .collect(),
712 }
713 }
714
715 fn get_way_from(&self, way_data: &WayData) -> Way {
717 Way {
718 id: way_data.id,
719 tags: way_data.tags.clone(),
720 info: way_data.info.clone(),
721 nodes: way_data
722 .nodes
723 .iter()
724 .filter(|nid| self.nodes.contains_key(nid))
725 .map(|nid| Node {
726 id: *nid,
727 tags: self.nodes[nid].tags.clone(),
728 lat: self.nodes[nid].lat,
729 lon: self.nodes[nid].lon,
730 })
731 .collect(),
732 }
733 }
734
735 pub fn get_relation_at(&self, index: usize) -> Relation {
737 let rel = &self.relations[index];
738 self.get_relation_from(rel)
739 }
740
741 pub fn get_way_at(&self, index: usize) -> Way {
743 let way = &self.ways[index];
744 self.get_way_from(way)
745 }
746
747 pub fn iter(self) -> ParserRelationIterator {
749 ParserRelationIterator {
750 data: self,
751 index: 0,
752 }
753 }
754}
755
756impl fmt::Debug for Parser {
757 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758 writeln!(f, "Preparing to print")?;
759 let mut count = 0;
760 for relation in self.relations.clone() {
761 count += 1;
762 let ways_count = relation.ways.len();
763 let stops_count = relation.stops.len();
764 let nodes_count: usize = relation
765 .ways
766 .iter()
767 .map(|wid| match self.relations_ways.get(wid) {
768 Some(way) => way.nodes.len(),
769 None => 0,
770 })
771 .sum();
772 writeln!(
773 f,
774 "{:?}: ways {:?}, stops {:?}, nodes {:?}, {:?}",
775 relation.id, ways_count, stops_count, nodes_count, relation.tags["name"]
776 )?;
777 }
778 writeln!(f, "\nFound {:?} relations", count)?;
779 Ok(())
780 }
781}
782
783impl Iterator for ParserRelationIterator {
784 type Item = Relation;
785
786 fn next(&mut self) -> Option<Relation> {
787 if self.index >= self.data.relations.len() {
788 None
789 } else {
790 let relation = self.data.get_relation_at(self.index);
791 self.index += 1usize;
792 Some(relation)
793 }
794 }
795}
796
797impl IntoIterator for Parser {
798 type Item = Relation;
799 type IntoIter = ParserRelationIterator;
800 fn into_iter(self) -> Self::IntoIter {
801 self.iter()
802 }
803}