quick_xml/reader/async_tokio.rs
//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await so reading
//! can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, IllFormedError, Result, SyntaxError};
11use crate::events::{BytesRef, BytesText, Event};
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{
16 BangType, BinaryStream, NsReader, ParseState, ReadRefResult, ReadTextResult, Reader, Span,
17};
18use crate::utils::is_whitespace;
19
20/// A struct for read XML asynchronously from an [`AsyncBufRead`].
21///
22/// Having own struct allows us to implement anything without risk of name conflicts
23/// and does not suffer from the impossibility of having `async` in traits.
24struct TokioAdapter<'a, R>(&'a mut R);
25
26impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
27 impl_buffered_source!('b, 0, async, await);
28}
29
30////////////////////////////////////////////////////////////////////////////////////////////////////
31
32impl<'r, R> AsyncRead for BinaryStream<'r, R>
33where
34 R: AsyncRead + Unpin,
35{
36 fn poll_read(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 buf: &mut ReadBuf<'_>,
40 ) -> Poll<io::Result<()>> {
41 let start = buf.remaining();
42 let this = self.get_mut();
43 let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
44
45 // If something was read, update offset
46 if let Poll::Ready(Ok(_)) = poll {
47 let amt = start - buf.remaining();
48 *this.offset += amt as u64;
49 }
50 poll
51 }
52}
53
54impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
55where
56 R: AsyncBufRead + Unpin,
57{
58 #[inline]
59 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
60 Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
61 }
62
63 #[inline]
64 fn consume(self: Pin<&mut Self>, amt: usize) {
65 let this = self.get_mut();
66 this.inner.consume(amt);
67 *this.offset += amt as u64;
68 }
69}
70
71////////////////////////////////////////////////////////////////////////////////////////////////////
72
73impl<R: AsyncBufRead + Unpin> Reader<R> {
74 /// An asynchronous version of [`read_event_into()`]. Reads the next event into
75 /// given buffer.
76 ///
77 /// This is the main entry point for reading XML `Event`s when using an async reader.
78 ///
79 /// See the documentation of [`read_event_into()`] for more information.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// # tokio_test::block_on(async {
85 /// # use pretty_assertions::assert_eq;
86 /// use quick_xml::events::Event;
87 /// use quick_xml::reader::Reader;
88 ///
89 /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
90 /// // reader instead of relying on the zero-copy optimizations for reading
91 /// // from byte slices, which provides the sync interface anyway.
92 /// let mut reader = Reader::from_reader(r#"
93 /// <tag1 att1 = "test">
94 /// <tag2><!--Test comment-->Test</tag2>
95 /// <tag2>Test 2</tag2>
96 /// </tag1>
97 /// "#.as_bytes());
98 /// reader.config_mut().trim_text(true);
99 ///
100 /// let mut count = 0;
101 /// let mut buf = Vec::new();
102 /// let mut txt = Vec::new();
103 /// loop {
104 /// match reader.read_event_into_async(&mut buf).await {
105 /// Ok(Event::Start(_)) => count += 1,
106 /// Ok(Event::Text(e)) => txt.push(e.decode().unwrap().into_owned()),
107 /// Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
108 /// Ok(Event::Eof) => break,
109 /// _ => (),
110 /// }
111 /// buf.clear();
112 /// }
113 /// assert_eq!(count, 3);
114 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
115 /// # }) // tokio_test::block_on
116 /// ```
117 ///
118 /// [`read_event_into()`]: Reader::read_event_into
119 pub async fn read_event_into_async<'b>(
120 &mut self,
121 mut buf: &'b mut Vec<u8>,
122 ) -> Result<Event<'b>> {
123 read_event_impl!(
124 self,
125 buf,
126 TokioAdapter(&mut self.reader),
127 read_until_close_async,
128 await
129 )
130 }
131
132 /// An asynchronous version of [`read_to_end_into()`].
133 /// Reads asynchronously until end element is found using provided buffer as
134 /// intermediate storage for events content. This function is supposed to be
135 /// called after you already read a [`Start`] event.
136 ///
137 /// See the documentation of [`read_to_end_into()`] for more information.
138 ///
139 /// # Examples
140 ///
141 /// This example shows, how you can skip XML content after you read the
142 /// start event.
143 ///
144 /// ```
145 /// # tokio_test::block_on(async {
146 /// # use pretty_assertions::assert_eq;
147 /// use quick_xml::events::{BytesStart, Event};
148 /// use quick_xml::reader::Reader;
149 ///
150 /// let mut reader = Reader::from_reader(r#"
151 /// <outer>
152 /// <inner>
153 /// <inner></inner>
154 /// <inner/>
155 /// <outer></outer>
156 /// <outer/>
157 /// </inner>
158 /// </outer>
159 /// "#.as_bytes());
160 /// reader.config_mut().trim_text(true);
161 /// let mut buf = Vec::new();
162 ///
163 /// let start = BytesStart::new("outer");
164 /// let end = start.to_end().into_owned();
165 ///
166 /// // First, we read a start event...
167 /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
168 ///
169 /// // ...then, we could skip all events to the corresponding end event.
170 /// // This call will correctly handle nested <outer> elements.
171 /// // Note, however, that this method does not handle namespaces.
172 /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
173 ///
174 /// // At the end we should get an Eof event, because we ate the whole XML
175 /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
176 /// # }) // tokio_test::block_on
177 /// ```
178 ///
179 /// [`read_to_end_into()`]: Self::read_to_end_into
180 /// [`Start`]: Event::Start
181 pub async fn read_to_end_into_async<'n>(
182 &mut self,
183 // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
184 end: QName<'n>,
185 buf: &mut Vec<u8>,
186 ) -> Result<Span> {
187 Ok(read_to_end!(
188 self,
189 end,
190 buf,
191 read_event_into_async,
192 {
193 buf.clear();
194 },
195 await
196 ))
197 }
198
199 /// An asynchronous version of [`read_text_into()`].
200 /// Reads asynchronously until end element is found using provided buffer as
201 /// intermediate storage for events content. This function is supposed to be
202 /// called after you already read a [`Start`] event.
203 ///
204 /// See the documentation of [`read_text_into()`] for more information.
205 ///
206 /// # Examples
207 ///
208 /// This example shows, how you can read a HTML content from your XML document.
209 ///
210 /// ```
211 /// # tokio_test::block_on(async {
212 /// # use pretty_assertions::assert_eq;
213 /// # use std::borrow::Cow;
214 /// use quick_xml::events::{BytesStart, Event};
215 /// use quick_xml::reader::Reader;
216 ///
217 /// let mut reader = Reader::from_reader("
218 /// <html>
219 /// <title>This is a HTML text</title>
220 /// <p>Usual XML rules does not apply inside it
221 /// <p>For example, elements not needed to be "closed"
222 /// </html>
223 /// ".as_bytes());
224 /// reader.config_mut().trim_text(true);
225 ///
226 /// let start = BytesStart::new("html");
227 /// let end = start.to_end().into_owned();
228 ///
229 /// let mut buf = Vec::new();
230 ///
231 /// // First, we read a start event...
232 /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
233 /// // ...and disable checking of end names because we expect HTML further...
234 /// reader.config_mut().check_end_names = false;
235 ///
236 /// // ...then, we could read text content until close tag.
237 /// // This call will correctly handle nested <html> elements.
238 /// let text = reader.read_text_into_async(end.name(), &mut buf).await.unwrap();
239 /// let text = text.decode().unwrap();
240 /// assert_eq!(text, r#"
241 /// <title>This is a HTML text</title>
242 /// <p>Usual XML rules does not apply inside it
243 /// <p>For example, elements not needed to be "closed"
244 /// "#);
245 /// assert!(matches!(text, Cow::Borrowed(_)));
246 ///
247 /// // Now we can enable checks again
248 /// reader.config_mut().check_end_names = true;
249 ///
250 /// // At the end we should get an Eof event, because we ate the whole XML
251 /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
252 /// # }) // tokio_test::block_on
253 /// ```
254 ///
255 /// [`read_text_into()`]: Self::read_text_into
256 /// [`Start`]: Event::Start
257 pub async fn read_text_into_async<'n, 'b>(
258 &mut self,
259 // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
260 end: QName<'n>,
261 buf: &'b mut Vec<u8>,
262 ) -> Result<BytesText<'b>> {
263 let start = buf.len();
264 let span = read_to_end!(self, end, buf, read_event_into_async, {}, await);
265
266 let len = span.end - span.start;
267 // SAFETY: `buf` may contain not more than isize::MAX bytes and because it is
268 // not cleared when reading event, length of the returned span should fit into
269 // usize (because otherwise we panic at appending to the buffer before that point)
270 let end = start + len as usize;
271
272 Ok(BytesText::wrap(&buf[start..end], self.decoder()))
273 }
274
275 /// Private function to read until `>` is found. This function expects that
276 /// it was called just after encounter a `<` symbol.
277 async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
278 read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
279 }
280}
281
282////////////////////////////////////////////////////////////////////////////////////////////////////
283
284impl<R: AsyncBufRead + Unpin> NsReader<R> {
285 /// An asynchronous version of [`read_event_into()`]. Reads the next event into
286 /// given buffer.
287 ///
288 /// This method manages namespaces but doesn't resolve them automatically.
289 /// You should call [`resolver().resolve_element()`] if you want to get a namespace.
290 ///
291 /// You also can use [`read_resolved_event_into_async()`] instead if you want
292 /// to resolve namespace as soon as you get an event.
293 ///
294 /// # Examples
295 ///
296 /// ```
297 /// # tokio_test::block_on(async {
298 /// # use pretty_assertions::assert_eq;
299 /// use quick_xml::events::Event;
300 /// use quick_xml::name::{Namespace, ResolveResult::*};
301 /// use quick_xml::reader::NsReader;
302 ///
303 /// let mut reader = NsReader::from_reader(r#"
304 /// <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
305 /// <y:tag2><!--Test comment-->Test</y:tag2>
306 /// <y:tag2>Test 2</y:tag2>
307 /// </x:tag1>
308 /// "#.as_bytes());
309 /// reader.config_mut().trim_text(true);
310 ///
311 /// let mut count = 0;
312 /// let mut buf = Vec::new();
313 /// let mut txt = Vec::new();
314 /// loop {
315 /// match reader.read_event_into_async(&mut buf).await.unwrap() {
316 /// Event::Start(e) => {
317 /// count += 1;
318 /// let (ns, local) = reader.resolver().resolve_element(e.name());
319 /// match local.as_ref() {
320 /// b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
321 /// b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
322 /// _ => unreachable!(),
323 /// }
324 /// }
325 /// Event::Text(e) => {
326 /// txt.push(e.decode().unwrap().into_owned())
327 /// }
328 /// Event::Eof => break,
329 /// _ => (),
330 /// }
331 /// buf.clear();
332 /// }
333 /// assert_eq!(count, 3);
334 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
335 /// # }) // tokio_test::block_on
336 /// ```
337 ///
338 /// [`read_event_into()`]: NsReader::read_event_into
339 /// [`resolver().resolve_element()`]: crate::name::NamespaceResolver::resolve_element
340 /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
341 pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
342 self.pop();
343 let event = self.reader.read_event_into_async(buf).await;
344 self.process_event(event)
345 }
346
347 /// An asynchronous version of [`read_to_end_into()`].
348 /// Reads asynchronously until end element is found using provided buffer as
349 /// intermediate storage for events content. This function is supposed to be
350 /// called after you already read a [`Start`] event.
351 ///
352 /// See the documentation of [`read_to_end_into()`] for more information.
353 ///
354 /// # Examples
355 ///
356 /// This example shows, how you can skip XML content after you read the
357 /// start event.
358 ///
359 /// ```
360 /// # tokio_test::block_on(async {
361 /// # use pretty_assertions::assert_eq;
362 /// use quick_xml::name::{Namespace, ResolveResult};
363 /// use quick_xml::events::{BytesStart, Event};
364 /// use quick_xml::reader::NsReader;
365 ///
366 /// let mut reader = NsReader::from_reader(r#"
367 /// <outer xmlns="namespace 1">
368 /// <inner xmlns="namespace 2">
369 /// <outer></outer>
370 /// </inner>
371 /// <inner>
372 /// <inner></inner>
373 /// <inner/>
374 /// <outer></outer>
375 /// <p:outer xmlns:p="ns"></p:outer>
376 /// <outer/>
377 /// </inner>
378 /// </outer>
379 /// "#.as_bytes());
380 /// reader.config_mut().trim_text(true);
381 /// let mut buf = Vec::new();
382 ///
383 /// let ns = Namespace(b"namespace 1");
384 /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
385 /// let end = start.to_end().into_owned();
386 ///
387 /// // First, we read a start event...
388 /// assert_eq!(
389 /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
390 /// (ResolveResult::Bound(ns), Event::Start(start))
391 /// );
392 ///
393 /// // ...then, we could skip all events to the corresponding end event.
394 /// // This call will correctly handle nested <outer> elements.
395 /// // Note, however, that this method does not handle namespaces.
396 /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
397 ///
398 /// // At the end we should get an Eof event, because we ate the whole XML
399 /// assert_eq!(
400 /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
401 /// (ResolveResult::Unbound, Event::Eof)
402 /// );
403 /// # }) // tokio_test::block_on
404 /// ```
405 ///
406 /// [`read_to_end_into()`]: Self::read_to_end_into
407 /// [`Start`]: Event::Start
408 pub async fn read_to_end_into_async<'n>(
409 &mut self,
410 // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
411 end: QName<'n>,
412 buf: &mut Vec<u8>,
413 ) -> Result<Span> {
414 // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
415 // match literally the start name. See `Config::check_end_names` documentation
416 let result = self.reader.read_to_end_into_async(end, buf).await?;
417 // read_to_end_into_async will consume closing tag. Because nobody can access to its
418 // content anymore, we directly pop namespace of the opening tag
419 self.ns_resolver.pop();
420 Ok(result)
421 }
422
423 /// An asynchronous version of [`read_text_into()`].
424 /// Reads asynchronously until end element is found using provided buffer as
425 /// intermediate storage for events content. This function is supposed to be
426 /// called after you already read a [`Start`] event.
427 ///
428 /// See the documentation of [`read_text_into()`] for more information.
429 ///
430 /// # Examples
431 ///
432 /// This example shows, how you can read a HTML content from your XML document.
433 ///
434 /// ```
435 /// # tokio_test::block_on(async {
436 /// # use pretty_assertions::assert_eq;
437 /// # use std::borrow::Cow;
438 /// use quick_xml::events::{BytesStart, Event};
439 /// use quick_xml::reader::NsReader;
440 ///
441 /// let mut reader = NsReader::from_reader("
442 /// <html>
443 /// <title>This is a HTML text</title>
444 /// <p>Usual XML rules does not apply inside it
445 /// <p>For example, elements not needed to be "closed"
446 /// </html>
447 /// ".as_bytes());
448 /// reader.config_mut().trim_text(true);
449 ///
450 /// let start = BytesStart::new("html");
451 /// let end = start.to_end().into_owned();
452 ///
453 /// let mut buf = Vec::new();
454 ///
455 /// // First, we read a start event...
456 /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
457 /// // ...and disable checking of end names because we expect HTML further...
458 /// reader.config_mut().check_end_names = false;
459 ///
460 /// // ...then, we could read text content until close tag.
461 /// // This call will correctly handle nested <html> elements.
462 /// let text = reader.read_text_into_async(end.name(), &mut buf).await.unwrap();
463 /// let text = text.decode().unwrap();
464 /// assert_eq!(text, r#"
465 /// <title>This is a HTML text</title>
466 /// <p>Usual XML rules does not apply inside it
467 /// <p>For example, elements not needed to be "closed"
468 /// "#);
469 /// assert!(matches!(text, Cow::Borrowed(_)));
470 ///
471 /// // Now we can enable checks again
472 /// reader.config_mut().check_end_names = true;
473 ///
474 /// // At the end we should get an Eof event, because we ate the whole XML
475 /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
476 /// # }) // tokio_test::block_on
477 /// ```
478 ///
479 /// [`read_text_into()`]: Self::read_text_into
480 /// [`Start`]: Event::Start
481 pub async fn read_text_into_async<'n, 'b>(
482 &mut self,
483 // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
484 end: QName<'n>,
485 buf: &'b mut Vec<u8>,
486 ) -> Result<BytesText<'b>> {
487 // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
488 // match literally the start name. See `Config::check_end_names` documentation
489 let result = self.reader.read_text_into_async(end, buf).await?;
490 // read_text_into_async will consume closing tag. Because nobody can access to its
491 // content anymore, we directly pop namespace of the opening tag
492 self.ns_resolver.pop();
493 Ok(result)
494 }
495
496 /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
497 /// event into given buffer asynchronously and resolves its namespace (if applicable).
498 ///
499 /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
500 /// For all other events the concept of namespace is not defined, so
501 /// a [`ResolveResult::Unbound`] is returned.
502 ///
503 /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
504 /// which will not automatically resolve namespaces for you.
505 ///
506 /// # Examples
507 ///
508 /// ```
509 /// # tokio_test::block_on(async {
510 /// # use pretty_assertions::assert_eq;
511 /// use quick_xml::events::Event;
512 /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
513 /// use quick_xml::reader::NsReader;
514 ///
515 /// let mut reader = NsReader::from_reader(r#"
516 /// <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
517 /// <y:tag2><!--Test comment-->Test</y:tag2>
518 /// <y:tag2>Test 2</y:tag2>
519 /// </x:tag1>
520 /// "#.as_bytes());
521 /// reader.config_mut().trim_text(true);
522 ///
523 /// let mut count = 0;
524 /// let mut buf = Vec::new();
525 /// let mut txt = Vec::new();
526 /// loop {
527 /// match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
528 /// (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
529 /// count += 1;
530 /// assert_eq!(e.local_name(), QName(b"tag1").into());
531 /// }
532 /// (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
533 /// count += 1;
534 /// assert_eq!(e.local_name(), QName(b"tag2").into());
535 /// }
536 /// (_, Event::Start(_)) => unreachable!(),
537 ///
538 /// (_, Event::Text(e)) => {
539 /// txt.push(e.decode().unwrap().into_owned())
540 /// }
541 /// (_, Event::Eof) => break,
542 /// _ => (),
543 /// }
544 /// buf.clear();
545 /// }
546 /// assert_eq!(count, 3);
547 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
548 /// # }) // tokio_test::block_on
549 /// ```
550 ///
551 /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
552 /// [`Start`]: Event::Start
553 /// [`Empty`]: Event::Empty
554 /// [`End`]: Event::End
555 /// [`read_event_into_async()`]: Self::read_event_into_async
556 pub async fn read_resolved_event_into_async<'ns, 'b>(
557 // Name 'ns lifetime, because otherwise we get an error
558 // "implicit elided lifetime not allowed here" on ResolveResult
559 &'ns mut self,
560 buf: &'b mut Vec<u8>,
561 ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
562 let event = self.read_event_into_async(buf).await?;
563 Ok(self.resolver().resolve_event(event))
564 }
565}
566
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Tests produced by the `check!` macro from `crate::reader::test`,
    // instantiated for the async reading functions and the tokio adapter.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        1,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time guard: the future returned by `read_event_into_async`
    /// has to be `Send`. Nothing is polled, so there are no runtime checks.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        // Accepts any value whose type is `Send`; fails to compile otherwise
        fn assert_send(_: impl Send) {}

        let source: Vec<u8> = Vec::new();
        let mut scratch = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(source.as_slice()));

        let pending = reader.read_event_into_async(&mut scratch);
        assert_send(pending);
    }
}