use crate::block::{HeaderBlock, PrimitiveBlock};
use crate::error::{new_blob_error, new_error, new_protobuf_error, BlobError, ErrorKind, Result};
use crate::proto::fileformat;
use byteorder::ReadBytesExt;
use protobuf::Message;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::Path;
use flate2::read::ZlibDecoder;
/// Upper bound, in bytes, for the declared size of a blob header (64 KiB).
///
/// A size prefix greater than or equal to this value is rejected by the reader.
pub static MAX_BLOB_HEADER_SIZE: u64 = 64 << 10;

/// Upper bound, in bytes, for a decoded blob message (32 MiB).
///
/// Raw payloads must be strictly smaller than this; zlib payloads are
/// decompressed through a reader capped at this many bytes.
pub static MAX_BLOB_MESSAGE_SIZE: u64 = 32 << 20;
/// The kind of a blob, as named by the type string in its header.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BlobType<'a> {
    /// A blob whose type string is `"OSMHeader"`.
    OsmHeader,
    /// A blob whose type string is `"OSMData"`.
    OsmData,
    /// A blob with any other type string; the string is kept as-is.
    Unknown(&'a str),
}

impl<'a> BlobType<'a> {
    /// Returns the type string that corresponds to this blob type.
    ///
    /// For [`BlobType::Unknown`] this is the borrowed string the variant
    /// carries; for the known variants it is a fixed literal.
    pub const fn as_str(&self) -> &'a str {
        match *self {
            BlobType::OsmHeader => "OSMHeader",
            BlobType::OsmData => "OSMData",
            // `&str` is `Copy`, so the inner slice is handed out unchanged.
            BlobType::Unknown(name) => name,
        }
    }
}
/// The decoded content of a blob, as produced by `Blob::decode`.
#[derive(Clone, Debug)]
pub enum BlobDecode<'a> {
/// A decoded header block (stored behind a `Box`).
OsmHeader(Box<HeaderBlock>),
/// A decoded primitive block.
OsmData(PrimitiveBlock),
/// A blob of a type that is not decoded; carries the blob's type string.
Unknown(&'a str),
}
/// A position in a blob stream, in bytes.
///
/// The reader's seek methods interpret the wrapped value as an offset from
/// the start of the underlying stream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ByteOffset(pub u64);
/// A blob read from a stream: its parsed header plus the still-encoded
/// (raw or compressed) payload message.
#[derive(Clone, Debug)]
pub struct Blob {
// Parsed header message; provides the blob's type string and data size.
header: fileformat::BlobHeader,
// Payload message; decoded on demand into a header or primitive block.
blob: fileformat::Blob,
// Reader offset captured just before this blob's header was read,
// or `None` when the reader did not know its position.
offset: Option<ByteOffset>,
}
impl Blob {
    /// Bundles a parsed header, its payload message and the offset at which
    /// the blob was found.
    fn new(
        header: fileformat::BlobHeader,
        blob: fileformat::Blob,
        offset: Option<ByteOffset>,
    ) -> Blob {
        Self {
            header,
            blob,
            offset,
        }
    }

    /// Decodes the payload according to the blob's type.
    ///
    /// Header and data blobs are decoded into their block types; a blob of
    /// any other type is not decoded and only its type string is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error from decoding the payload.
    pub fn decode(&self) -> Result<BlobDecode<'_>> {
        let decoded = match self.get_type() {
            BlobType::OsmHeader => BlobDecode::OsmHeader(Box::new(self.to_headerblock()?)),
            BlobType::OsmData => BlobDecode::OsmData(self.to_primitiveblock()?),
            BlobType::Unknown(name) => BlobDecode::Unknown(name),
        };
        Ok(decoded)
    }

    /// Classifies the blob by the type string stored in its header.
    pub fn get_type(&self) -> BlobType<'_> {
        match self.header.type_() {
            "OSMHeader" => BlobType::OsmHeader,
            "OSMData" => BlobType::OsmData,
            other => BlobType::Unknown(other),
        }
    }

    /// Returns the offset recorded for this blob when it was read, if known.
    pub fn offset(&self) -> Option<ByteOffset> {
        self.offset
    }

    /// Decodes the payload as a header block, regardless of the blob's
    /// declared type.
    ///
    /// # Errors
    ///
    /// Propagates any error from decoding the payload.
    pub fn to_headerblock(&self) -> Result<HeaderBlock> {
        let message = decode_blob(&self.blob)?;
        Ok(HeaderBlock::new(message))
    }

    /// Decodes the payload as a primitive block, regardless of the blob's
    /// declared type.
    ///
    /// # Errors
    ///
    /// Propagates any error from decoding the payload.
    pub fn to_primitiveblock(&self) -> Result<PrimitiveBlock> {
        let message = decode_blob(&self.blob)?;
        Ok(PrimitiveBlock::new(message))
    }
}
/// A blob header on its own, without the blob payload that follows it.
#[derive(Clone, Debug)]
pub struct BlobHeader {
// Parsed header message; provides the blob's type string and data size.
header: fileformat::BlobHeader,
}
impl BlobHeader {
    /// Wraps a parsed header message.
    fn new(header: fileformat::BlobHeader) -> Self {
        Self { header }
    }

