extern crate flate2;
extern crate protobuf;
mod fileformat; mod osmformat;
mod osmtypes;
mod cbindgen;
use fileformat::*;
pub use osmtypes::*;
use flate2::read::ZlibDecoder;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;
use std::thread;
use std::thread::JoinHandle;
use std::option;
use std::sync::mpsc;
use std::result::Result;
use std::collections::VecDeque;
/// Minimum of two ordered values.
///
/// BUGFIX: the previous open-coded `if $x < $y { $x } else { $y }`
/// evaluated the winning expression twice (a hazard if an argument has
/// side effects); delegate to `std::cmp::min`, which takes each operand
/// exactly once. All current call sites pass `usize` lengths, which are
/// `Ord`.
macro_rules! pbf_min {
    ($x:expr, $y:expr) => {
        ::std::cmp::min($x, $y)
    };
}
type BlobData = option::Option<(BlobHeader, Vec<u8>, BlobPosition)>;
/// Opens the PBF file at `fname` and streams its parsed contents to
/// `result_sender`, keeping up to `threads` parser threads in flight.
///
/// Returns `Ok` with a status string on success, or `Err` with a short
/// message when the file cannot be opened or parsing fails.
///
/// The parameter is `&str` rather than `&String`: existing callers
/// passing `&String` keep working via deref coercion, and string
/// literals become usable directly.
pub fn read_pbf(
    fname: &str,
    threads: usize,
    result_sender: &mut mpsc::Sender<PBFData>,
) -> Result<String, String> {
    let f = match File::open(Path::new(fname)) {
        Ok(file) => file,
        Err(e) => {
            println!("Failed to open file '{}': {}", fname, e);
            return Err("Failed to open file".to_string());
        }
    };
    read_pbf_data(BufReader::new(f), threads, result_sender)
}
/// Drives the full parse of a PBF stream: a dedicated thread splits the
/// input into decompressed blobs, the mandatory leading OSMHeader block
/// is validated and forwarded, then the remaining data blobs are parsed
/// across up to `threads` workers. A final `PBFData::ParseEnd` is sent
/// once everything has been delivered.
pub fn read_pbf_data<R: Read + Send + 'static>(
    reader: R,
    threads: usize,
    result_sender: &mut mpsc::Sender<PBFData>,
) -> Result<String, String> {
    let (tx, rx) = mpsc::channel::<BlobData>();
    let rh = thread::spawn(move || read_blob_headers(reader, threads, &tx));
    // First blob must be the OSMHeader; bail out on anything else.
    // (The callees only borrow the sender, so no clone is needed.)
    read_header_block(&rx, result_sender)?;
    schedule_primitives_reads(threads, rx, result_sender);
    rh.join().unwrap();
    result_sender
        .send(PBFData::ParseEnd)
        .expect("failed to send parse end sign.");
    Ok("pbf-read-ok".to_string())
}
/// Reads up to `sz` bytes from `reader` into a fresh buffer.
///
/// On an I/O error the error is logged and whatever was read so far is
/// returned; callers detect truncation by comparing the returned length
/// against `sz`.
pub fn readb(sz: usize, reader: &mut dyn Read) -> Vec<u8> {
    let mut buf = Vec::with_capacity(sz);
    if let Err(e) = reader.take(sz as u64).read_to_end(&mut buf) {
        println!("Read error {}", e);
    }
    buf
}
/// Sequentially reads blob headers and payloads from `reader`, spawning
/// one decompression/parse thread per blob, and keeps at most `threads`
/// of them in flight. Sends `None` on `tx` once the stream is
/// exhausted so the consumer knows no more blobs are coming.
fn read_blob_headers<R: Read>(mut reader: R, threads: usize, tx: &mpsc::Sender<BlobData>) {
    let mut count = 0;
    let mut bytes = 0;
    // In-flight worker threads, joined oldest-first.
    let mut vh: VecDeque<JoinHandle<()>> = VecDeque::with_capacity(threads + 10);
    loop {
        match read_blob_header(&mut reader, bytes, &tx) {
            Err(_) => {
                // EOF (or a read/parse failure): stop reading blobs.
                break;
            }
            Ok((read, handle)) => {
                bytes = bytes + read;
                vh.push_back(handle);
                // The `count == 0` case joins the very first blob's worker
                // before any other is spawned, so the OSMHeader blob is
                // parsed and sent before any data blob — presumably to
                // guarantee ordering on the channel (TODO confirm).
                if vh.len() > threads || count == 0 {
                    vh.pop_front().unwrap().join().unwrap();
                }
                count = count + 1;
            }
        }
    }
    // Drain the remaining workers before signalling end-of-stream.
    for h in vh {
        h.join().unwrap();
    }
    tx.send(None).unwrap();
    println!("Read {} blobs, {} bytes.", count, bytes);
}
/// Reads one blob from the stream and spawns a worker thread that
/// decompresses and forwards it on `tx`.
///
/// Wire format: a 4-byte big-endian length, then a `BlobHeader`
/// message of that length, then `datasize` payload bytes.
///
/// Returns `Ok((bytes_consumed, worker_handle))`, or `Err(bytes_consumed)`
/// on EOF or a header parse failure.
fn read_blob_header(
    reader: &mut dyn Read,
    current_offset: usize,
    tx: &mpsc::Sender<BlobData>,
) -> Result<(usize, JoinHandle<()>), usize> {
    let mut read = 0;
    let szv = readb(4, reader);
    read = read + szv.len();
    if szv.len() != 4 {
        // Clean EOF (or short read): tell the caller to stop.
        return Err(read);
    }
    // BUGFIX: decode the full 4-byte big-endian length. The previous
    // code used only `szv[3]`, which silently breaks on any BlobHeader
    // longer than 255 bytes or with a non-zero high byte.
    let header_len = u32::from_be_bytes([szv[0], szv[1], szv[2], szv[3]]) as usize;
    let blhd = readb(header_len, reader);
    read = read + blhd.len();
    let mut blob_pos = BlobPosition {
        // Payload begins right after the length prefix and the header.
        start: current_offset + read,
        size: 0,
    };
    match protobuf::parse_from_bytes::<BlobHeader>(&blhd) {
        Ok(b) => {
            let blob_data = readb(b.get_datasize() as usize, reader);
            read = read + blob_data.len();
            blob_pos.size = blob_data.len();
            let tc = tx.clone();
            // Decompress and parse off-thread; the result goes to `tx`.
            let h = thread::spawn(move || {
                match protobuf::parse_from_bytes::<Blob>(&blob_data) {
                    Ok(blob) => {
                        // NOTE(review): only zlib-compressed blobs are
                        // handled; raw or lzma payloads end up as errors
                        // here — confirm the inputs are always zlib.
                        let mut decoder = ZlibDecoder::new(blob.get_zlib_data());
                        let mut uncompressed_data = Vec::<u8>::new();
                        match decoder.read_to_end(&mut uncompressed_data) {
                            Ok(_) => {
                                tc.send(Some((b, uncompressed_data, blob_pos))).unwrap();
                            }
                            Err(e) => println!("block error {}", e),
                        }
                    }
                    Err(e) => println!("blob error {}", e),
                }
            });
            return Ok((read, h));
        }
        Err(e) => println!("header error {} ", e),
    }
    Err(read)
}
/// Receives the first blob from `rx`, which must be an `OSMHeader`
/// block, forwards its bounding box as a `PbfInfo` message, and
/// verifies that every required feature is one this reader supports.
///
/// Returns `Err` with a short message on any channel, type, parse, or
/// feature failure.
fn read_header_block(
    rx: &mpsc::Receiver<BlobData>,
    result_sender: &mut mpsc::Sender<PBFData>,
) -> Result<String, String> {
    let (b, bdata, blob_pos) = match rx.recv() {
        Ok(Some(data)) => data,
        Ok(None) => return Err("Failed to get blob header.".to_string()),
        Err(_) => return Err("Failed to receive blob header.".to_string()),
    };
    if b.get_field_type() != "OSMHeader" {
        println!("Unexpected BlobHeader type: {}", b.get_field_type());
        return Err("Unexpected BlobHeader type.".to_string());
    }
    let hb = match protobuf::parse_from_bytes::<osmformat::HeaderBlock>(&bdata) {
        Ok(hb) => hb,
        Err(_) => return Err("Failed to get header.block".to_string()),
    };
    println!(
        "!!!! HeaderBlock OK \n req. features: {:?} \n \
         optional_features: {:?}",
        hb.get_required_features(),
        hb.get_optional_features()
    );
    // The header bbox is stored in nanodegrees; scale down to degrees.
    // (The old local was misleadingly named `microseconds`.)
    let raw_bbox = hb.get_bbox();
    let nanodegrees = 1000000000.;
    let bbox = LBox {
        top_left: Coord {
            lat: raw_bbox.get_top() as CoordType / nanodegrees,
            lon: raw_bbox.get_left() as CoordType / nanodegrees,
        },
        bottom_right: Coord {
            lat: raw_bbox.get_bottom() as CoordType / nanodegrees,
            lon: raw_bbox.get_right() as CoordType / nanodegrees,
        },
    };
    // Send directly — the old code took a pointless `&` of the result.
    result_sender
        .send(PBFData::PbfInfo(PbfInfo {
            bbox: bbox,
            position: blob_pos,
        }))
        .expect("info send error");
    // Iterate by reference; no need to clone the feature list.
    for f in hb.get_required_features() {
        if !(f == "OsmSchema-V0.6" || f == "DenseNodes") {
            println!("Error, unsupported feature {}", f);
            return Err("Error, unsupported feature".to_string());
        }
    }
    Ok("".to_string())
}
/// Receives decompressed `OSMData` blobs from `rx` and parses each on a
/// worker thread, keeping at most `threads` workers in flight. Each
/// blob gets a unique id (starting at 1) that doubles as its string
/// table id.
///
/// BUGFIX: `id` used to be incremented at the top of every loop
/// iteration — including the terminating `None`/error iteration — so
/// the final "Parsed {} blobs!" message over-counted by one. It is now
/// bumped only when a data blob is actually scheduled; per-blob ids are
/// unchanged.
fn schedule_primitives_reads(
    threads: usize,
    rx: mpsc::Receiver<BlobData>,
    result_sender: &mut mpsc::Sender<PBFData>,
) {
    let mut id = 0;
    let mut handles = VecDeque::new();
    loop {
        match rx.recv() {
            Ok(Some((header, bdata, blob_pos))) => {
                if header.get_field_type() != "OSMData" {
                    println!("Unexpected BlobHeader type: {}", header.get_field_type());
                    return;
                }
                id = id + 1;
                let data_tx = result_sender.clone();
                let handle = thread::spawn(move || {
                    parse_primitive_block(&bdata, blob_pos, &data_tx, id);
                });
                handles.push_back(handle);
                if handles.len() > threads {
                    handles.pop_front().unwrap().join().unwrap();
                }
            }
            Ok(None) => {
                // End-of-stream marker from the reader thread.
                println!("Finished main parsing thread.");
                break;
            }
            Err(e) => {
                println!("get data error: {}", e);
                break;
            }
        }
    }
    // Wait for the remaining workers before reporting.
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Parsed {} blobs!", id);
}
/// Parses one `OSMData` blob: publishes its string table first (so
/// consumers can resolve tag ids), then every primitive group's nodes,
/// ways and relations on `results_tx`.
///
/// Takes `&[u8]` instead of `&Vec<u8>`; callers passing `&Vec<u8>`
/// still work via deref coercion.
fn parse_primitive_block(
    bdata: &[u8],
    blob_pos: BlobPosition,
    results_tx: &mpsc::Sender<PBFData>,
    string_table_id: IDType,
) {
    match protobuf::parse_from_bytes::<osmformat::PrimitiveBlock>(bdata) {
        Ok(pb) => {
            read_strings(&pb, string_table_id, results_tx);
            for g in pb.get_primitivegroup() {
                let mut bbox1: Option<LBox> = None;
                if g.has_dense() {
                    bbox1 = read_dense_nodes(&pb, string_table_id, g.get_dense(), results_tx);
                }
                let bbox2 = read_nodes(&pb, string_table_id, g.get_nodes(), results_tx);
                send_blob_info(bbox1, bbox2, &blob_pos, results_tx);
                read_ways(string_table_id, g.get_ways(), results_tx);
                read_relations(string_table_id, g.get_relations(), results_tx);
            }
        }
        Err(e) => {
            println!("PrimitiveBlock error {}", e);
        }
    }
}
/// Reports the extent of a blob's nodes as a `PbfInfo` message.
///
/// Either bounding box may be absent (no dense nodes / no plain nodes);
/// when both are present they are unioned, and when both are absent
/// nothing is sent. The old code took a pointless `&` of each `send`
/// result; the reference has been dropped.
fn send_blob_info(
    bbox1: Option<LBox>,
    bbox2: Option<LBox>,
    blob_pos: &BlobPosition,
    results_tx: &mpsc::Sender<PBFData>,
) {
    match (bbox1, bbox2) {
        (Some(m1), Some(m2)) => {
            results_tx
                .send(PBFData::PbfInfo(PbfInfo {
                    bbox: m1.union_with_box(&m2),
                    position: blob_pos.clone(),
                }))
                .expect("info send error2");
        }
        (Some(m1), None) => {
            results_tx
                .send(PBFData::PbfInfo(PbfInfo {
                    bbox: m1,
                    position: blob_pos.clone(),
                }))
                .expect("info send error3");
        }
        (None, Some(m2)) => {
            results_tx
                .send(PBFData::PbfInfo(PbfInfo {
                    bbox: m2,
                    position: blob_pos.clone(),
                }))
                .expect("info send error4");
        }
        (None, None) => {}
    }
}
/// Decodes a group of ways and sends them on `data_tx` as a `WaysSet`.
/// Way node references are delta-encoded; a running sum restores the
/// absolute node ids.
fn read_ways(strings_id: IDType, ways: &[osmformat::Way], data_tx: &mpsc::Sender<PBFData>) {
    let mut set = Vec::with_capacity(ways.len());
    for w in ways {
        let mut running = 0;
        let nodes: Vec<IDType> = w
            .get_refs()
            .iter()
            .map(|d| {
                running = running + d;
                running as IDType
            })
            .collect();
        set.push((
            w.get_id() as IDType,
            Way {
                nodes: nodes,
                tags: read_tags(w.get_keys(), w.get_vals(), strings_id),
            },
        ));
    }
    if !set.is_empty() {
        data_tx.send(PBFData::WaysSet(set)).unwrap();
    }
}
/// Pairs up parallel key/value index arrays into a `Tags` set bound to
/// the given string table. When the arrays disagree in length a warning
/// is printed and only the common prefix is used (zip stops at the
/// shorter array).
fn read_tags(keys: &[u32], vals: &[u32], strings_id: IDType) -> Tags {
    if keys.len() != vals.len() {
        println!(
            "Error: failed to get tags for string table {} because {} != {}",
            strings_id,
            keys.len(),
            vals.len()
        );
    }
    let tags = keys
        .iter()
        .zip(vals.iter())
        .map(|(&k, &v)| Tag {
            key: k as IDType,
            val: v as IDType,
        })
        .collect();
    Tags {
        string_table_id: strings_id,
        tags: tags,
    }
}
/// Splits a DenseNodes `keys_vals` stream (a flat sequence of
/// string-table indexes `k1, v1, k2, v2, ..., 0` where `0` terminates
/// each node's list) into one `Tags` entry per node.
///
/// A trailing empty `Tags` is always appended, so well-formed input
/// (ending in `0`) yields one more entry than there are nodes;
/// `read_dense_nodes` relies on this via its
/// `count >= tags_index.len()` sanity check.
fn read_kvtags(keys_vals: &[i32], strings_id: IDType) -> VecDeque<Tags> {
    let length = keys_vals.len();
    let mut tags_index = VecDeque::with_capacity(length);
    let mut tags = Vec::new();
    let mut i = 0;
    while i < length {
        let k = keys_vals[i] as IDType;
        if k > 0 {
            // NOTE(review): assumes a value follows every key; a key in
            // the final slot would panic on `i + 1` — confirm inputs are
            // always 0-terminated.
            let v = keys_vals[i + 1] as IDType;
            if v > 0 {
                tags.push(Tag { key: k, val: v });
                // Skip the value slot too (combined with the bump below).
                i = i + 1;
            } else {
                // NOTE(review): on a non-positive value the index is NOT
                // advanced past it, so it is re-read as the next key on
                // the following iteration — confirm this is intended.
                println!("Error value tag is <= 0");
            }
        } else {
            // Delimiter: close out the current node's tag list.
            tags_index.push_back(Tags {
                string_table_id: strings_id,
                tags: tags,
            });
            tags = Vec::new();
        }
        i = i + 1;
    }
    // Sentinel entry. Side effect: any pending tags after a missing
    // final delimiter are dropped rather than emitted.
    tags_index.push_back(Tags {
        string_table_id: strings_id,
        tags: Vec::new(),
    });
    tags_index
}
/// Decodes every entry of the protobuf string table as UTF-8. Entries
/// that fail to decode are logged and replaced with an empty string so
/// that table indexes stay aligned.
fn read_string_table(string_table: &osmformat::StringTable) -> Vec<std::string::String> {
    string_table
        .get_s()
        .iter()
        .map(|byte_str| match String::from_utf8(byte_str.clone()) {
            Ok(s1) => s1,
            Err(error) => {
                println!(
                    "Error: failed to convert UTF8 to string {:?} err {:?}",
                    byte_str, error
                );
                "".to_string()
            }
        })
        .collect()
}
/// Decodes the block's string table and forwards it on `data_tx` so
/// consumers can resolve the numeric tag ids carried by nodes, ways and
/// relations from the same block.
fn read_strings(
    pb: &osmformat::PrimitiveBlock,
    strings_id: IDType,
    data_tx: &mpsc::Sender<PBFData>,
) {
    let strings = PBFData::Strings(
        strings_id,
        Strings {
            strings: read_string_table(pb.get_stringtable()),
        },
    );
    // `send` only borrows the sender — the old clone was redundant.
    if let Err(e) = data_tx.send(strings) {
        println!("Error: failed to send data {:?}", e);
    }
}
/// Decodes a (non-dense) node group, sends the nodes on `data_tx` as a
/// `NodesSet`, and returns the bounding box of the decoded coordinates
/// (`None` for an empty group).
fn read_nodes(
    pb: &osmformat::PrimitiveBlock,
    strings_id: IDType,
    nodes: &[osmformat::Node],
    data_tx: &mpsc::Sender<PBFData>,
) -> Option<LBox> {
    // Hoisted out of the loop: the offsets and granularity belong to
    // the enclosing PrimitiveBlock, not to individual nodes — the old
    // code rebuilt them on every iteration.
    let offset = Tup {
        lat: pb.get_lat_offset(),
        lon: pb.get_lon_offset(),
    };
    let granularity = pb.get_granularity() as LType;
    let mut set = Vec::with_capacity(nodes.len());
    let mut minmax: Option<LBox> = None;
    for n in nodes {
        let inode = Tup {
            lat: n.get_lat(),
            lon: n.get_lon(),
        };
        let fnode = Node {
            coord: dense_node_to_node(&inode, granularity, &offset),
            tags: read_tags(n.get_keys(), n.get_vals(), strings_id),
        };
        // Grow the bounding box; the first node initializes it.
        if let Some(m) = minmax.as_mut() {
            m.union_with_coord(&fnode.coord);
        } else {
            minmax = Some(LBox {
                top_left: fnode.coord.clone(),
                bottom_right: fnode.coord.clone(),
            });
        }
        set.push((n.get_id() as IDType, fnode));
    }
    if !set.is_empty() {
        data_tx.send(PBFData::NodesSet(set)).unwrap();
    }
    minmax
}
/// Decodes a DenseNodes group: `id`, `lat`, `lon` are parallel,
/// delta-encoded arrays, and tags come from the flat 0-delimited
/// `keys_vals` stream. Sends the decoded nodes on `data_tx` and
/// returns their bounding box (`None` for an empty or inconsistent
/// group).
fn read_dense_nodes(
    pb: &osmformat::PrimitiveBlock,
    strings_id: IDType,
    dense: &osmformat::DenseNodes,
    data_tx: &mpsc::Sender<PBFData>,
) -> Option<LBox> {
    let lats = &dense.get_lat();
    let lons = &dense.get_lon();
    let ids = &dense.get_id();
    let mut tags_index = read_kvtags(&dense.get_keys_vals(), strings_id);
    let count = pbf_min!(pbf_min!(lats.len(), lons.len()), ids.len());
    // Sanity check: the parallel arrays must share a non-zero length,
    // and read_kvtags always returns count + 1 entries for well-formed
    // input, so `count >= tags_index.len()` signals corrupt keys_vals.
    if ids.len() != count || lats.len() != count || lons.len() != count || count == 0
        || count >= tags_index.len()
    {
        println!("Error: failed to parse dense {:?}", dense);
        println!(
            "Dense length: id {} lat {} lon {} keys_vals {} > tags {}",
            ids.len(),
            lats.len(),
            lons.len(),
            dense.get_keys_vals().len(),
            tags_index.len()
        );
        return None;
    }
    // Block-wide coordinate offsets, applied in dense_node_to_node.
    let offset = Tup {
        lat: pb.get_lat_offset(),
        lon: pb.get_lon_offset(),
    };
    let mut minmax: Option<LBox> = None;
    let mut set = Vec::with_capacity(count);
    // Running sums: every array stores deltas from the previous entry.
    let mut coord = Tup { lat: 0, lon: 0 };
    let mut node_id = 0;
    for i in 0..count {
        coord.lat = lats[i] + coord.lat;
        coord.lon = lons[i] + coord.lon;
        node_id = ids[i] + node_id;
        // One Tags entry per node, in order; the fallback should be
        // unreachable given the length check above.
        let tags = match tags_index.pop_front() {
            Some(existing_tag) => existing_tag,
            None => Tags {
                string_table_id: strings_id,
                tags: Vec::new(),
            },
        };
        let fnode = Node {
            coord: dense_node_to_node(&coord, pb.get_granularity() as LType, &offset),
            tags: tags,
        };
        // Track the bounding box of everything decoded so far.
        if i == 0 {
            minmax = Some(LBox {
                top_left: fnode.coord.clone(),
                bottom_right: fnode.coord.clone(),
            });
        } else {
            match minmax.as_mut() {
                Some(m) => m.union_with_coord(&fnode.coord),
                None => {}
            }
        }
        set.push((node_id as IDType, fnode));
    }
    if !set.is_empty() {
        data_tx.send(PBFData::NodesSet(set)).unwrap();
    }
    minmax
}
/// Maps one raw PBF coordinate component (a scaled integer) to degrees:
/// `(coord * granularity + off)` nanodegrees divided by 1e9.
fn dense_coord_to_coord(coord: LType, granularity: LType, off: LType) -> CoordType {
    let nanodegrees_per_degree = 1000000000.0;
    ((coord * granularity + off) as CoordType) / nanodegrees_per_degree
}
/// Converts a delta-decoded raw coordinate pair into a `Coord` in
/// degrees using the block's granularity and lat/lon offsets.
fn dense_node_to_node(dnode: &Tup, granularity: LType, offset: &Tup) -> Coord {
    let lat = dense_coord_to_coord(dnode.lat, granularity, offset.lat);
    let lon = dense_coord_to_coord(dnode.lon, granularity, offset.lon);
    Coord { lat: lat, lon: lon }
}
/// Decodes a relation group and sends it on `data_tx` as a
/// `RelationsSet`. Member ids are delta-encoded; member types and role
/// ids come from parallel arrays.
fn read_relations(
    strings_id: IDType,
    relations: &[osmformat::Relation],
    data_tx: &mpsc::Sender<PBFData>,
) {
    let mut set = Vec::with_capacity(relations.len());
    for rel in relations {
        // Index with the shortest of the three parallel arrays so the
        // loop below cannot go out of bounds; warn when they disagree.
        let count = pbf_min!(
            pbf_min!(rel.get_memids().len(), rel.get_types().len()),
            rel.get_roles_sid().len()
        );
        if count != rel.get_memids().len() || count != rel.get_types().len()
            || count != rel.get_roles_sid().len()
        {
            println!("Warning: Relation data is inconsistent {:?}", rel);
        }
        let mut members = Vec::with_capacity(count);
        let tags = read_tags(rel.get_keys(), rel.get_vals(), strings_id);
        // Reused accumulator: member_id carries the delta-decoding state
        // across iterations; each finished member is cloned out of it.
        let mut member = RelationMemeber {
            member_id: 0,
            member_type: RelationMemeberType::Node,
            role_id: 0,
        };
        for i in 0..count {
            member.member_id = (member.member_id as i64 + rel.get_memids()[i]) as IDType;
            // NOTE(review): the `as osmformat::Relation_MemberType` cast
            // suggests get_types() may not yield the enum directly —
            // confirm against the generated protobuf code.
            member.member_type = match rel.get_types()[i] as osmformat::Relation_MemberType {
                osmformat::Relation_MemberType::NODE => RelationMemeberType::Node,
                osmformat::Relation_MemberType::WAY => RelationMemeberType::Way,
                osmformat::Relation_MemberType::RELATION => RelationMemeberType::Relation,
            };
            member.role_id = rel.get_roles_sid()[i] as IDType;
            members.push(member.clone());
        }
        let pbf_relation = Relation {
            tags: tags,
            members: members,
        };
        set.push((rel.get_id() as IDType, pbf_relation));
    }
    if !set.is_empty() {
        data_tx.send(PBFData::RelationsSet(set)).unwrap();
    }
}
/// Unit test for `read_kvtags`: checks leading delimiters, multi-pair
/// nodes, and the trailing sentinel entry.
#[test]
fn test_read_kvtags() {
    // Shorthand for a node with no key/value pairs.
    let empty = |id| Tags {
        string_table_id: id,
        tags: vec![],
    };
    {
        // Four leading delimiters -> four empty nodes, then one node
        // with two pairs, then the trailing sentinel.
        let tags = read_kvtags(&[0, 0, 0, 0, 3, 4, 5, 6, 0], 5);
        println!("\n\nread tags {:?}\n", tags);
        for i in 0..4 {
            assert_eq!(tags[i], empty(5));
        }
        assert_eq!(
            tags[4],
            Tags {
                string_table_id: 5,
                tags: vec![Tag { key: 3, val: 4 }, Tag { key: 5, val: 6 }],
            }
        );
        assert_eq!(tags[5], empty(5));
    }
    {
        // Single node with one pair.
        let tags = read_kvtags(&[3, 4, 0], 5);
        println!("\n\nread tags {:?}\n", tags);
        assert_eq!(
            tags[0],
            Tags {
                string_table_id: 5,
                tags: vec![Tag { key: 3, val: 4 }],
            }
        );
        assert_eq!(tags[1], empty(5));
    }
    {
        // Single node with two pairs, different string table id.
        let tags = read_kvtags(&[3, 4, 5, 6, 0], 57);
        println!("\n\nread tags {:?}\n", tags);
        assert_eq!(
            tags[0],
            Tags {
                string_table_id: 57,
                tags: vec![Tag { key: 3, val: 4 }, Tag { key: 5, val: 6 }],
            }
        );
        assert_eq!(tags[1], empty(57));
    }
}
// Integration-style tests: these read the `src/sample.pbf` fixture from
// disk and exercise the full read pipeline end to end.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use std::collections::HashMap;

    // Reads synchronously on the calling thread (threads = 0), then
    // drains the unbounded channel; ignored by default because it needs
    // the fixture file.
    #[test]
    #[ignore]
    fn zero_main() {
        let (mut node_tx, node_rx) = mpsc::channel::<PBFData>();
        let result = read_pbf(&"src/sample.pbf".to_string(), 0, &mut node_tx);
        result.expect("Read error; ");
        let mut count = 0;
        loop {
            let r = node_rx.recv().unwrap();
            println!("main {:?}", r);
            if let PBFData::ParseEnd = r {
                break;
            }
            count = count + 1;
        }
        // 343 = expected number of messages before ParseEnd for the fixture.
        assert_eq!(count, 343);
    }

    // Same as zero_main but runs the reader on a background thread while
    // this thread consumes.
    #[test]
    #[ignore]
    fn zero_bg() {
        let (mut node_tx, node_rx) = mpsc::channel::<PBFData>();
        let h = thread::spawn(move || {
            let result = read_pbf(&"src/sample.pbf".to_string(), 0, &mut node_tx);
            result.expect("Read error; ");
        });
        let mut count = 0;
        loop {
            if let PBFData::ParseEnd = node_rx.recv().unwrap() {
                break;
            }
            count = count + 1;
        }
        assert_eq!(count, 343);
        h.join().expect("join error ");
    }

    // Full pipeline with 10 threads: counts every entity kind, resolves
    // tags through the received string tables, and pins the fixture's
    // expected totals and the first two PbfInfo bounding boxes.
    #[test]
    fn simple_count() {
        let (mut node_tx, node_rx) = mpsc::channel::<PBFData>();
        let h = thread::spawn(move || {
            return read_pbf(&"src/sample.pbf".to_string(), 10, &mut node_tx);
        });
        let mut nodes_count = 0;
        let mut tags_count = 0;
        let mut way_count = 0;
        let mut rel_count = 0;
        let mut info_count = 0;
        let mut strings_count = 0;
        let mut total_strings_count = 0;
        // string_table_id -> Strings, filled from PBFData::Strings messages.
        let mut strings_index = HashMap::new();
        loop {
            match node_rx.recv() {
                Ok(pbfdata) => {
                    println!("---------------------------------------------------------");
                    match pbfdata {
                        PBFData::NodesSet(set) => {
                            for (id, node) in set {
                                nodes_count = nodes_count + 1;
                                tags_count = tags_count + node.tags.tags.len();
                                if !node.tags.tags.is_empty() {
                                    match strings_index.get(&node.tags.string_table_id) {
                                        Some(smap) => {
                                            let kvs = node.tags.get_keys_vals(smap);
                                            assert_eq!(kvs.len(), node.tags.tags.len());
                                            for (k, v) in kvs {
                                                println!("Node id {} tags [{}: {}] ", id, k, v);
                                            }
                                        }
                                        None => {
                                            println!(
                                                "Error: failed to find strings_id {} \n {:?}",
                                                node.tags.string_table_id, strings_index
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        PBFData::WaysSet(set) => {
                            for (id, way) in set {
                                way_count = way_count + 1;
                                tags_count = tags_count + way.tags.tags.len();
                                if !way.tags.tags.is_empty() {
                                    match strings_index.get(&way.tags.string_table_id) {
                                        Some(smap) => {
                                            let kvs = way.tags.get_keys_vals(smap);
                                            assert_eq!(kvs.len(), way.tags.tags.len());
                                            for (k, v) in kvs {
                                                println!("Way id {} tags [{}: {}] ", id, k, v);
                                            }
                                        }
                                        None => {
                                            println!(
                                                "Error: failed to find strings_id {} \n {:?}",
                                                way.tags.string_table_id, strings_index
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        PBFData::RelationsSet(set) => {
                            for (id, relation) in set {
                                // Counts members, not relations.
                                rel_count = rel_count + relation.members.len();
                                tags_count = tags_count + relation.tags.tags.len();
                                if !relation.tags.tags.is_empty() {
                                    match strings_index.get(&relation.tags.string_table_id) {
                                        Some(smap) => {
                                            let kvs = relation.tags.get_keys_vals(smap);
                                            assert_eq!(kvs.len(), relation.tags.tags.len());
                                            for (k, v) in kvs {
                                                println!(
                                                    "Relation id {} tags: [{}: {}] ",
                                                    id, k, v
                                                );
                                            }
                                        }
                                        None => {
                                            println!(
                                                "Error: failed to find strings_id {} \n {:?}",
                                                relation.tags.string_table_id, strings_index
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        PBFData::Strings(id, strings) => {
                            strings_count = strings_count + 1;
                            total_strings_count = total_strings_count + strings.strings.len();
                            match strings_index.insert(id, strings) {
                                Some(ostrings) => {
                                    println!("strings id {} exists!!!! {:?} ", id, ostrings);
                                }
                                None => {
                                    println!("Got strings {}", id);
                                }
                            }
                        }
                        PBFData::PbfInfo(info) => {
                            info_count = info_count + 1;
                            println!("BBox lrtb {:?}", info.bbox);
                            // Expected bbox/position of the fixture's header
                            // block and first data blob.
                            if info_count == 1 {
                                let bbox = LBox {
                                    top_left: Coord {
                                        lat: 51.766859700,
                                        lon: -0.235376100,
                                    },
                                    bottom_right: Coord {
                                        lat: 51.764840700,
                                        lon: -0.228513400,
                                    },
                                };
                                assert_eq!(info.bbox, bbox);
                                assert_eq!(
                                    info.position,
                                    BlobPosition {
                                        start: 18,
                                        size: 146,
                                    }
                                );
                            } else if info_count == 2 {
                                let bbox = LBox {
                                    top_left: Coord {
                                        lat: 51.7600489,
                                        lon: -0.2415577,
                                    },
                                    bottom_right: Coord {
                                        lat: 51.7742478,
                                        lon: -0.2162900,
                                    },
                                };
                                assert_eq!(info.bbox, bbox);
                                assert_eq!(
                                    info.position,
                                    BlobPosition {
                                        start: 180,
                                        size: 5940,
                                    }
                                );
                            }
                        }
                        PBFData::ParseEnd => {
                            println!("got parse end!");
                            break;
                        }
                    }
                }
                Err(e) => {
                    // NOTE(review): this arm does not break, so a sender
                    // disconnect before ParseEnd would loop forever —
                    // confirm intended for a test.
                    println!("Receive errro {:?}", e);
                }
            }
        }
        assert_eq!(nodes_count, 290);
        assert_eq!(way_count, 44);
        assert_eq!(rel_count, 242);
        assert_eq!(info_count, 2);
        assert_eq!(strings_count, 3);
        assert_eq!(total_strings_count, 184);
        assert_eq!(tags_count, 212);
        let t = h.join().unwrap();
        match t {
            Ok(_) => {
                println!("DONE!!!!");
            }
            Err(s) => {
                println!("Read pbf error: '{}'", s);
                panic!();
            }
        }
    }
}