osm_pbf/
parse.rs

1use std::{
2    io::{Cursor, ErrorKind, SeekFrom},
3    pin::Pin,
4};
5
6use crate::{
7    protos::fileformat::{mod_Blob::OneOfdata as Data, Blob, BlobHeader},
8    FileBlock, BLOB_HEADER_MAX_LEN, BLOB_MAX_LEN,
9};
10use async_compression::tokio::bufread::*;
11use async_stream::try_stream;
12use futures::{pin_mut, Stream, TryStreamExt};
13use quick_protobuf::{BytesReader, MessageRead};
14use thiserror::Error;
15use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
16
17/// Any error encountered in [parse_osm_pbf]
18#[derive(Error, Debug)]
19pub enum ParseError {
20    #[error(transparent)]
21    Io(#[from] tokio::io::Error),
22    #[error("Failed to deserialize protobuf message: {0}")]
23    Proto(#[from] quick_protobuf::Error),
24    #[error("BlobHeader is too long: {0} bytes")]
25    BlobHeaderExceedsMaxLength(usize),
26    #[error("Blob is too long: {0} bytes")]
27    BlobExceedsMaxLength(usize),
28    #[error("Blob is compressed with an unsupported algorithm: {0}")]
29    UnsupportedCompression(&'static str),
30}
31
32/// Parse blob headers + blobs on the fly
33fn stream_blobs<'a, R: AsyncRead + Unpin + Send + 'a>(
34    mut pbf_reader: R,
35) -> impl Stream<Item = Result<(BlobHeader, Blob), ParseError>> + Send + 'a {
36    try_stream! {
37        while let Some(blob_header_len) = read_blob_header_len(&mut pbf_reader).await? {
38            let mut buf = vec![0; blob_header_len];
39            pbf_reader.read_exact(buf.as_mut()).await?;
40            let header: BlobHeader = deserialize_from_slice(buf.as_slice())?;
41
42            let datasize = header.datasize as usize;
43            if datasize > BLOB_MAX_LEN {
44                Err(ParseError::BlobExceedsMaxLength(datasize))?;
45            }
46
47            let mut buf = vec![0; datasize];
48            pbf_reader.read_exact(buf.as_mut()).await?;
49            let body: Blob = deserialize_from_slice(buf.as_slice())?;
50
51            yield (header, body);
52        }
53    }
54}
55
56/// Same as [stream_blobs] but seeks past the blobs
57fn stream_blob_headers<'a, R: AsyncRead + Unpin + Send + AsyncSeek + 'a>(
58    mut pbf_reader: R,
59) -> impl Stream<Item = Result<(BlobHeader, SeekFrom), ParseError>> + Send + 'a {
60    try_stream! {
61        let mut current_seek = 0;
62        while let Some(blob_header_len) = read_blob_header_len(&mut pbf_reader).await? {
63            // i32 = 4 bytes
64            current_seek += 4;
65
66            let mut buf = vec![0; blob_header_len];
67            pbf_reader.read_exact(buf.as_mut()).await?;
68            let header: BlobHeader = deserialize_from_slice(buf.as_slice())?;
69            current_seek += blob_header_len;
70            let datasize = header.datasize as usize;
71            if datasize > BLOB_MAX_LEN {
72                Err(ParseError::BlobExceedsMaxLength(datasize))?;
73            }
74
75            pbf_reader.seek(SeekFrom::Current(datasize as i64)).await?;
76
77            yield (header, SeekFrom::Start(current_seek as u64));
78
79            current_seek += datasize;
80        }
81    }
82}
83
84/// Convenience function for getting the length of the next blob header, if any remain
85async fn read_blob_header_len<R: AsyncRead + Unpin + Send>(
86    mut pbf_reader: R,
87) -> Result<Option<usize>, ParseError> {
88    match pbf_reader.read_i32().await.map(|len| len as usize) {
89        Ok(len) if len > BLOB_HEADER_MAX_LEN => Err(ParseError::BlobHeaderExceedsMaxLength(len)),
90        Ok(len) => Ok(Some(len)),
91        Err(err) if err.kind() == ErrorKind::UnexpectedEof => Ok(None),
92        Err(err) => Err(err.into()),
93    }
94}
95
96/// Creates a stream for the decoded data from this block
97fn decode_blob(blob: Blob) -> Result<Pin<Box<dyn AsyncRead + Send>>, ParseError> {
98    Ok(match blob.data {
99        Data::raw(raw) => Box::pin(Cursor::new(raw)),
100        #[cfg(feature = "zlib")]
101        Data::zlib_data(data) => Box::pin(ZlibDecoder::new(Cursor::new(data))),
102        #[cfg(feature = "lzma")]
103        Data::lzma_data(data) => Box::pin(LzmaDecoder::new(Cursor::new(data))),
104        #[cfg(feature = "zstd")]
105        Data::zstd_data(data) => Box::pin(ZstdDecoder::new(Cursor::new(data))),
106        Data::None => Box::pin(Cursor::new(vec![])),
107        other => {
108            return Err(ParseError::UnsupportedCompression(match other {
109                Data::raw(_) | Data::None => unreachable!(),
110                Data::zlib_data(_) => "zlib",
111                Data::lzma_data(_) => "lzma",
112                #[allow(deprecated)]
113                Data::OBSOLETE_bzip2_data(_) => "bzip2",
114                Data::lz4_data(_) => "lz4",
115                Data::zstd_data(_) => "zstd",
116            }))
117        }
118    })
119}
120
121/// Blobs to blocks
122fn stream_osm_blocks<'a, R: AsyncRead + Unpin + Send + 'a>(
123    pbf_reader: R,
124) -> impl Stream<Item = Result<FileBlock, ParseError>> + Send + 'a {
125    try_stream! {
126        let blob_stream = stream_blobs(pbf_reader);
127        pin_mut!(blob_stream);
128        while let Some((blob_header, blob_body)) = blob_stream.try_next().await? {
129            let blob_body_len = blob_body.raw_size.unwrap_or_default() as usize;
130            if blob_body_len > BLOB_MAX_LEN {
131                Err(ParseError::BlobExceedsMaxLength(blob_body_len))?;
132            }
133            let mut buf = Vec::with_capacity(blob_body_len);
134            let mut blob_stream = decode_blob(blob_body)?;
135            blob_stream.read_to_end(&mut buf).await?;
136
137            match blob_header.type_pb.as_str() {
138                "OSMHeader" => {
139                    yield FileBlock::Header(deserialize_from_slice(buf.as_slice())?);
140                },
141                "OSMData" => {
142                    yield FileBlock::Primitive(deserialize_from_slice(buf.as_slice())?);
143                }
144                other => {
145                    yield FileBlock::Other { r#type: other.to_string(), bytes: buf, }
146                }
147            }
148        }
149    }
150}
151
/// Where a single block lives inside an [AsyncSeek] source
///
/// Values of this type are produced by [get_osm_pbf_locations] and consumed by
/// [parse_osm_pbf_at_location], which makes it possible to read blocks in parallel.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileBlockLocation {
    /// Type of the block, taken from its blob header
    pub r#type: String,

    /// Absolute position of the block, suitable for passing to an [AsyncSeek]
    pub seek: SeekFrom,

    /// Number of bytes to read from the [AsyncRead] for this block
    pub len: usize,
}
166
167/// Parse the PBF format into a stream of [FileBlock]s
168pub fn parse_osm_pbf<'a, R: AsyncRead + Unpin + Send + 'a>(
169    pbf_reader: R,
170) -> impl Stream<Item = Result<FileBlock, ParseError>> + Send + 'a {
171    stream_osm_blocks(pbf_reader)
172}
173
174/// Cursory examination of the data to get a stream of [FileBlockLocation]s
175///
176/// Use this in combination with [parse_osm_pbf_from_locations] for reading in parallel.
177pub fn get_osm_pbf_locations<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a>(
178    pbf_reader: R,
179) -> impl Stream<Item = Result<FileBlockLocation, ParseError>> + Send + 'a {
180    try_stream! {
181        let headers = stream_blob_headers(pbf_reader);
182        pin_mut!(headers);
183
184        while let Some((header, seek)) = headers.try_next().await? {
185            yield FileBlockLocation { r#type: header.type_pb, seek, len: header.datasize as usize }
186        }
187    }
188}
189
190/// Parse the PBF format at a given [FileBlockLocation] to get a [FileBlock]
191///
192/// Use this in combination with [get_osm_pbf_locations] for reading in parallel.
193pub async fn parse_osm_pbf_at_location<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a>(
194    mut pbf_reader: R,
195    location: FileBlockLocation,
196) -> Result<FileBlock, ParseError> {
197    pbf_reader.seek(location.seek).await?;
198
199    let mut buf = vec![0; location.len];
200    pbf_reader.read_exact(buf.as_mut()).await?;
201    let blob_body: Blob = deserialize_from_slice(buf.as_slice())?;
202
203    let blob_body_len = blob_body.raw_size.unwrap_or_default() as usize;
204    if location.len > BLOB_MAX_LEN {
205        Err(ParseError::BlobExceedsMaxLength(blob_body_len))?;
206    }
207
208    let mut buf = Vec::with_capacity(blob_body_len);
209    let mut blob_stream = decode_blob(blob_body)?;
210    blob_stream.read_to_end(&mut buf).await?;
211
212    Ok(match location.r#type.as_str() {
213        "OSMHeader" => FileBlock::Header(deserialize_from_slice(buf.as_slice())?),
214        "OSMData" => FileBlock::Primitive(deserialize_from_slice(buf.as_slice())?),
215        _ => FileBlock::Other {
216            r#type: location.r#type,
217            bytes: buf,
218        },
219    })
220}
221
222/// [quick_protobuf::reader::deserialize_from_slice] but doesn't read length
223fn deserialize_from_slice<'a, M: MessageRead<'a>>(
224    bytes: &'a [u8],
225) -> Result<M, quick_protobuf::Error> {
226    let mut reader = BytesReader::from_bytes(bytes);
227    reader.read_message_by_len(bytes, bytes.len())
228}