    /// Classifies the blob this header describes by its type string.
    pub fn blob_type(&self) -> BlobType<'_> {
        let name = self.header.type_();
        if name == "OSMHeader" {
            BlobType::OsmHeader
        } else if name == "OSMData" {
            BlobType::OsmData
        } else {
            BlobType::Unknown(name)
        }
    }

    /// Returns the data size declared by the header, exactly as stored.
    ///
    /// The value is not validated here.
    pub fn get_blob_size(&self) -> i32 {
        let declared = self.header.datasize();
        declared
    }
}
/// Reads a sequence of blobs from an underlying reader.
#[derive(Clone, Debug)]
pub struct BlobReader<R: Read + Send> {
// Underlying byte source.
reader: R,
// Current position in the stream, or `None` when it is not known
// (non-seekable construction, or after certain read/seek errors).
offset: Option<ByteOffset>,
// Cleared after an unrecoverable error; once false, no further blobs
// are yielded.
last_blob_ok: bool,
}
impl<R: Read + Send> BlobReader<R> {
pub fn new(reader: R) -> BlobReader<R> {
BlobReader {
reader,
offset: None,
last_blob_ok: true,
}
}
fn read_blob_header(&mut self) -> Option<Result<fileformat::BlobHeader>> {
let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
Ok(n) => {
self.offset = self.offset.map(|x| ByteOffset(x.0 + 4));
u64::from(n)
}
Err(e) => {
self.offset = None;
return match e.kind() {
::std::io::ErrorKind::UnexpectedEof => {
None
}
_ => {
self.last_blob_ok = false;
Some(Err(new_blob_error(BlobError::InvalidHeaderSize)))
}
};
}
};
if header_size >= MAX_BLOB_HEADER_SIZE {
self.last_blob_ok = false;
return Some(Err(new_blob_error(BlobError::HeaderTooBig {
size: header_size,
})));
}
let mut reader = self.reader.by_ref().take(header_size);
let header = match fileformat::BlobHeader::parse_from_reader(&mut reader) {
Ok(header) => header,
Err(e) => {
self.offset = None;
self.last_blob_ok = false;
return Some(Err(new_protobuf_error(e, "blob header")));
}
};
self.offset = self.offset.map(|x| ByteOffset(x.0 + header_size));
Some(Ok(header))
}
}
impl BlobReader<BufReader<File>> {
    /// Opens the file at `path` and wraps it in a buffered blob reader that
    /// starts tracking offsets from byte 0.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path)?;
        Ok(Self {
            reader: BufReader::new(file),
            offset: Some(ByteOffset(0)),
            last_blob_ok: true,
        })
    }
}
impl<R: Read + Send> Iterator for BlobReader<R> {
type Item = Result<Blob>;
fn next(&mut self) -> Option<Self::Item> {
if !self.last_blob_ok {
return None;
}
let prev_offset = self.offset;
let header = match self.read_blob_header() {
Some(Ok(header)) => header,
Some(Err(err)) => return Some(Err(err)),
None => return None,
};
let mut reader = self.reader.by_ref().take(header.datasize() as u64);
let blob = match fileformat::Blob::parse_from_reader(&mut reader) {
Ok(blob) => blob,
Err(e) => {
self.offset = None;
self.last_blob_ok = false;
return Some(Err(new_protobuf_error(e, "blob content")));
}
};
self.offset = self
.offset
.map(|x| ByteOffset(x.0 + header.datasize() as u64));
Some(Ok(Blob::new(header, blob, prev_offset)))
}
}
impl<R: Read + Seek + Send> BlobReader<R> {
pub fn new_seekable(mut reader: R) -> Result<BlobReader<R>> {
let pos = reader.stream_position()?;
Ok(BlobReader {
reader,
offset: Some(ByteOffset(pos)),
last_blob_ok: true,
})
}
pub fn blob_from_offset(&mut self, pos: ByteOffset) -> Result<Blob> {
self.seek(pos)?;
self.next().unwrap_or_else(|| {
Err(new_error(ErrorKind::Io(::std::io::Error::new(
::std::io::ErrorKind::UnexpectedEof,
"no blob at this stream position",
))))
})
}
pub fn seek(&mut self, pos: ByteOffset) -> Result<()> {
match self.reader.seek(SeekFrom::Start(pos.0)) {
Ok(offset) => {
self.offset = Some(ByteOffset(offset));
Ok(())
}
Err(e) => {
self.offset = None;
Err(e.into())
}
}
}
pub fn seek_raw(&mut self, pos: SeekFrom) -> Result<u64> {
match self.reader.seek(pos) {
Ok(offset) => {
self.offset = Some(ByteOffset(offset));
Ok(offset)
}
Err(e) => {
self.offset = None;
Err(e.into())
}
}
}
pub fn next_header_skip_blob(&mut self) -> Option<Result<(BlobHeader, Option<ByteOffset>)>> {
if !self.last_blob_ok {
return None;
}
let prev_offset = self.offset;
let header = match self.read_blob_header() {
Some(Ok(header)) => header,
Some(Err(err)) => return Some(Err(err)),
None => return None,
};
if let Err(err) = self.seek_raw(SeekFrom::Current(header.datasize() as i64)) {
self.last_blob_ok = false;
return Some(Err(err));
}
Some(Ok((BlobHeader::new(header), prev_offset)))
}
}
impl BlobReader<BufReader<File>> {
    /// Opens the file at `path` and wraps it in a buffered, seekable blob
    /// reader.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or its stream position
    /// cannot be queried.
    pub fn seekable_from_path<P: AsRef<Path>>(path: P) -> Result<BlobReader<BufReader<File>>> {
        let file = File::open(path.as_ref())?;
        Self::new_seekable(BufReader::new(file))
    }
}
/// Decodes the payload of `blob` into a protobuf message of type `T`.
///
/// Raw payloads are parsed directly; zlib payloads are decompressed through a
/// reader limited to `MAX_BLOB_MESSAGE_SIZE` bytes and parsed from that.
///
/// # Errors
///
/// * `BlobError::MessageTooBig` if a raw payload is not strictly smaller than
///   `MAX_BLOB_MESSAGE_SIZE`.
/// * `BlobError::Empty` if the blob has neither raw nor zlib data.
/// * A protobuf error if the payload cannot be parsed as `T`.
pub(crate) fn decode_blob<T: Message>(blob: &fileformat::Blob) -> Result<T> {
    if blob.has_raw() {
        let payload = blob.raw();
        let size = payload.len() as u64;
        if size >= MAX_BLOB_MESSAGE_SIZE {
            return Err(new_blob_error(BlobError::MessageTooBig { size }));
        }
        return T::parse_from_bytes(payload).map_err(|e| new_protobuf_error(e, "raw blob data"));
    }

    if blob.has_zlib_data() {
        // Cap the decompressed stream so a small input cannot expand without bound.
        let inflater = ZlibDecoder::new(blob.zlib_data());
        let mut limited = inflater.take(MAX_BLOB_MESSAGE_SIZE);
        return T::parse_from_reader(&mut limited)
            .map_err(|e| new_protobuf_error(e, "blob zlib data"));
    }

    Err(new_blob_error(BlobError::Empty))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Blob::get_type` maps the header's type string onto `BlobType`,
    /// keeping unrecognised strings in the `Unknown` variant.
    #[test]
    fn test_get_type() {
        let cases = [
            ("", BlobType::Unknown("")),
            ("abc", BlobType::Unknown("abc")),
            ("OSMHeader", BlobType::OsmHeader),
            ("OSMData", BlobType::OsmData),
        ];

        for (type_name, expected) in cases.iter() {
            let mut raw_header = fileformat::BlobHeader::new();
            raw_header.set_type((*type_name).to_owned());

            let blob = Blob::new(raw_header, fileformat::Blob::new(), None);

            assert_eq!(blob.get_type(), *expected);
        }
    }
}