osm_pbf_reader/
blob.rs

1use byteorder::{BigEndian, ReadBytesExt};
2use bytes::Buf;
3use osm_pbf_proto::fileformat::blob::Data;
4pub use osm_pbf_proto::fileformat::{Blob as PbfBlob, BlobHeader as PbfBlobHeader};
5use osm_pbf_proto::osmformat::{HeaderBlock, PrimitiveBlock as PbfPrimitiveBlock};
6use osm_pbf_proto::protobuf::{self as pb, CodedInputStream, Message};
7use std::fs::File;
8use std::io::{self, Read};
9use std::iter;
10use std::path::Path;
11
12use crate::data::OSMDataBlob;
13use crate::error::{Error, Result};
14
/// Largest value, in bytes, accepted for the length prefix of a blob header (64 KiB).
const MAX_HEADER_SIZE: u32 = 64 << 10;
/// Largest value, in bytes, accepted for the `datasize` announced by a blob header (32 MiB).
const MAX_UNCOMPRESSED_DATA_SIZE: usize = 32 << 20;
17
18#[derive(PartialEq, Clone, Debug)]
19pub enum Blob<M> {
20    Encoded(PbfBlob),
21    Decoded(M),
22}
23
24impl<M> Blob<M> {
25    const INST: Self = Self::Encoded(PbfBlob {
26        raw_size: None,
27        data: None,
28        special_fields: pb::SpecialFields::new(),
29    });
30
31    #[inline]
32    const fn new(blob: PbfBlob) -> Self {
33        Self::Encoded(blob)
34    }
35}
36
37impl<M> Default for Blob<M> {
38    fn default() -> Self {
39        Self::Encoded(PbfBlob::new())
40    }
41}
42
43impl<M: Message> Blob<M> {
44    pub fn decode_into(mut self) -> Result<M> {
45        self.decode()?;
46        let Self::Decoded(d) = self else {
47            unreachable!();
48        };
49        Ok(d)
50    }
51
52    pub fn decode(&mut self) -> Result<&mut M> {
53        if let Self::Encoded(d) = self {
54            let r = match &d.data {
55                Some(Data::Raw(r)) => M::parse_from_tokio_bytes(r)?,
56                Some(Data::ZlibData(z)) => {
57                    let mut decoder = flate2::bufread::ZlibDecoder::new(io::Cursor::new(z));
58                    M::parse_from_reader(&mut decoder)?
59                }
60                Some(Data::LzmaData(z)) => {
61                    let mut decoder = xz2::bufread::XzDecoder::new(io::Cursor::new(z));
62                    M::parse_from_reader(&mut decoder)?
63                }
64                None => M::new(),
65                _ => {
66                    return Err(Error::UnsupportedEncoding);
67                }
68            };
69            *self = Self::Decoded(r);
70        }
71        let Self::Decoded(d) = self else {
72            unreachable!();
73        };
74        Ok(d)
75    }
76
77    pub fn parse_and_decode(is: &mut CodedInputStream<'_>) -> pb::Result<M> {
78        let mut data = M::new();
79        while let Some(tag) = is.read_raw_tag_or_eof()? {
80            match tag {
81                10 => {
82                    // Raw (1)
83                    let len = is.read_raw_varint64()?;
84                    let old_limit = is.push_limit(len)?;
85                    data.merge_from(is)?;
86                    is.pop_limit(old_limit);
87                }
88                #[cfg(feature = "zlib")]
89                26 => {
90                    // ZlibData (3)
91                    let len = is.read_raw_varint64()?;
92                    let old_limit = is.push_limit(len)?;
93                    let read: &mut dyn io::BufRead = is;
94                    {
95                        let mut decoder = flate2::bufread::ZlibDecoder::new(read);
96                        let mut is = CodedInputStream::new(&mut decoder);
97                        data.merge_from(&mut is)?;
98                    }
99                    is.pop_limit(old_limit);
100                }
101                #[cfg(feature = "lzma")]
102                34 => {
103                    // LzmaData (4)
104                    let len = is.read_raw_varint64()?;
105                    let old_limit = is.push_limit(len)?;
106                    let read: &mut dyn io::BufRead = is;
107                    {
108                        let mut decoder = xz2::bufread::XzDecoder::new(read);
109                        let mut is = CodedInputStream::new(&mut decoder);
110                        data.merge_from(&mut is)?;
111                    }
112                    is.pop_limit(old_limit);
113                }
114                /*
115                42 => { // OBSOLETEzip2Data (5)
116                    todo!()
117                },
118                50 => { // Lz4Data (6)
119                    todo!()
120                },
121                58 => { // ZstdData (
122                        // 7)
123                    todo!()
124                },
125                */
126                tag => {
127                    pb::rt::skip_field_for_tag(tag, is)?;
128                }
129            };
130        }
131        data.check_initialized()?;
132        Ok(data)
133    }
134}
135
136#[derive(Debug)]
137pub struct Blobs<R> {
138    header: HeaderBlock,
139    reader: R,
140}
141
142impl<R> Blobs<R> {
143    #[inline]
144    pub fn into_reader(self) -> R {
145        self.reader
146    }
147
148    #[inline]
149    pub fn header(&self) -> &HeaderBlock {
150        &self.header
151    }
152}
153
154impl<R: AsRef<[u8]>> Blobs<io::Cursor<R>> {
155    #[inline]
156    pub fn from_bytes(bytes: R) -> Result<Self> {
157        Self::from_buf_read(io::Cursor::new(bytes))
158    }
159}
160
161impl<R: Read> Blobs<io::BufReader<R>> {
162    #[inline]
163    pub fn from_read(read: R) -> Result<Self> {
164        Self::from_buf_read(io::BufReader::new(read))
165    }
166}
167
168impl Blobs<io::BufReader<File>> {
169    #[inline]
170    pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
171        let file = File::open(path)?;
172        Self::from_read(file)
173    }
174}
175
176impl<R: io::Seek> Blobs<R> {
177    #[inline]
178    pub fn rewind(&mut self) -> Result<()> {
179        self.reader.rewind()?;
180        Ok(())
181    }
182}
183
184impl<R: io::BufRead> Blobs<R> {
185    #[inline]
186    pub fn from_buf_read(reader: R) -> Result<Self> {
187        let mut r = Self {
188            header: HeaderBlock::new(),
189            reader,
190        };
191        r._read_header_block()?;
192        Ok(r)
193    }
194
195    fn _read_blob_header(&mut self) -> Result<Option<PbfBlobHeader>> {
196        let header_size = match self.reader.read_u32::<BigEndian>() {
197            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
198                return Ok(None); // Expected EOF
199            }
200            Err(e) => return Err(Error::IoError(e)),
201            Ok(header_size) if header_size > MAX_HEADER_SIZE => {
202                return Err(Error::BlobHeaderToLarge);
203            }
204            Ok(header_size) => header_size as usize,
205        };
206
207        let header: PbfBlobHeader = self.read_msg_exact(header_size)?;
208        let data_size = header.datasize() as usize;
209        if data_size > MAX_UNCOMPRESSED_DATA_SIZE {
210            return Err(Error::BlobDataToLarge);
211        }
212        Ok(Some(header))
213    }
214
215    fn read_msg_exact<M: Message>(&mut self, exact_size: usize) -> Result<M> {
216        let mut input = self.reader.by_ref().take(exact_size as u64);
217        let mut input = CodedInputStream::from_buf_read(&mut input);
218        let msg = M::parse_from_reader(&mut input)?;
219        input.check_eof()?;
220        Ok(msg)
221    }
222
223    pub fn next_blob(&mut self) -> Result<Option<(PbfBlobHeader, PbfBlob)>> {
224        let Some(header) = self._read_blob_header()? else {
225            return Ok(None);
226        };
227        let blob: PbfBlob = self.read_msg_exact(header.datasize() as usize)?;
228        Ok(Some((header, blob)))
229    }
230
231    fn _read_header_block(&mut self) -> Result<()> {
232        let Some(header) = self._read_blob_header()? else {
233            return Err(io::ErrorKind::UnexpectedEof.into());
234        };
235        if header.type_() != "OSMHeader" {
236            return Err(Error::UnexpectedBlobType(header.type_().to_string()));
237        }
238        let mut input = self.reader.by_ref().take(header.datasize() as u64);
239        let mut input = CodedInputStream::from_buf_read(&mut input);
240        self.header = Blob::parse_and_decode(&mut input)?;
241        input.check_eof()?;
242        Ok(())
243    }
244
245    pub fn next_primitive_block(&mut self) -> Result<Option<OSMDataBlob>> {
246        let Some(header) = self._read_blob_header()? else {
247            return Ok(None);
248        };
249        if header.type_() != "OSMData" {
250            return Err(Error::UnexpectedBlobType(header.type_().to_string()));
251        }
252        let blob: PbfBlob = self.read_msg_exact(header.datasize() as usize)?;
253        Ok(Some(Blob::Encoded(blob)))
254    }
255
256    pub fn next_primitive_block_decoded(&mut self) -> Result<Option<PbfPrimitiveBlock>> {
257        let Some(header) = self._read_blob_header()? else {
258            return Ok(None);
259        };
260        if header.type_() != "OSMData" {
261            return Err(Error::UnexpectedBlobType(header.type_().to_string()));
262        }
263        let mut input = self.reader.by_ref().take(header.datasize() as u64);
264        let mut input = CodedInputStream::from_buf_read(&mut input);
265        let decoded = Blob::parse_and_decode(&mut input)?;
266        input.check_eof()?;
267        Ok(Some(decoded))
268    }
269}
270
271impl<R: io::BufRead + io::Seek> Blobs<R> {
272    fn next_blob_with(
273        &mut self,
274        cond: impl Fn(&PbfBlobHeader) -> bool,
275    ) -> Result<Option<(PbfBlobHeader, PbfBlob)>> {
276        loop {
277            let Some(header) = self._read_blob_header()? else {
278                return Ok(None);
279            };
280            if cond(&header) {
281                let blob: PbfBlob = self.read_msg_exact((header.datasize() as u32) as usize)?;
282                return Ok(Some((header, blob)));
283            }
284            self.reader
285                .seek(io::SeekFrom::Current((header.datasize() as u32) as i64))?;
286        }
287    }
288}
289
290impl<R: io::BufRead> Iterator for Blobs<R> {
291    type Item = Result<OSMDataBlob>;
292
293    #[inline]
294    fn next(&mut self) -> Option<Result<OSMDataBlob>> {
295        self.next_primitive_block().transpose()
296    }
297}
298
299impl<R: io::BufRead> iter::FusedIterator for Blobs<R> {}