1use std::{
2 io::{Cursor, ErrorKind, SeekFrom},
3 pin::Pin,
4};
5
6use crate::{
7 protos::fileformat::{mod_Blob::OneOfdata as Data, Blob, BlobHeader},
8 FileBlock, BLOB_HEADER_MAX_LEN, BLOB_MAX_LEN,
9};
10use async_compression::tokio::bufread::*;
11use async_stream::try_stream;
12use futures::{pin_mut, Stream, TryStreamExt};
13use quick_protobuf::{BytesReader, MessageRead};
14use thiserror::Error;
15use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
16
17#[derive(Error, Debug)]
19pub enum ParseError {
20 #[error(transparent)]
21 Io(#[from] tokio::io::Error),
22 #[error("Failed to deserialize protobuf message: {0}")]
23 Proto(#[from] quick_protobuf::Error),
24 #[error("BlobHeader is too long: {0} bytes")]
25 BlobHeaderExceedsMaxLength(usize),
26 #[error("Blob is too long: {0} bytes")]
27 BlobExceedsMaxLength(usize),
28 #[error("Blob is compressed with an unsupported algorithm: {0}")]
29 UnsupportedCompression(&'static str),
30}
31
32fn stream_blobs<'a, R: AsyncRead + Unpin + Send + 'a>(
34 mut pbf_reader: R,
35) -> impl Stream<Item = Result<(BlobHeader, Blob), ParseError>> + Send + 'a {
36 try_stream! {
37 while let Some(blob_header_len) = read_blob_header_len(&mut pbf_reader).await? {
38 let mut buf = vec![0; blob_header_len];
39 pbf_reader.read_exact(buf.as_mut()).await?;
40 let header: BlobHeader = deserialize_from_slice(buf.as_slice())?;
41
42 let datasize = header.datasize as usize;
43 if datasize > BLOB_MAX_LEN {
44 Err(ParseError::BlobExceedsMaxLength(datasize))?;
45 }
46
47 let mut buf = vec![0; datasize];
48 pbf_reader.read_exact(buf.as_mut()).await?;
49 let body: Blob = deserialize_from_slice(buf.as_slice())?;
50
51 yield (header, body);
52 }
53 }
54}
55
56fn stream_blob_headers<'a, R: AsyncRead + Unpin + Send + AsyncSeek + 'a>(
58 mut pbf_reader: R,
59) -> impl Stream<Item = Result<(BlobHeader, SeekFrom), ParseError>> + Send + 'a {
60 try_stream! {
61 let mut current_seek = 0;
62 while let Some(blob_header_len) = read_blob_header_len(&mut pbf_reader).await? {
63 current_seek += 4;
65
66 let mut buf = vec![0; blob_header_len];
67 pbf_reader.read_exact(buf.as_mut()).await?;
68 let header: BlobHeader = deserialize_from_slice(buf.as_slice())?;
69 current_seek += blob_header_len;
70 let datasize = header.datasize as usize;
71 if datasize > BLOB_MAX_LEN {
72 Err(ParseError::BlobExceedsMaxLength(datasize))?;
73 }
74
75 pbf_reader.seek(SeekFrom::Current(datasize as i64)).await?;
76
77 yield (header, SeekFrom::Start(current_seek as u64));
78
79 current_seek += datasize;
80 }
81 }
82}
83
84async fn read_blob_header_len<R: AsyncRead + Unpin + Send>(
86 mut pbf_reader: R,
87) -> Result<Option<usize>, ParseError> {
88 match pbf_reader.read_i32().await.map(|len| len as usize) {
89 Ok(len) if len > BLOB_HEADER_MAX_LEN => Err(ParseError::BlobHeaderExceedsMaxLength(len)),
90 Ok(len) => Ok(Some(len)),
91 Err(err) if err.kind() == ErrorKind::UnexpectedEof => Ok(None),
92 Err(err) => Err(err.into()),
93 }
94}
95
96fn decode_blob(blob: Blob) -> Result<Pin<Box<dyn AsyncRead + Send>>, ParseError> {
98 Ok(match blob.data {
99 Data::raw(raw) => Box::pin(Cursor::new(raw)),
100 #[cfg(feature = "zlib")]
101 Data::zlib_data(data) => Box::pin(ZlibDecoder::new(Cursor::new(data))),
102 #[cfg(feature = "lzma")]
103 Data::lzma_data(data) => Box::pin(LzmaDecoder::new(Cursor::new(data))),
104 #[cfg(feature = "zstd")]
105 Data::zstd_data(data) => Box::pin(ZstdDecoder::new(Cursor::new(data))),
106 Data::None => Box::pin(Cursor::new(vec![])),
107 other => {
108 return Err(ParseError::UnsupportedCompression(match other {
109 Data::raw(_) | Data::None => unreachable!(),
110 Data::zlib_data(_) => "zlib",
111 Data::lzma_data(_) => "lzma",
112 #[allow(deprecated)]
113 Data::OBSOLETE_bzip2_data(_) => "bzip2",
114 Data::lz4_data(_) => "lz4",
115 Data::zstd_data(_) => "zstd",
116 }))
117 }
118 })
119}
120
121fn stream_osm_blocks<'a, R: AsyncRead + Unpin + Send + 'a>(
123 pbf_reader: R,
124) -> impl Stream<Item = Result<FileBlock, ParseError>> + Send + 'a {
125 try_stream! {
126 let blob_stream = stream_blobs(pbf_reader);
127 pin_mut!(blob_stream);
128 while let Some((blob_header, blob_body)) = blob_stream.try_next().await? {
129 let blob_body_len = blob_body.raw_size.unwrap_or_default() as usize;
130 if blob_body_len > BLOB_MAX_LEN {
131 Err(ParseError::BlobExceedsMaxLength(blob_body_len))?;
132 }
133 let mut buf = Vec::with_capacity(blob_body_len);
134 let mut blob_stream = decode_blob(blob_body)?;
135 blob_stream.read_to_end(&mut buf).await?;
136
137 match blob_header.type_pb.as_str() {
138 "OSMHeader" => {
139 yield FileBlock::Header(deserialize_from_slice(buf.as_slice())?);
140 },
141 "OSMData" => {
142 yield FileBlock::Primitive(deserialize_from_slice(buf.as_slice())?);
143 }
144 other => {
145 yield FileBlock::Other { r#type: other.to_string(), bytes: buf, }
146 }
147 }
148 }
149 }
150}
151
/// The location of one blob inside a PBF file, as reported by
/// [`get_osm_pbf_locations`].
///
/// Pass it to [`parse_osm_pbf_at_location`] to decode just that block.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FileBlockLocation {
    /// The blob's type string from its `BlobHeader` (`type_pb`),
    /// e.g. `"OSMHeader"` or `"OSMData"`.
    pub r#type: String,

    /// Where the serialized `Blob` body begins, as a `SeekFrom::Start` offset.
    pub seek: SeekFrom,

    /// Length in bytes of the serialized `Blob` body (the header's `datasize`).
    pub len: usize,
}
166
167pub fn parse_osm_pbf<'a, R: AsyncRead + Unpin + Send + 'a>(
169 pbf_reader: R,
170) -> impl Stream<Item = Result<FileBlock, ParseError>> + Send + 'a {
171 stream_osm_blocks(pbf_reader)
172}
173
174pub fn get_osm_pbf_locations<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a>(
178 pbf_reader: R,
179) -> impl Stream<Item = Result<FileBlockLocation, ParseError>> + Send + 'a {
180 try_stream! {
181 let headers = stream_blob_headers(pbf_reader);
182 pin_mut!(headers);
183
184 while let Some((header, seek)) = headers.try_next().await? {
185 yield FileBlockLocation { r#type: header.type_pb, seek, len: header.datasize as usize }
186 }
187 }
188}
189
190pub async fn parse_osm_pbf_at_location<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a>(
194 mut pbf_reader: R,
195 location: FileBlockLocation,
196) -> Result<FileBlock, ParseError> {
197 pbf_reader.seek(location.seek).await?;
198
199 let mut buf = vec![0; location.len];
200 pbf_reader.read_exact(buf.as_mut()).await?;
201 let blob_body: Blob = deserialize_from_slice(buf.as_slice())?;
202
203 let blob_body_len = blob_body.raw_size.unwrap_or_default() as usize;
204 if location.len > BLOB_MAX_LEN {
205 Err(ParseError::BlobExceedsMaxLength(blob_body_len))?;
206 }
207
208 let mut buf = Vec::with_capacity(blob_body_len);
209 let mut blob_stream = decode_blob(blob_body)?;
210 blob_stream.read_to_end(&mut buf).await?;
211
212 Ok(match location.r#type.as_str() {
213 "OSMHeader" => FileBlock::Header(deserialize_from_slice(buf.as_slice())?),
214 "OSMData" => FileBlock::Primitive(deserialize_from_slice(buf.as_slice())?),
215 _ => FileBlock::Other {
216 r#type: location.r#type,
217 bytes: buf,
218 },
219 })
220}
221
222fn deserialize_from_slice<'a, M: MessageRead<'a>>(
224 bytes: &'a [u8],
225) -> Result<M, quick_protobuf::Error> {
226 let mut reader = BytesReader::from_bytes(bytes);
227 reader.read_message_by_len(bytes, bytes.len())
228}