1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright (c) 2014-2017 Guillaume Pinot <texitoi(a)texitoi.eu>
//
// This work is free. You can redistribute it and/or modify it under
// the terms of the Do What The Fuck You Want To Public License,
// Version 2, as published by Sam Hocevar. See the COPYING file for
// more details.

//! Parallel iterator for `OsmPbfReader`.

use std::collections::VecDeque;
use {Result, OsmObj};
use futures::Future;
use futures_cpupool::{CpuPool, CpuFuture};

/// A parallel iterator over the `OsmObj` of an `OsmPbfReader`.
pub struct Iter<'a, R: 'a> {
    pool: CpuPool,
    queue: VecDeque<CpuFuture<Vec<Result<OsmObj>>, ()>>,
    blob_iter: ::reader::Blobs<'a, R>,
    obj_iter: ::std::vec::IntoIter<Result<OsmObj>>,
}
impl<'a, R> Iter<'a, R>
    where R: ::std::io::Read
{
    /// Creates a parallel iterator.
    pub fn new(reader: &mut ::reader::OsmPbfReader<R>) -> Iter<R> {
        let num_threads = ::num_cpus::get();
        let mut res = Iter {
            pool: CpuPool::new(num_threads),
            queue: VecDeque::new(),
            blob_iter: reader.blobs(),
            obj_iter: vec![].into_iter(),
        };
        for _ in 0..num_threads * 2 {
            res.push_block();
        }
        res
    }
    fn push_block(&mut self) {
        let future = match self.blob_iter.next() {
            None => return,
            Some(Err(e)) => self.pool.spawn_fn(move || Ok(vec![Err(e)])),
            Some(Ok(blob)) => {
                self.pool.spawn_fn(move || {
                    let block = match ::reader::primitive_block_from_blob(&blob) {
                        Ok(b) => b,
                        Err(e) => return Ok(vec![Err(e)]),
                    };
                    let res = ::blocks::iter(&block).map(Ok).collect();
                    Ok(res)
                })
            }
        };
        self.queue.push_back(future);
    }
}

impl<'a, R> Iterator for Iter<'a, R>
    where R: ::std::io::Read
{
    type Item = Result<OsmObj>;
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(obj) = self.obj_iter.next() {
                return Some(obj);
            }
            let v = match self.queue.pop_front() {
                Some(f) => f.wait().unwrap(),
                None => return None,
            };
            self.obj_iter = v.into_iter();
            self.push_block();
        }
    }
}