o5m_stream/
lib.rs

1#![warn(clippy::future_not_send)]
2#![feature(async_closure,backtrace)]
3#![doc=include_str!("../readme.md")]
4
5use async_std::{prelude::*,stream::Stream,io};
6use std::collections::VecDeque;
7
8mod unfold;
9mod data;
10pub use data::*;
11pub mod parse;
12
13type Error = Box<dyn std::error::Error+Send+Sync>;
14
15pub type DecodeItem = Result<Dataset,DecodeError>;
16pub type DecodeStream = Box<dyn Stream<Item=DecodeItem>+Send+Unpin>;
17
/// Position of the byte-level state machine driven by `Decoder::next_item`.
#[derive(Debug, Clone, PartialEq)]
enum State {
  /// Nothing consumed yet; the first byte of the stream must be `0xff`.
  Begin(),
  /// The next byte is either a reset marker (`0xff`) or a dataset type.
  Type(),
  /// Accumulating the variable-length dataset length.
  Len(),
  /// Collecting the dataset payload bytes.
  Data(),
  /// Expecting the terminating `0xfe` byte.
  End(),
}
20
21use std::backtrace::Backtrace;
22
23#[derive(thiserror::Error,Debug)]
24pub enum DecodeError {
25  #[error("string at index {index} not available")]
26  StringUnavailable {
27    index: usize,
28    #[backtrace] backtrace: Backtrace,
29  },
30  #[error("{info:?}. expected: 0x{expected:02x}, received: 0x{received:02x}")]
31  UnexpectedByte {
32    info: String,
33    expected: u8,
34    received: u8,
35    #[backtrace] backtrace: Backtrace,
36  },
37  #[error("expected 0x30, 0x31, or 0x32 for element type. \
38    received: 0x{received:02x}\n{backtrace}")]
39  UnexpectedElementType {
40    received: u8,
41    #[backtrace] backtrace: Backtrace,
42  },
43  #[error("stream read error {source:?}")]
44  StreamReadError { #[source] source: Box<Error> },
45  #[error("string encoding error {source:?}")]
46  StringEncodingError { #[source] source: Box<Error> },
47  #[error("unterminated signed integer\n{backtrace}")]
48  UnterminatedSignedInteger { #[backtrace] backtrace: Backtrace },
49  #[error("unterminated unsigned integer\n{backtrace}")]
50  UnterminatedUnsignedInteger { #[backtrace] backtrace: Backtrace },
51}
52
53struct Decoder {
54  reader: Box<dyn io::Read+Send+Unpin>,
55  buffer: Vec<u8>,
56  index: usize,
57  buffer_len: usize,
58  state: State,
59  data_type: Option<DatasetType>,
60  len: usize,
61  npow: u64,
62  chunk: Vec<u8>,
63  size: usize,
64  strings: VecDeque<(Vec<u8>,Vec<u8>)>,
65  prev_id: Option<u64>,
66  prev_info: Option<Info>,
67  prev: Option<Dataset>,
68}
69
70impl Decoder {
71  pub fn new(reader: Box<dyn io::Read+Send+Unpin>) -> Self {
72    Self {
73      reader,
74      buffer: vec![0;4096],
75      index: 0,
76      buffer_len: 0,
77      state: State::Begin(),
78      data_type: None,
79      len: 0,
80      npow: 1,
81      chunk: vec![],
82      size: 0,
83      strings: VecDeque::new(),
84      prev: None,
85      prev_id: None,
86      prev_info: None,
87    }
88  }
89  pub async fn next_item(&mut self) -> Result<Option<Dataset>,DecodeError> {
90    loop {
91      if self.index >= self.buffer_len {
92        self.buffer_len = self.reader.read(&mut self.buffer).await
93          .map_err(|e| DecodeError::StreamReadError { source: Box::new(e.into()) })?;
94        self.index = 0;
95        if self.buffer_len == 0 { break }
96      }
97      while self.index < self.buffer_len {
98        let b = self.buffer[self.index];
99        if self.state == State::Begin() && b != 0xff {
100          return Err(DecodeError::UnexpectedByte {
101            info: "first byte in frame".to_string(),
102            expected: 0xff,
103            received: b,
104            backtrace: Backtrace::capture(),
105          });
106        } else if self.state == State::Begin() {
107          self.state = State::Type();
108        } else if self.state == State::Type() && b == 0xff { // reset
109          self.state = State::Type();
110          self.prev = None;
111          self.prev_id = None;
112          self.prev_info = None;
113        } else if self.state == State::Type() {
114          self.state = State::Len();
115          self.data_type = match b {
116            0x10 => Some(DatasetType::Node()),
117            0x11 => Some(DatasetType::Way()),
118            0x12 => Some(DatasetType::Relation()),
119            0xdb => Some(DatasetType::BBox()),
120            0xdc => Some(DatasetType::Timestamp()),
121            0xe0 => Some(DatasetType::Header()),
122            0xee => Some(DatasetType::Sync()),
123            0xef => Some(DatasetType::Jump()),
124            0xff => Some(DatasetType::Reset()),
125            _ => None,
126          };
127        } else if self.state == State::Len() {
128          self.len += ((b & 0x7f) as usize) * (self.npow as usize);
129          self.npow *= 0x80;
130          if b < 0x80 {
131            self.npow = 1;
132            self.state = State::Data();
133          }
134        } else if self.state == State::Data() {
135          let j = self.buffer_len.min(self.index+self.len-self.size);
136          self.chunk.extend_from_slice(&self.buffer[self.index..j]);
137          self.size += j-self.index;
138          if self.size >= self.len {
139            let res = self.flush()?;
140            self.state = State::Type();
141            self.len = 0;
142            self.size = 0;
143            self.chunk.clear();
144            if let Some(data) = res {
145              let save = match &data {
146                Dataset::Node(node) => node.data.is_some(),
147                Dataset::Way(way) => way.data.is_some(),
148                Dataset::Relation(relation) => relation.data.is_some(),
149                _ => true,
150              };
151              if save {
152                self.prev = Some(data.clone());
153              }
154              if let Some(id) = data.get_id() {
155                self.prev_id = Some(id);
156              }
157              if let Some(info) = data.get_info() {
158                self.prev_info = Some(info);
159              }
160              self.index = j;
161              return Ok(Some(data));
162            }
163          }
164          self.index = j - 1;
165        } else if self.state == State::End() && b != 0xfe {
166          return Err(DecodeError::UnexpectedByte {
167            info: "last byte in frame".to_string(),
168            expected: 0xf3,
169            received: b,
170            backtrace: Backtrace::capture(),
171          });
172        } else if self.state == State::End() {
173          // ...
174        }
175        self.index += 1;
176      }
177    }
178    Ok(None)
179  }
180  fn flush(&mut self) -> Result<Option<Dataset>,DecodeError> {
181    let mut offset = 0;
182    let buf = &self.chunk;
183    Ok(match self.data_type {
184      Some(DatasetType::Node()) => {
185        let (s,(id,info)) = parse::info(
186          &buf[offset..],
187          &self.prev_id,
188          &self.prev_info,
189          &mut self.strings
190        )?;
191        offset += s;
192        if offset == buf.len() {
193          Some(Dataset::Node(Node {
194            id,
195            info,
196            data: None,
197            tags: std::collections::HashMap::new(),
198          }))
199        } else {
200          let longitude = {
201            let (s,x) = parse::signed(&buf[offset..])?;
202            offset += s;
203            (x + (match &self.prev {
204              Some(Dataset::Node(node)) => node.data.as_ref()
205                .map(|data| data.longitude),
206              _ => None,
207            }.unwrap_or(0) as i64)) as i32
208          };
209          let latitude = {
210            let (s,x) = parse::signed(&buf[offset..])?;
211            offset += s;
212            (x + (match &self.prev {
213              Some(Dataset::Node(node)) => node.data.as_ref()
214                .map(|data| data.latitude),
215              _ => None,
216            }.unwrap_or(0) as i64)) as i32
217          };
218          let (_,tags) = parse::tags(&buf[offset..], &mut self.strings)?;
219          Some(Dataset::Node(Node {
220            id,
221            info,
222            data: Some(NodeData { longitude, latitude }),
223            tags,
224          }))
225        }
226      },
227      Some(DatasetType::Way()) => {
228        let (s,(id,info)) = parse::info(
229          &buf[offset..],
230          &self.prev_id,
231          &self.prev_info,
232          &mut self.strings
233        )?;
234        offset += s;
235        if offset == buf.len() {
236          return Ok(Some(Dataset::Way(Way {
237            id,
238            info,
239            data: None,
240            tags: std::collections::HashMap::new(),
241          })));
242        }
243        // reflen is the number of BYTES, not the number of refs
244        let (s,reflen) = parse::unsigned(&buf[offset..])?;
245        offset += s;
246        let mut refs = vec![];
247        let mut prev_ref = match &self.prev {
248          Some(Dataset::Way(way)) => way.data.as_ref().and_then(|d| {
249            d.refs.last().copied()
250          }).unwrap_or(0),
251          _ => 0
252        };
253        let ref_end = offset + reflen as usize;
254        while offset < ref_end {
255          let (s,x) = parse::signed(&buf[offset..])?;
256          offset += s;
257          let r = (x + (prev_ref as i64)) as u64;
258          refs.push(r);
259          prev_ref = r;
260        }
261        let (_,tags) = parse::tags(&buf[offset..], &mut self.strings)?;
262        Some(Dataset::Way(Way {
263          id,
264          info,
265          data: Some(WayData { refs }),
266          tags
267        }))
268      },
269      Some(DatasetType::Relation()) => {
270        let (s,(id,info)) = parse::info(
271          &buf[offset..],
272          &self.prev_id,
273          &self.prev_info,
274          &mut self.strings
275        )?;
276        offset += s;
277        if offset == buf.len() {
278          return Ok(Some(Dataset::Relation(Relation {
279            id,
280            info,
281            data: None,
282            tags: std::collections::HashMap::new(),
283          })));
284        }
285        // reflen is the number of BYTES, not the number of refs
286        let (s,reflen) = parse::unsigned(&buf[offset..])?;
287        offset += s;
288        let mut members = vec![];
289        let mut prev_id = match &self.prev {
290          Some(Dataset::Relation(rel)) => rel.data.as_ref().and_then(|d| {
291            d.members.last().map(|m| m.id)
292          }).unwrap_or(0),
293          _ => 0
294        };
295        let ref_end = offset + reflen as usize;
296        while offset < ref_end {
297          let m_id = {
298            let (s,x) = parse::signed(&buf[offset..])?;
299            offset += s;
300            (x + (prev_id as i64)) as u64
301          };
302          prev_id = m_id;
303          let mstring = {
304            let (s,x) = parse::unsigned(&buf[offset..])?;
305            offset += s;
306            if x == 0 {
307              let i = offset + buf[offset..].iter()
308                .position(|p| *p == 0x00).unwrap_or(buf.len()-offset);
309              let mbytes = &buf[offset..i];
310              offset = i+1;
311              if mbytes.len() <= 250 {
312                self.strings.push_front((mbytes.to_vec(),vec![]));
313                if self.strings.len() > 15_000 { self.strings.pop_back(); }
314              }
315              mbytes
316            } else {
317              let pair = self.strings.get((x as usize)-1);
318              if pair.is_none() {
319                return Err(DecodeError::StringUnavailable {
320                  index: x as usize,
321                  backtrace: Backtrace::capture(),
322                });
323              }
324              &pair.unwrap().0
325            }
326          };
327          members.push(RelationMember {
328            id: m_id,
329            element_type: match mstring[0] {
330              0x30 => ElementType::Node(),
331              0x31 => ElementType::Way(),
332              0x32 => ElementType::Relation(),
333              x => return Err(DecodeError::UnexpectedElementType {
334                received: x,
335                backtrace: Backtrace::capture(),
336              }),
337            },
338            role: String::from_utf8(mstring[1..].to_vec())
339              .map_err(|e| DecodeError::StringEncodingError { source: Box::new(e.into()) })?,
340          });
341        }
342        let (_,tags) = parse::tags(&buf[offset..], &mut self.strings)?;
343        Some(Dataset::Relation(Relation {
344          id,
345          info,
346          data: Some(RelationData { members }),
347          tags
348        }))
349      },
350      Some(DatasetType::Timestamp()) => {
351        let (_,time) = parse::signed(&buf[offset..])?;
352        Some(Dataset::Timestamp(Timestamp { time }))
353      },
354      Some(DatasetType::BBox()) => {
355        let (s,x1) = parse::signed(&buf[offset..])?;
356        offset += s;
357        let (s,y1) = parse::signed(&buf[offset..])?;
358        offset += s;
359        let (s,x2) = parse::signed(&buf[offset..])?;
360        offset += s;
361        let (_,y2) = parse::signed(&buf[offset..])?;
362        Some(Dataset::BBox(BBox {
363          x1: x1 as i32,
364          y1: y1 as i32,
365          x2: x2 as i32,
366          y2: y2 as i32,
367        }))
368      },
369      Some(DatasetType::Header()) => None,
370      Some(DatasetType::Sync()) => None,
371      Some(DatasetType::Jump()) => None,
372      Some(DatasetType::Reset()) => None,
373      None => None,
374    })
375  }
376}
377
378/// Transform the given binary stream `reader` into an stream of fallible `Dataset` items.
379pub fn decode(reader: Box<dyn io::Read+Send+Unpin>) -> DecodeStream {
380  let state = Decoder::new(reader);
381  Box::new(unfold::unfold(state, async move |mut qs| {
382    match qs.next_item().await {
383      Ok(None) => None,
384      Ok(Some(x)) => Some((Ok(x),qs)),
385      Err(e) => Some((Err(e),qs)),
386    }
387  }))
388}