1use byteorder::{BigEndian, ReadBytesExt};
2use bytes::Buf;
3use osm_pbf_proto::fileformat::blob::Data;
4pub use osm_pbf_proto::fileformat::{Blob as PbfBlob, BlobHeader as PbfBlobHeader};
5use osm_pbf_proto::osmformat::{HeaderBlock, PrimitiveBlock as PbfPrimitiveBlock};
6use osm_pbf_proto::protobuf::{self as pb, CodedInputStream, Message};
7use std::fs::File;
8use std::io::{self, Read};
9use std::iter;
10use std::path::Path;
11
12use crate::data::OSMDataBlob;
13use crate::error::{Error, Result};
14
/// Largest serialized `BlobHeader`, in bytes, that the reader will accept (64 KiB).
const MAX_HEADER_SIZE: u32 = 1 << 16;
/// Upper bound, in bytes, on blob payload sizes accepted by the reader (32 MiB).
const MAX_UNCOMPRESSED_DATA_SIZE: usize = 1 << 25;
17
18#[derive(PartialEq, Clone, Debug)]
19pub enum Blob<M> {
20 Encoded(PbfBlob),
21 Decoded(M),
22}
23
24impl<M> Blob<M> {
25 const INST: Self = Self::Encoded(PbfBlob {
26 raw_size: None,
27 data: None,
28 special_fields: pb::SpecialFields::new(),
29 });
30
31 #[inline]
32 const fn new(blob: PbfBlob) -> Self {
33 Self::Encoded(blob)
34 }
35}
36
37impl<M> Default for Blob<M> {
38 fn default() -> Self {
39 Self::Encoded(PbfBlob::new())
40 }
41}
42
43impl<M: Message> Blob<M> {
44 pub fn decode_into(mut self) -> Result<M> {
45 self.decode()?;
46 let Self::Decoded(d) = self else {
47 unreachable!();
48 };
49 Ok(d)
50 }
51
52 pub fn decode(&mut self) -> Result<&mut M> {
53 if let Self::Encoded(d) = self {
54 let r = match &d.data {
55 Some(Data::Raw(r)) => M::parse_from_tokio_bytes(r)?,
56 Some(Data::ZlibData(z)) => {
57 let mut decoder = flate2::bufread::ZlibDecoder::new(io::Cursor::new(z));
58 M::parse_from_reader(&mut decoder)?
59 }
60 Some(Data::LzmaData(z)) => {
61 let mut decoder = xz2::bufread::XzDecoder::new(io::Cursor::new(z));
62 M::parse_from_reader(&mut decoder)?
63 }
64 None => M::new(),
65 _ => {
66 return Err(Error::UnsupportedEncoding);
67 }
68 };
69 *self = Self::Decoded(r);
70 }
71 let Self::Decoded(d) = self else {
72 unreachable!();
73 };
74 Ok(d)
75 }
76
77 pub fn parse_and_decode(is: &mut CodedInputStream<'_>) -> pb::Result<M> {
78 let mut data = M::new();
79 while let Some(tag) = is.read_raw_tag_or_eof()? {
80 match tag {
81 10 => {
82 let len = is.read_raw_varint64()?;
84 let old_limit = is.push_limit(len)?;
85 data.merge_from(is)?;
86 is.pop_limit(old_limit);
87 }
88 #[cfg(feature = "zlib")]
89 26 => {
90 let len = is.read_raw_varint64()?;
92 let old_limit = is.push_limit(len)?;
93 let read: &mut dyn io::BufRead = is;
94 {
95 let mut decoder = flate2::bufread::ZlibDecoder::new(read);
96 let mut is = CodedInputStream::new(&mut decoder);
97 data.merge_from(&mut is)?;
98 }
99 is.pop_limit(old_limit);
100 }
101 #[cfg(feature = "lzma")]
102 34 => {
103 let len = is.read_raw_varint64()?;
105 let old_limit = is.push_limit(len)?;
106 let read: &mut dyn io::BufRead = is;
107 {
108 let mut decoder = xz2::bufread::XzDecoder::new(read);
109 let mut is = CodedInputStream::new(&mut decoder);
110 data.merge_from(&mut is)?;
111 }
112 is.pop_limit(old_limit);
113 }
114 tag => {
127 pb::rt::skip_field_for_tag(tag, is)?;
128 }
129 };
130 }
131 data.check_initialized()?;
132 Ok(data)
133 }
134}
135
136#[derive(Debug)]
137pub struct Blobs<R> {
138 header: HeaderBlock,
139 reader: R,
140}
141
142impl<R> Blobs<R> {
143 #[inline]
144 pub fn into_reader(self) -> R {
145 self.reader
146 }
147
148 #[inline]
149 pub fn header(&self) -> &HeaderBlock {
150 &self.header
151 }
152}
153
154impl<R: AsRef<[u8]>> Blobs<io::Cursor<R>> {
155 #[inline]
156 pub fn from_bytes(bytes: R) -> Result<Self> {
157 Self::from_buf_read(io::Cursor::new(bytes))
158 }
159}
160
161impl<R: Read> Blobs<io::BufReader<R>> {
162 #[inline]
163 pub fn from_read(read: R) -> Result<Self> {
164 Self::from_buf_read(io::BufReader::new(read))
165 }
166}
167
168impl Blobs<io::BufReader<File>> {
169 #[inline]
170 pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
171 let file = File::open(path)?;
172 Self::from_read(file)
173 }
174}
175
176impl<R: io::Seek> Blobs<R> {
177 #[inline]
178 pub fn rewind(&mut self) -> Result<()> {
179 self.reader.rewind()?;
180 Ok(())
181 }
182}
183
184impl<R: io::BufRead> Blobs<R> {
185 #[inline]
186 pub fn from_buf_read(reader: R) -> Result<Self> {
187 let mut r = Self {
188 header: HeaderBlock::new(),
189 reader,
190 };
191 r._read_header_block()?;
192 Ok(r)
193 }
194
195 fn _read_blob_header(&mut self) -> Result<Option<PbfBlobHeader>> {
196 let header_size = match self.reader.read_u32::<BigEndian>() {
197 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
198 return Ok(None); }
200 Err(e) => return Err(Error::IoError(e)),
201 Ok(header_size) if header_size > MAX_HEADER_SIZE => {
202 return Err(Error::BlobHeaderToLarge);
203 }
204 Ok(header_size) => header_size as usize,
205 };
206
207 let header: PbfBlobHeader = self.read_msg_exact(header_size)?;
208 let data_size = header.datasize() as usize;
209 if data_size > MAX_UNCOMPRESSED_DATA_SIZE {
210 return Err(Error::BlobDataToLarge);
211 }
212 Ok(Some(header))
213 }
214
215 fn read_msg_exact<M: Message>(&mut self, exact_size: usize) -> Result<M> {
216 let mut input = self.reader.by_ref().take(exact_size as u64);
217 let mut input = CodedInputStream::from_buf_read(&mut input);
218 let msg = M::parse_from_reader(&mut input)?;
219 input.check_eof()?;
220 Ok(msg)
221 }
222
223 pub fn next_blob(&mut self) -> Result<Option<(PbfBlobHeader, PbfBlob)>> {
224 let Some(header) = self._read_blob_header()? else {
225 return Ok(None);
226 };
227 let blob: PbfBlob = self.read_msg_exact(header.datasize() as usize)?;
228 Ok(Some((header, blob)))
229 }
230
231 fn _read_header_block(&mut self) -> Result<()> {
232 let Some(header) = self._read_blob_header()? else {
233 return Err(io::ErrorKind::UnexpectedEof.into());
234 };
235 if header.type_() != "OSMHeader" {
236 return Err(Error::UnexpectedBlobType(header.type_().to_string()));
237 }
238 let mut input = self.reader.by_ref().take(header.datasize() as u64);
239 let mut input = CodedInputStream::from_buf_read(&mut input);
240 self.header = Blob::parse_and_decode(&mut input)?;
241 input.check_eof()?;
242 Ok(())
243 }
244
245 pub fn next_primitive_block(&mut self) -> Result<Option<OSMDataBlob>> {
246 let Some(header) = self._read_blob_header()? else {
247 return Ok(None);
248 };
249 if header.type_() != "OSMData" {
250 return Err(Error::UnexpectedBlobType(header.type_().to_string()));
251 }
252 let blob: PbfBlob = self.read_msg_exact(header.datasize() as usize)?;
253 Ok(Some(Blob::Encoded(blob)))
254 }
255
256 pub fn next_primitive_block_decoded(&mut self) -> Result<Option<PbfPrimitiveBlock>> {
257 let Some(header) = self._read_blob_header()? else {
258 return Ok(None);
259 };
260 if header.type_() != "OSMData" {
261 return Err(Error::UnexpectedBlobType(header.type_().to_string()));
262 }
263 let mut input = self.reader.by_ref().take(header.datasize() as u64);
264 let mut input = CodedInputStream::from_buf_read(&mut input);
265 let decoded = Blob::parse_and_decode(&mut input)?;
266 input.check_eof()?;
267 Ok(Some(decoded))
268 }
269}
270
271impl<R: io::BufRead + io::Seek> Blobs<R> {
272 fn next_blob_with(
273 &mut self,
274 cond: impl Fn(&PbfBlobHeader) -> bool,
275 ) -> Result<Option<(PbfBlobHeader, PbfBlob)>> {
276 loop {
277 let Some(header) = self._read_blob_header()? else {
278 return Ok(None);
279 };
280 if cond(&header) {
281 let blob: PbfBlob = self.read_msg_exact((header.datasize() as u32) as usize)?;
282 return Ok(Some((header, blob)));
283 }
284 self.reader
285 .seek(io::SeekFrom::Current((header.datasize() as u32) as i64))?;
286 }
287 }
288}
289
290impl<R: io::BufRead> Iterator for Blobs<R> {
291 type Item = Result<OSMDataBlob>;
292
293 #[inline]
294 fn next(&mut self) -> Option<Result<OSMDataBlob>> {
295 self.next_primitive_block().transpose()
296 }
297}
298
299impl<R: io::BufRead> iter::FusedIterator for Blobs<R> {}