Skip to main content

quick_xml/reader/
async_tokio.rs

//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so
//! reading can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::encoding;
11use crate::errors::{Error, IllFormedError, Result, SyntaxError};
12use crate::events::{BytesRef, BytesText, Event};
13use crate::name::{QName, ResolveResult};
14use crate::parser::{ElementParser, Parser, PiParser};
15use crate::reader::buffered_reader::impl_buffered_source;
16use crate::reader::{
17    BangType, BinaryStream, NsReader, ParseState, ReadRefResult, ReadTextResult, Reader, Span,
18};
19use crate::utils::is_whitespace;
20
21/// A struct for read XML asynchronously from an [`AsyncBufRead`].
22///
23/// Having own struct allows us to implement anything without risk of name conflicts
24/// and does not suffer from the impossibility of having `async` in traits.
25struct TokioAdapter<'a, R>(&'a mut R);
26
27impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
28    impl_buffered_source!('b, 0, async, await);
29}
30
31////////////////////////////////////////////////////////////////////////////////////////////////////
32
33impl<'r, R> AsyncRead for BinaryStream<'r, R>
34where
35    R: AsyncRead + Unpin,
36{
37    fn poll_read(
38        self: Pin<&mut Self>,
39        cx: &mut Context<'_>,
40        buf: &mut ReadBuf<'_>,
41    ) -> Poll<io::Result<()>> {
42        let start = buf.remaining();
43        let this = self.get_mut();
44        let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
45
46        // If something was read, update offset
47        if let Poll::Ready(Ok(_)) = poll {
48            let amt = start - buf.remaining();
49            *this.offset += amt as u64;
50        }
51        poll
52    }
53}
54
55impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
56where
57    R: AsyncBufRead + Unpin,
58{
59    #[inline]
60    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
61        Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
62    }
63
64    #[inline]
65    fn consume(self: Pin<&mut Self>, amt: usize) {
66        let this = self.get_mut();
67        this.inner.consume(amt);
68        *this.offset += amt as u64;
69    }
70}
71
72////////////////////////////////////////////////////////////////////////////////////////////////////
73
74impl<R: AsyncBufRead + Unpin> Reader<R> {
75    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
76    /// given buffer.
77    ///
78    /// This is the main entry point for reading XML `Event`s when using an async reader.
79    ///
80    /// See the documentation of [`read_event_into()`] for more information.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// # tokio_test::block_on(async {
86    /// # use pretty_assertions::assert_eq;
87    /// use quick_xml::events::Event;
88    /// use quick_xml::reader::Reader;
89    ///
90    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
91    /// // reader instead of relying on the zero-copy optimizations for reading
92    /// // from byte slices, which provides the sync interface anyway.
93    /// let mut reader = Reader::from_reader(r#"
94    ///     <tag1 att1 = "test">
95    ///        <tag2><!--Test comment-->Test</tag2>
96    ///        <tag2>Test 2</tag2>
97    ///     </tag1>
98    /// "#.as_bytes());
99    /// reader.config_mut().trim_text(true);
100    ///
101    /// let mut count = 0;
102    /// let mut buf = Vec::new();
103    /// let mut txt = Vec::new();
104    /// loop {
105    ///     match reader.read_event_into_async(&mut buf).await {
106    ///         Ok(Event::Start(_)) => count += 1,
107    ///         Ok(Event::Text(e)) => txt.push(e.decode().unwrap().into_owned()),
108    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
109    ///         Ok(Event::Eof) => break,
110    ///         _ => (),
111    ///     }
112    ///     buf.clear();
113    /// }
114    /// assert_eq!(count, 3);
115    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
116    /// # }) // tokio_test::block_on
117    /// ```
118    ///
119    /// [`read_event_into()`]: Reader::read_event_into
120    pub async fn read_event_into_async<'b>(
121        &mut self,
122        mut buf: &'b mut Vec<u8>,
123    ) -> Result<Event<'b>> {
124        read_event_impl!(
125            self,
126            buf,
127            TokioAdapter(&mut self.reader),
128            read_until_close_async,
129            await
130        )
131    }
132
133    /// An asynchronous version of [`read_to_end_into()`].
134    /// Reads asynchronously until end element is found using provided buffer as
135    /// intermediate storage for events content. This function is supposed to be
136    /// called after you already read a [`Start`] event.
137    ///
138    /// See the documentation of [`read_to_end_into()`] for more information.
139    ///
140    /// # Examples
141    ///
142    /// This example shows, how you can skip XML content after you read the
143    /// start event.
144    ///
145    /// ```
146    /// # tokio_test::block_on(async {
147    /// # use pretty_assertions::assert_eq;
148    /// use quick_xml::events::{BytesStart, Event};
149    /// use quick_xml::reader::Reader;
150    ///
151    /// let mut reader = Reader::from_reader(r#"
152    ///     <outer>
153    ///         <inner>
154    ///             <inner></inner>
155    ///             <inner/>
156    ///             <outer></outer>
157    ///             <outer/>
158    ///         </inner>
159    ///     </outer>
160    /// "#.as_bytes());
161    /// reader.config_mut().trim_text(true);
162    /// let mut buf = Vec::new();
163    ///
164    /// let start = BytesStart::new("outer");
165    /// let end   = start.to_end().into_owned();
166    ///
167    /// // First, we read a start event...
168    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
169    ///
170    /// // ...then, we could skip all events to the corresponding end event.
171    /// // This call will correctly handle nested <outer> elements.
172    /// // Note, however, that this method does not handle namespaces.
173    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
174    ///
175    /// // At the end we should get an Eof event, because we ate the whole XML
176    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
177    /// # }) // tokio_test::block_on
178    /// ```
179    ///
180    /// [`read_to_end_into()`]: Self::read_to_end_into
181    /// [`Start`]: Event::Start
182    pub async fn read_to_end_into_async<'n>(
183        &mut self,
184        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
185        end: QName<'n>,
186        buf: &mut Vec<u8>,
187    ) -> Result<Span> {
188        Ok(read_to_end!(
189            self,
190            end,
191            buf,
192            read_event_into_async,
193            {
194                buf.clear();
195            },
196            await
197        ))
198    }
199
200    /// An asynchronous version of [`read_text_into()`].
201    /// Reads asynchronously until end element is found using provided buffer as
202    /// intermediate storage for events content. This function is supposed to be
203    /// called after you already read a [`Start`] event.
204    ///
205    /// See the documentation of [`read_text_into()`] for more information.
206    ///
207    /// # Examples
208    ///
209    /// This example shows, how you can read a HTML content from your XML document.
210    ///
211    /// ```
212    /// # tokio_test::block_on(async {
213    /// # use pretty_assertions::assert_eq;
214    /// # use std::borrow::Cow;
215    /// use quick_xml::events::{BytesStart, Event};
216    /// use quick_xml::reader::Reader;
217    ///
218    /// let mut reader = Reader::from_reader("
219    ///     <html>
220    ///         <title>This is a HTML text</title>
221    ///         <p>Usual XML rules does not apply inside it
222    ///         <p>For example, elements not needed to be &quot;closed&quot;
223    ///     </html>
224    /// ".as_bytes());
225    /// reader.config_mut().trim_text(true);
226    ///
227    /// let start = BytesStart::new("html");
228    /// let end   = start.to_end().into_owned();
229    ///
230    /// let mut buf = Vec::new();
231    ///
232    /// // First, we read a start event...
233    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
234    /// // ...and disable checking of end names because we expect HTML further...
235    /// reader.config_mut().check_end_names = false;
236    ///
237    /// // ...then, we could read text content until close tag.
238    /// // This call will correctly handle nested <html> elements.
239    /// let text = reader.read_text_into_async(end.name(), &mut buf).await.unwrap();
240    /// let text = text.decode().unwrap();
241    /// assert_eq!(text, r#"
242    ///         <title>This is a HTML text</title>
243    ///         <p>Usual XML rules does not apply inside it
244    ///         <p>For example, elements not needed to be &quot;closed&quot;
245    ///     "#);
246    /// assert!(matches!(text, Cow::Borrowed(_)));
247    ///
248    /// // Now we can enable checks again
249    /// reader.config_mut().check_end_names = true;
250    ///
251    /// // At the end we should get an Eof event, because we ate the whole XML
252    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
253    /// # }) // tokio_test::block_on
254    /// ```
255    ///
256    /// [`read_text_into()`]: Self::read_text_into
257    /// [`Start`]: Event::Start
258    pub async fn read_text_into_async<'n, 'b>(
259        &mut self,
260        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
261        end: QName<'n>,
262        buf: &'b mut Vec<u8>,
263    ) -> Result<BytesText<'b>> {
264        let start = buf.len();
265        let span = read_to_end!(self, end, buf, read_event_into_async, {}, await);
266
267        let len = span.end - span.start;
268        // SAFETY: `buf` may contain not more than isize::MAX bytes and because it is
269        // not cleared when reading event, length of the returned span should fit into
270        // usize (because otherwise we panic at appending to the buffer before that point)
271        let end = start + len as usize;
272
273        Ok(BytesText::wrap(&buf[start..end], self.decoder()))
274    }
275
276    /// Private function to read until `>` is found. This function expects that
277    /// it was called just after encounter a `<` symbol.
278    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
279        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
280    }
281}
282
283////////////////////////////////////////////////////////////////////////////////////////////////////
284
285impl<R: AsyncBufRead + Unpin> NsReader<R> {
286    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
287    /// given buffer.
288    ///
289    /// This method manages namespaces but doesn't resolve them automatically.
290    /// You should call [`resolver().resolve_element()`] if you want to get a namespace.
291    ///
292    /// You also can use [`read_resolved_event_into_async()`] instead if you want
293    /// to resolve namespace as soon as you get an event.
294    ///
295    /// # Examples
296    ///
297    /// ```
298    /// # tokio_test::block_on(async {
299    /// # use pretty_assertions::assert_eq;
300    /// use quick_xml::events::Event;
301    /// use quick_xml::name::{Namespace, ResolveResult::*};
302    /// use quick_xml::reader::NsReader;
303    ///
304    /// let mut reader = NsReader::from_reader(r#"
305    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
306    ///        <y:tag2><!--Test comment-->Test</y:tag2>
307    ///        <y:tag2>Test 2</y:tag2>
308    ///     </x:tag1>
309    /// "#.as_bytes());
310    /// reader.config_mut().trim_text(true);
311    ///
312    /// let mut count = 0;
313    /// let mut buf = Vec::new();
314    /// let mut txt = Vec::new();
315    /// loop {
316    ///     match reader.read_event_into_async(&mut buf).await.unwrap() {
317    ///         Event::Start(e) => {
318    ///             count += 1;
319    ///             let (ns, local) = reader.resolver().resolve_element(e.name());
320    ///             match local.as_ref() {
321    ///                 b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
322    ///                 b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
323    ///                 _ => unreachable!(),
324    ///             }
325    ///         }
326    ///         Event::Text(e) => {
327    ///             txt.push(e.decode().unwrap().into_owned())
328    ///         }
329    ///         Event::Eof => break,
330    ///         _ => (),
331    ///     }
332    ///     buf.clear();
333    /// }
334    /// assert_eq!(count, 3);
335    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
336    /// # }) // tokio_test::block_on
337    /// ```
338    ///
339    /// [`read_event_into()`]: NsReader::read_event_into
340    /// [`resolver().resolve_element()`]: crate::name::NamespaceResolver::resolve_element
341    /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
342    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
343        self.pop();
344        let event = self.reader.read_event_into_async(buf).await;
345        self.process_event(event)
346    }
347
348    /// An asynchronous version of [`read_to_end_into()`].
349    /// Reads asynchronously until end element is found using provided buffer as
350    /// intermediate storage for events content. This function is supposed to be
351    /// called after you already read a [`Start`] event.
352    ///
353    /// See the documentation of [`read_to_end_into()`] for more information.
354    ///
355    /// # Examples
356    ///
357    /// This example shows, how you can skip XML content after you read the
358    /// start event.
359    ///
360    /// ```
361    /// # tokio_test::block_on(async {
362    /// # use pretty_assertions::assert_eq;
363    /// use quick_xml::name::{Namespace, ResolveResult};
364    /// use quick_xml::events::{BytesStart, Event};
365    /// use quick_xml::reader::NsReader;
366    ///
367    /// let mut reader = NsReader::from_reader(r#"
368    ///     <outer xmlns="namespace 1">
369    ///         <inner xmlns="namespace 2">
370    ///             <outer></outer>
371    ///         </inner>
372    ///         <inner>
373    ///             <inner></inner>
374    ///             <inner/>
375    ///             <outer></outer>
376    ///             <p:outer xmlns:p="ns"></p:outer>
377    ///             <outer/>
378    ///         </inner>
379    ///     </outer>
380    /// "#.as_bytes());
381    /// reader.config_mut().trim_text(true);
382    /// let mut buf = Vec::new();
383    ///
384    /// let ns = Namespace(b"namespace 1");
385    /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
386    /// let end   = start.to_end().into_owned();
387    ///
388    /// // First, we read a start event...
389    /// assert_eq!(
390    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
391    ///     (ResolveResult::Bound(ns), Event::Start(start))
392    /// );
393    ///
394    /// // ...then, we could skip all events to the corresponding end event.
395    /// // This call will correctly handle nested <outer> elements.
396    /// // Note, however, that this method does not handle namespaces.
397    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
398    ///
399    /// // At the end we should get an Eof event, because we ate the whole XML
400    /// assert_eq!(
401    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
402    ///     (ResolveResult::Unbound, Event::Eof)
403    /// );
404    /// # }) // tokio_test::block_on
405    /// ```
406    ///
407    /// [`read_to_end_into()`]: Self::read_to_end_into
408    /// [`Start`]: Event::Start
409    pub async fn read_to_end_into_async<'n>(
410        &mut self,
411        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
412        end: QName<'n>,
413        buf: &mut Vec<u8>,
414    ) -> Result<Span> {
415        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
416        // match literally the start name. See `Config::check_end_names` documentation
417        let result = self.reader.read_to_end_into_async(end, buf).await?;
418        // read_to_end_into_async will consume closing tag. Because nobody can access to its
419        // content anymore, we directly pop namespace of the opening tag
420        self.ns_resolver.pop();
421        Ok(result)
422    }
423
424    /// An asynchronous version of [`read_text_into()`].
425    /// Reads asynchronously until end element is found using provided buffer as
426    /// intermediate storage for events content. This function is supposed to be
427    /// called after you already read a [`Start`] event.
428    ///
429    /// See the documentation of [`read_text_into()`] for more information.
430    ///
431    /// # Examples
432    ///
433    /// This example shows, how you can read a HTML content from your XML document.
434    ///
435    /// ```
436    /// # tokio_test::block_on(async {
437    /// # use pretty_assertions::assert_eq;
438    /// # use std::borrow::Cow;
439    /// use quick_xml::events::{BytesStart, Event};
440    /// use quick_xml::reader::NsReader;
441    ///
442    /// let mut reader = NsReader::from_reader("
443    ///     <html>
444    ///         <title>This is a HTML text</title>
445    ///         <p>Usual XML rules does not apply inside it
446    ///         <p>For example, elements not needed to be &quot;closed&quot;
447    ///     </html>
448    /// ".as_bytes());
449    /// reader.config_mut().trim_text(true);
450    ///
451    /// let start = BytesStart::new("html");
452    /// let end   = start.to_end().into_owned();
453    ///
454    /// let mut buf = Vec::new();
455    ///
456    /// // First, we read a start event...
457    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
458    /// // ...and disable checking of end names because we expect HTML further...
459    /// reader.config_mut().check_end_names = false;
460    ///
461    /// // ...then, we could read text content until close tag.
462    /// // This call will correctly handle nested <html> elements.
463    /// let text = reader.read_text_into_async(end.name(), &mut buf).await.unwrap();
464    /// let text = text.decode().unwrap();
465    /// assert_eq!(text, r#"
466    ///         <title>This is a HTML text</title>
467    ///         <p>Usual XML rules does not apply inside it
468    ///         <p>For example, elements not needed to be &quot;closed&quot;
469    ///     "#);
470    /// assert!(matches!(text, Cow::Borrowed(_)));
471    ///
472    /// // Now we can enable checks again
473    /// reader.config_mut().check_end_names = true;
474    ///
475    /// // At the end we should get an Eof event, because we ate the whole XML
476    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
477    /// # }) // tokio_test::block_on
478    /// ```
479    ///
480    /// [`read_text_into()`]: Self::read_text_into
481    /// [`Start`]: Event::Start
482    pub async fn read_text_into_async<'n, 'b>(
483        &mut self,
484        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
485        end: QName<'n>,
486        buf: &'b mut Vec<u8>,
487    ) -> Result<BytesText<'b>> {
488        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
489        // match literally the start name. See `Config::check_end_names` documentation
490        let result = self.reader.read_text_into_async(end, buf).await?;
491        // read_text_into_async will consume closing tag. Because nobody can access to its
492        // content anymore, we directly pop namespace of the opening tag
493        self.ns_resolver.pop();
494        Ok(result)
495    }
496
497    /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
498    /// event into given buffer asynchronously and resolves its namespace (if applicable).
499    ///
500    /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
501    /// For all other events the concept of namespace is not defined, so
502    /// a [`ResolveResult::Unbound`] is returned.
503    ///
504    /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
505    /// which will not automatically resolve namespaces for you.
506    ///
507    /// # Examples
508    ///
509    /// ```
510    /// # tokio_test::block_on(async {
511    /// # use pretty_assertions::assert_eq;
512    /// use quick_xml::events::Event;
513    /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
514    /// use quick_xml::reader::NsReader;
515    ///
516    /// let mut reader = NsReader::from_reader(r#"
517    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
518    ///        <y:tag2><!--Test comment-->Test</y:tag2>
519    ///        <y:tag2>Test 2</y:tag2>
520    ///     </x:tag1>
521    /// "#.as_bytes());
522    /// reader.config_mut().trim_text(true);
523    ///
524    /// let mut count = 0;
525    /// let mut buf = Vec::new();
526    /// let mut txt = Vec::new();
527    /// loop {
528    ///     match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
529    ///         (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
530    ///             count += 1;
531    ///             assert_eq!(e.local_name(), QName(b"tag1").into());
532    ///         }
533    ///         (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
534    ///             count += 1;
535    ///             assert_eq!(e.local_name(), QName(b"tag2").into());
536    ///         }
537    ///         (_, Event::Start(_)) => unreachable!(),
538    ///
539    ///         (_, Event::Text(e)) => {
540    ///             txt.push(e.decode().unwrap().into_owned())
541    ///         }
542    ///         (_, Event::Eof) => break,
543    ///         _ => (),
544    ///     }
545    ///     buf.clear();
546    /// }
547    /// assert_eq!(count, 3);
548    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
549    /// # }) // tokio_test::block_on
550    /// ```
551    ///
552    /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
553    /// [`Start`]: Event::Start
554    /// [`Empty`]: Event::Empty
555    /// [`End`]: Event::End
556    /// [`read_event_into_async()`]: Self::read_event_into_async
557    pub async fn read_resolved_event_into_async<'ns, 'b>(
558        // Name 'ns lifetime, because otherwise we get an error
559        // "implicit elided lifetime not allowed here" on ResolveResult
560        &'ns mut self,
561        buf: &'b mut Vec<u8>,
562    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
563        let event = self.read_event_into_async(buf).await?;
564        Ok(self.resolver().resolve_event(event))
565    }
566}
567
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Instantiates the reader test-suite macro for the tokio-based adapter
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        1,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time check that the future returned by `read_event_into_async`
    /// is `Send`. The future is only created, never polled, so nothing is
    /// verified at runtime.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        // Accepts any value, but only compiles when its type is `Send`
        fn assert_send<T>(_: T)
        where
            T: Send,
        {
        }

        let input: Vec<u8> = Vec::new();
        let mut event_buf: Vec<u8> = Vec::new();
        let source = BufReader::new(input.as_slice());
        let mut reader = Reader::from_reader(source);

        assert_send(reader.read_event_into_async(&mut event_buf));
    }
}