1#![warn(clippy::future_not_send)]
2#![feature(async_closure,backtrace)]
3#![doc=include_str!("../readme.md")]
4
5use async_std::{prelude::*,stream::Stream,io};
6use std::collections::VecDeque;
7
8mod unfold;
9mod data;
10pub use data::*;
11pub mod parse;
12
13type Error = Box<dyn std::error::Error+Send+Sync>;
14
15pub type DecodeItem = Result<Dataset,DecodeError>;
16pub type DecodeStream = Box<dyn Stream<Item=DecodeItem>+Send+Unpin>;
17
18#[derive(Clone,PartialEq,Debug)]
19enum State { Begin(), Type(), Len(), Data(), End() }
20
21use std::backtrace::Backtrace;
22
23#[derive(thiserror::Error,Debug)]
24pub enum DecodeError {
25 #[error("string at index {index} not available")]
26 StringUnavailable {
27 index: usize,
28 #[backtrace] backtrace: Backtrace,
29 },
30 #[error("{info:?}. expected: 0x{expected:02x}, received: 0x{received:02x}")]
31 UnexpectedByte {
32 info: String,
33 expected: u8,
34 received: u8,
35 #[backtrace] backtrace: Backtrace,
36 },
37 #[error("expected 0x30, 0x31, or 0x32 for element type. \
38 received: 0x{received:02x}\n{backtrace}")]
39 UnexpectedElementType {
40 received: u8,
41 #[backtrace] backtrace: Backtrace,
42 },
43 #[error("stream read error {source:?}")]
44 StreamReadError { #[source] source: Box<Error> },
45 #[error("string encoding error {source:?}")]
46 StringEncodingError { #[source] source: Box<Error> },
47 #[error("unterminated signed integer\n{backtrace}")]
48 UnterminatedSignedInteger { #[backtrace] backtrace: Backtrace },
49 #[error("unterminated unsigned integer\n{backtrace}")]
50 UnterminatedUnsignedInteger { #[backtrace] backtrace: Backtrace },
51}
52
53struct Decoder {
54 reader: Box<dyn io::Read+Send+Unpin>,
55 buffer: Vec<u8>,
56 index: usize,
57 buffer_len: usize,
58 state: State,
59 data_type: Option<DatasetType>,
60 len: usize,
61 npow: u64,
62 chunk: Vec<u8>,
63 size: usize,
64 strings: VecDeque<(Vec<u8>,Vec<u8>)>,
65 prev_id: Option<u64>,
66 prev_info: Option<Info>,
67 prev: Option<Dataset>,
68}
69
70impl Decoder {
71 pub fn new(reader: Box<dyn io::Read+Send+Unpin>) -> Self {
72 Self {
73 reader,
74 buffer: vec![0;4096],
75 index: 0,
76 buffer_len: 0,
77 state: State::Begin(),
78 data_type: None,
79 len: 0,
80 npow: 1,
81 chunk: vec![],
82 size: 0,
83 strings: VecDeque::new(),
84 prev: None,
85 prev_id: None,
86 prev_info: None,
87 }
88 }
89 pub async fn next_item(&mut self) -> Result<Option<Dataset>,DecodeError> {
90 loop {
91 if self.index >= self.buffer_len {
92 self.buffer_len = self.reader.read(&mut self.buffer).await
93 .map_err(|e| DecodeError::StreamReadError { source: Box::new(e.into()) })?;
94 self.index = 0;
95 if self.buffer_len == 0 { break }
96 }
97 while self.index < self.buffer_len {
98 let b = self.buffer[self.index];
99 if self.state == State::Begin() && b != 0xff {
100 return Err(DecodeError::UnexpectedByte {
101 info: "first byte in frame".to_string(),
102 expected: 0xff,
103 received: b,
104 backtrace: Backtrace::capture(),
105 });
106 } else if self.state == State::Begin() {
107 self.state = State::Type();
108 } else if self.state == State::Type() && b == 0xff { self.state = State::Type();
110 self.prev = None;
111 self.prev_id = None;
112 self.prev_info = None;
113 } else if self.state == State::Type() {
114 self.state = State::Len();
115 self.data_type = match b {
116 0x10 => Some(DatasetType::Node()),
117 0x11 => Some(DatasetType::Way()),
118 0x12 => Some(DatasetType::Relation()),
119 0xdb => Some(DatasetType::BBox()),
120 0xdc => Some(DatasetType::Timestamp()),
121 0xe0 => Some(DatasetType::Header()),
122 0xee => Some(DatasetType::Sync()),
123 0xef => Some(DatasetType::Jump()),
124 0xff => Some(DatasetType::Reset()),
125 _ => None,
126 };
127 } else if self.state == State::Len() {
128 self.len += ((b & 0x7f) as usize) * (self.npow as usize);
129 self.npow *= 0x80;
130 if b < 0x80 {
131 self.npow = 1;
132 self.state = State::Data();
133 }
134 } else if self.state == State::Data() {
135 let j = self.buffer_len.min(self.index+self.len-self.size);
136 self.chunk.extend_from_slice(&self.buffer[self.index..j]);
137 self.size += j-self.index;
138 if self.size >= self.len {
139 let res = self.flush()?;
140 self.state = State::Type();
141 self.len = 0;
142 self.size = 0;
143 self.chunk.clear();
144 if let Some(data) = res {
145 let save = match &data {
146 Dataset::Node(node) => node.data.is_some(),
147 Dataset::Way(way) => way.data.is_some(),
148 Dataset::Relation(relation) => relation.data.is_some(),
149 _ => true,
150 };
151 if save {
152 self.prev = Some(data.clone());
153 }
154 if let Some(id) = data.get_id() {
155 self.prev_id = Some(id);
156 }
157 if let Some(info) = data.get_info() {
158 self.prev_info = Some(info);
159 }
160 self.index = j;
161 return Ok(Some(data));
162 }
163 }
164 self.index = j - 1;
165 } else if self.state == State::End() && b != 0xfe {
166 return Err(DecodeError::UnexpectedByte {
167 info: "last byte in frame".to_string(),
168 expected: 0xf3,
169 received: b,
170 backtrace: Backtrace::capture(),
171 });
172 } else if self.state == State::End() {
173 }
175 self.index += 1;
176 }
177 }
178 Ok(None)
179 }
180 fn flush(&mut self) -> Result<Option<Dataset>,DecodeError> {
181 let mut offset = 0;
182 let buf = &self.chunk;
183 Ok(match self.data_type {
184 Some(DatasetType::Node()) => {
185 let (s,(id,info)) = parse::info(
186 &buf[offset..],
187 &self.prev_id,
188 &self.prev_info,
189 &mut self.strings
190 )?;
191 offset += s;
192 if offset == buf.len() {
193 Some(Dataset::Node(Node {
194 id,
195 info,
196 data: None,
197 tags: std::collections::HashMap::new(),
198 }))
199 } else {
200 let longitude = {
201 let (s,x) = parse::signed(&buf[offset..])?;
202 offset += s;
203 (x + (match &self.prev {
204 Some(Dataset::Node(node)) => node.data.as_ref()
205 .map(|data| data.longitude),
206 _ => None,
207 }.unwrap_or(0) as i64)) as i32
208 };
209 let latitude = {
210 let (s,x) = parse::signed(&buf[offset..])?;
211 offset += s;
212 (x + (match &self.prev {
213 Some(Dataset::Node(node)) => node.data.as_ref()
214 .map(|data| data.latitude),
215 _ => None,
216 }.unwrap_or(0) as i64)) as i32
217 };
218 let (_,tags) = parse::tags(&buf[offset..], &mut self.strings)?;
219 Some(Dataset::Node(Node {
220 id,
221 info,
222 data: Some(NodeData { longitude, latitude }),
223 tags,
224 }))
225 }
226 },
227 Some(DatasetType::Way()) => {
228 let (s,(id,info)) = parse::info(
229 &buf[offset..],
230 &self.prev_id,
231 &self.prev_info,
232 &mut self.strings
233 )?;
234 offset += s;
235 if offset == buf.len() {
236 return Ok(Some(Dataset::Way(Way {
237 id,
238 info,
239 data: None,
240 tags: std::collections::HashMap::new(),
241 })));
242 }
243 let (s,reflen) = parse::unsigned(&buf[offset..])?;
245 offset += s;
246 let mut refs = vec![];
247 let mut prev_ref = match &self.prev {
248 Some(Dataset::Way(way)) => way.data.as_ref().and_then(|d| {
249 d.refs.last().copied()
250 }).unwrap_or(0),
251 _ => 0
252 };
253 let ref_end = offset + reflen as usize;
254 while offset < ref_end {
255 let (s,x) = parse::signed(&buf[offset..])?;
256 offset += s;
257 let r = (x + (prev_ref as i64)) as u64;
258 refs.push(r);
259 prev_ref = r;
260 }
261 let (_,tags) = parse::tags(&buf[offset..], &mut self.strings)?;
262 Some(Dataset::Way(Way {
263 id,
264 info,
265 data: Some(WayData { refs }),
266 tags
267 }))
268 },
269 Some(DatasetType::Relation()) => {
270 let (s,(id,info)) = parse::info(
271 &buf[offset..],
272 &self.prev_id,
273 &self.prev_info,
274 &mut self.strings
275 )?;
276 offset += s;
277 if offset == buf.len() {
278 return Ok(Some(Dataset::Relation(Relation {
279 id,
280 info,
281 data: None,
282 tags: std::collections::HashMap::new(),
283 })));
284 }
285 let (s,reflen) = parse::unsigned(&buf[offset..])?;
287 offset += s;
288 let mut members = vec![];
289 let mut prev_id = match &self.prev {
290 Some(Dataset::Relation(rel)) => rel.data.as_ref().and_then(|d| {
291 d.members.last().map(|m| m.id)
292 }).unwrap_or(0),
293 _ => 0
294 };
295 let ref_end = offset + reflen as usize;
296 while offset < ref_end {
297 let m_id = {
298 let (s,x) = parse::signed(&buf[offset..])?;
299 offset += s;
300 (x + (prev_id as i64)) as u64
301 };
302 prev_id = m_id;
303 let mstring = {
304 let (s,x) = parse::unsigned(&buf[offset..])?;
305 offset += s;
306 if x == 0 {
307 let i = offset + buf[offset..].iter()
308 .position(|p| *p == 0x00).unwrap_or(buf.len()-offset);
309 let mbytes = &buf[offset..i];
310 offset = i+1;
311 if mbytes.len() <= 250 {
312 self.strings.push_front((mbytes.to_vec(),vec![]));
313 if self.strings.len() > 15_000 { self.strings.pop_back(); }
314 }
315 mbytes
316 } else {
317 let pair = self.strings.get((x as usize)-1);
318 if pair.is_none() {
319 return Err(DecodeError::StringUnavailable {
320 index: x as usize,
321 backtrace: Backtrace::capture(),
322 });
323 }
324 &pair.unwrap().0
325 }
326 };
327 members.push(RelationMember {
328 id: m_id,
329 element_type: match mstring[0] {
330 0x30 => ElementType::Node(),
331 0x31 => ElementType::Way(),
332 0x32 => ElementType::Relation(),
333 x => return Err(DecodeError::UnexpectedElementType {
334 received: x,
335 backtrace: Backtrace::capture(),
336 }),
337 },
338 role: String::from_utf8(mstring[1..].to_vec())
339 .map_err(|e| DecodeError::StringEncodingError { source: Box::new(e.into()) })?,
340 });
341 }
342 let (_,tags) = parse::tags(&buf[offset..], &mut self.strings)?;
343 Some(Dataset::Relation(Relation {
344 id,
345 info,
346 data: Some(RelationData { members }),
347 tags
348 }))
349 },
350 Some(DatasetType::Timestamp()) => {
351 let (_,time) = parse::signed(&buf[offset..])?;
352 Some(Dataset::Timestamp(Timestamp { time }))
353 },
354 Some(DatasetType::BBox()) => {
355 let (s,x1) = parse::signed(&buf[offset..])?;
356 offset += s;
357 let (s,y1) = parse::signed(&buf[offset..])?;
358 offset += s;
359 let (s,x2) = parse::signed(&buf[offset..])?;
360 offset += s;
361 let (_,y2) = parse::signed(&buf[offset..])?;
362 Some(Dataset::BBox(BBox {
363 x1: x1 as i32,
364 y1: y1 as i32,
365 x2: x2 as i32,
366 y2: y2 as i32,
367 }))
368 },
369 Some(DatasetType::Header()) => None,
370 Some(DatasetType::Sync()) => None,
371 Some(DatasetType::Jump()) => None,
372 Some(DatasetType::Reset()) => None,
373 None => None,
374 })
375 }
376}
377
378pub fn decode(reader: Box<dyn io::Read+Send+Unpin>) -> DecodeStream {
380 let state = Decoder::new(reader);
381 Box::new(unfold::unfold(state, async move |mut qs| {
382 match qs.next_item().await {
383 Ok(None) => None,
384 Ok(Some(x)) => Some((Ok(x),qs)),
385 Err(e) => Some((Err(e),qs)),
386 }
387 }))
388}