quick_xml/reader/async_tokio.rs
//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so reading
//! can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, IllFormedError, Result, SyntaxError};
11use crate::events::{BytesRef, Event};
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{
16 BangType, BinaryStream, NsReader, ParseState, ReadRefResult, ReadTextResult, Reader, Span,
17};
18use crate::utils::is_whitespace;
19
/// A struct for reading XML asynchronously from an [`AsyncBufRead`].
///
/// Having its own struct allows us to implement anything without risk of name conflicts
/// and does not suffer from the impossibility of having `async` in traits.
///
/// The adapter only holds a mutable borrow of the underlying reader; its reading
/// methods are generated by the `impl_buffered_source!` macro in the `impl` block below.
struct TokioAdapter<'a, R>(&'a mut R);

impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
    // Generate the buffered-source reading methods for this adapter. The trailing
    // `async, await` arguments presumably select the asynchronous flavour of the
    // generated methods; the meaning of `'b` and `0` is defined by the macro in
    // `crate::reader::buffered_reader` — NOTE(review): confirm there.
    impl_buffered_source!('b, 0, async, await);
}
29
30////////////////////////////////////////////////////////////////////////////////////////////////////
31
32impl<'r, R> AsyncRead for BinaryStream<'r, R>
33where
34 R: AsyncRead + Unpin,
35{
36 fn poll_read(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 buf: &mut ReadBuf<'_>,
40 ) -> Poll<io::Result<()>> {
41 let start = buf.remaining();
42 let this = self.get_mut();
43 let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
44
45 // If something was read, update offset
46 if let Poll::Ready(Ok(_)) = poll {
47 let amt = start - buf.remaining();
48 *this.offset += amt as u64;
49 }
50 poll
51 }
52}
53
54impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
55where
56 R: AsyncBufRead + Unpin,
57{
58 #[inline]
59 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
60 Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
61 }
62
63 #[inline]
64 fn consume(self: Pin<&mut Self>, amt: usize) {
65 let this = self.get_mut();
66 this.inner.consume(amt);
67 *this.offset += amt as u64;
68 }
69}
70
71////////////////////////////////////////////////////////////////////////////////////////////////////
72
impl<R: AsyncBufRead + Unpin> Reader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This is the main entry point for reading XML `Event`s when using an async reader.
    ///
    /// The returned [`Event`] is tied to the lifetime of `buf`, so it has to be dropped
    /// before `buf` can be reused; the example below clears `buf` after handling each
    /// event.
    ///
    /// See the documentation of [`read_event_into()`] for more information, including
    /// the conditions under which an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::reader::Reader;
    ///
    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
    /// // reader instead of relying on the zero-copy optimizations for reading
    /// // from byte slices, which provides the sync interface anyway.
    /// let mut reader = Reader::from_reader(r#"
    ///     <tag1 att1 = "test">
    ///        <tag2><!--Test comment-->Test</tag2>
    ///        <tag2>Test 2</tag2>
    ///     </tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await {
    ///         Ok(Event::Start(_)) => count += 1,
    ///         Ok(Event::Text(e)) => txt.push(e.decode().unwrap().into_owned()),
    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
    ///         Ok(Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: Reader::read_event_into
    pub async fn read_event_into_async<'b>(
        &mut self,
        // NOTE(review): `mut` is presumably required by the `read_event_impl!`
        // expansion, which is not visible in this file — confirm before removing it.
        mut buf: &'b mut Vec<u8>,
    ) -> Result<Event<'b>> {
        // Expand the shared event-reading logic with the tokio adapter as the byte
        // source, `read_until_close_async` (below) as the routine invoked after a `<`
        // has been seen, and `await` so that the generated code suspends on I/O.
        read_event_impl!(
            self,
            buf,
            TokioAdapter(&mut self.reader),
            read_until_close_async,
            await
        )
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided buffer as
    /// intermediate storage for events content. This function is supposed to be
    /// called after you have already read a [`Start`] event.
    ///
    /// See the documentation of [`read_to_end_into()`] for more information.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::Reader;
    ///
    /// let mut reader = Reader::from_reader(r#"
    ///     <outer>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let start = BytesStart::new("outer");
    /// let end   = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // `read_to_end!` repeatedly calls `read_event_into_async`. The `{ buf.clear(); }`
        // block is presumably run between skipped events so `buf` does not keep growing
        // (TODO confirm in the macro). The macro's value is wrapped in `Ok`; errors
        // presumably leave through an early return inside the expansion.
        Ok(read_to_end!(
            self,
            end,
            buf,
            read_event_into_async,
            {
                buf.clear();
            },
            await
        ))
    }

    /// Private function to read until `>` is found. This function expects that
    /// it was called just after encountering a `<` symbol.
    ///
    /// It is passed to `read_event_impl!` by [`read_event_into_async`](Self::read_event_into_async).
    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
    }
}
205
206////////////////////////////////////////////////////////////////////////////////////////////////////
207
208impl<R: AsyncBufRead + Unpin> NsReader<R> {
209 /// An asynchronous version of [`read_event_into()`]. Reads the next event into
210 /// given buffer.
211 ///
212 /// This method manages namespaces but doesn't resolve them automatically.
213 /// You should call [`resolve_element()`] if you want to get a namespace.
214 ///
215 /// You also can use [`read_resolved_event_into_async()`] instead if you want
216 /// to resolve namespace as soon as you get an event.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// # tokio_test::block_on(async {
222 /// # use pretty_assertions::assert_eq;
223 /// use quick_xml::events::Event;
224 /// use quick_xml::name::{Namespace, ResolveResult::*};
225 /// use quick_xml::reader::NsReader;
226 ///
227 /// let mut reader = NsReader::from_reader(r#"
228 /// <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
229 /// <y:tag2><!--Test comment-->Test</y:tag2>
230 /// <y:tag2>Test 2</y:tag2>
231 /// </x:tag1>
232 /// "#.as_bytes());
233 /// reader.config_mut().trim_text(true);
234 ///
235 /// let mut count = 0;
236 /// let mut buf = Vec::new();
237 /// let mut txt = Vec::new();
238 /// loop {
239 /// match reader.read_event_into_async(&mut buf).await.unwrap() {
240 /// Event::Start(e) => {
241 /// count += 1;
242 /// let (ns, local) = reader.resolve_element(e.name());
243 /// match local.as_ref() {
244 /// b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
245 /// b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
246 /// _ => unreachable!(),
247 /// }
248 /// }
249 /// Event::Text(e) => {
250 /// txt.push(e.decode().unwrap().into_owned())
251 /// }
252 /// Event::Eof => break,
253 /// _ => (),
254 /// }
255 /// buf.clear();
256 /// }
257 /// assert_eq!(count, 3);
258 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
259 /// # }) // tokio_test::block_on
260 /// ```
261 ///
262 /// [`read_event_into()`]: NsReader::read_event_into
263 /// [`resolve_element()`]: Self::resolve_element
264 /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
265 pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
266 self.pop();
267 let event = self.reader.read_event_into_async(buf).await;
268 self.process_event(event)
269 }
270
271 /// An asynchronous version of [`read_to_end_into()`].
272 /// Reads asynchronously until end element is found using provided buffer as
273 /// intermediate storage for events content. This function is supposed to be
274 /// called after you already read a [`Start`] event.
275 ///
276 /// See the documentation of [`read_to_end_into()`] for more information.
277 ///
278 /// # Examples
279 ///
280 /// This example shows, how you can skip XML content after you read the
281 /// start event.
282 ///
283 /// ```
284 /// # tokio_test::block_on(async {
285 /// # use pretty_assertions::assert_eq;
286 /// use quick_xml::name::{Namespace, ResolveResult};
287 /// use quick_xml::events::{BytesStart, Event};
288 /// use quick_xml::reader::NsReader;
289 ///
290 /// let mut reader = NsReader::from_reader(r#"
291 /// <outer xmlns="namespace 1">
292 /// <inner xmlns="namespace 2">
293 /// <outer></outer>
294 /// </inner>
295 /// <inner>
296 /// <inner></inner>
297 /// <inner/>
298 /// <outer></outer>
299 /// <p:outer xmlns:p="ns"></p:outer>
300 /// <outer/>
301 /// </inner>
302 /// </outer>
303 /// "#.as_bytes());
304 /// reader.config_mut().trim_text(true);
305 /// let mut buf = Vec::new();
306 ///
307 /// let ns = Namespace(b"namespace 1");
308 /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
309 /// let end = start.to_end().into_owned();
310 ///
311 /// // First, we read a start event...
312 /// assert_eq!(
313 /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
314 /// (ResolveResult::Bound(ns), Event::Start(start))
315 /// );
316 ///
317 /// // ...then, we could skip all events to the corresponding end event.
318 /// // This call will correctly handle nested <outer> elements.
319 /// // Note, however, that this method does not handle namespaces.
320 /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
321 ///
322 /// // At the end we should get an Eof event, because we ate the whole XML
323 /// assert_eq!(
324 /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
325 /// (ResolveResult::Unbound, Event::Eof)
326 /// );
327 /// # }) // tokio_test::block_on
328 /// ```
329 ///
330 /// [`read_to_end_into()`]: Self::read_to_end_into
331 /// [`Start`]: Event::Start
332 pub async fn read_to_end_into_async<'n>(
333 &mut self,
334 // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
335 end: QName<'n>,
336 buf: &mut Vec<u8>,
337 ) -> Result<Span> {
338 // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
339 // match literally the start name. See `Config::check_end_names` documentation
340 self.reader.read_to_end_into_async(end, buf).await
341 }
342
343 /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
344 /// event into given buffer asynchronously and resolves its namespace (if applicable).
345 ///
346 /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
347 /// For all other events the concept of namespace is not defined, so
348 /// a [`ResolveResult::Unbound`] is returned.
349 ///
350 /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
351 /// which will not automatically resolve namespaces for you.
352 ///
353 /// # Examples
354 ///
355 /// ```
356 /// # tokio_test::block_on(async {
357 /// # use pretty_assertions::assert_eq;
358 /// use quick_xml::events::Event;
359 /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
360 /// use quick_xml::reader::NsReader;
361 ///
362 /// let mut reader = NsReader::from_reader(r#"
363 /// <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
364 /// <y:tag2><!--Test comment-->Test</y:tag2>
365 /// <y:tag2>Test 2</y:tag2>
366 /// </x:tag1>
367 /// "#.as_bytes());
368 /// reader.config_mut().trim_text(true);
369 ///
370 /// let mut count = 0;
371 /// let mut buf = Vec::new();
372 /// let mut txt = Vec::new();
373 /// loop {
374 /// match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
375 /// (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
376 /// count += 1;
377 /// assert_eq!(e.local_name(), QName(b"tag1").into());
378 /// }
379 /// (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
380 /// count += 1;
381 /// assert_eq!(e.local_name(), QName(b"tag2").into());
382 /// }
383 /// (_, Event::Start(_)) => unreachable!(),
384 ///
385 /// (_, Event::Text(e)) => {
386 /// txt.push(e.decode().unwrap().into_owned())
387 /// }
388 /// (_, Event::Eof) => break,
389 /// _ => (),
390 /// }
391 /// buf.clear();
392 /// }
393 /// assert_eq!(count, 3);
394 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
395 /// # }) // tokio_test::block_on
396 /// ```
397 ///
398 /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
399 /// [`Start`]: Event::Start
400 /// [`Empty`]: Event::Empty
401 /// [`End`]: Event::End
402 /// [`read_event_into_async()`]: Self::read_event_into_async
403 pub async fn read_resolved_event_into_async<'ns, 'b>(
404 // Name 'ns lifetime, because otherwise we get an error
405 // "implicit elided lifetime not allowed here" on ResolveResult
406 &'ns mut self,
407 buf: &'b mut Vec<u8>,
408 ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
409 let event = self.read_event_into_async(buf).await;
410 self.resolve_event(event)
411 }
412}
413
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Presumably instantiates the reader test-suite shared with the other readers,
    // driven through the tokio adapter and the async methods above.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time check that the future returned by `read_event_into_async` is
    /// `Send`. The future is never polled; the test passes as soon as it compiles.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        fn assert_send<F: Send>(_future: F) {}

        let source: Vec<u8> = Vec::new();
        let mut scratch = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(&source[..]));

        assert_send(reader.read_event_into_async(&mut scratch));
    }
}