Skip to main content

quick_xml/reader/
async_tokio.rs

//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so
//! reading can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, IllFormedError, Result, SyntaxError};
11use crate::events::{BytesRef, BytesText, Event};
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{
16    BangType, BinaryStream, NsReader, ParseState, ReadRefResult, ReadTextResult, Reader, Span,
17};
18use crate::utils::is_whitespace;
19
20/// A struct for read XML asynchronously from an [`AsyncBufRead`].
21///
22/// Having own struct allows us to implement anything without risk of name conflicts
23/// and does not suffer from the impossibility of having `async` in traits.
24struct TokioAdapter<'a, R>(&'a mut R);
25
26impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
27    impl_buffered_source!('b, 0, async, await);
28}
29
30////////////////////////////////////////////////////////////////////////////////////////////////////
31
32impl<'r, R> AsyncRead for BinaryStream<'r, R>
33where
34    R: AsyncRead + Unpin,
35{
36    fn poll_read(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39        buf: &mut ReadBuf<'_>,
40    ) -> Poll<io::Result<()>> {
41        let start = buf.remaining();
42        let this = self.get_mut();
43        let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
44
45        // If something was read, update offset
46        if let Poll::Ready(Ok(_)) = poll {
47            let amt = start - buf.remaining();
48            *this.offset += amt as u64;
49        }
50        poll
51    }
52}
53
54impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
55where
56    R: AsyncBufRead + Unpin,
57{
58    #[inline]
59    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
60        Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
61    }
62
63    #[inline]
64    fn consume(self: Pin<&mut Self>, amt: usize) {
65        let this = self.get_mut();
66        this.inner.consume(amt);
67        *this.offset += amt as u64;
68    }
69}
70
71////////////////////////////////////////////////////////////////////////////////////////////////////
72
73impl<R: AsyncBufRead + Unpin> Reader<R> {
74    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
75    /// given buffer.
76    ///
77    /// This is the main entry point for reading XML `Event`s when using an async reader.
78    ///
79    /// See the documentation of [`read_event_into()`] for more information.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// # tokio_test::block_on(async {
85    /// # use pretty_assertions::assert_eq;
86    /// use quick_xml::events::Event;
87    /// use quick_xml::reader::Reader;
88    ///
89    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
90    /// // reader instead of relying on the zero-copy optimizations for reading
91    /// // from byte slices, which provides the sync interface anyway.
92    /// let mut reader = Reader::from_reader(r#"
93    ///     <tag1 att1 = "test">
94    ///        <tag2><!--Test comment-->Test</tag2>
95    ///        <tag2>Test 2</tag2>
96    ///     </tag1>
97    /// "#.as_bytes());
98    /// reader.config_mut().trim_text(true);
99    ///
100    /// let mut count = 0;
101    /// let mut buf = Vec::new();
102    /// let mut txt = Vec::new();
103    /// loop {
104    ///     match reader.read_event_into_async(&mut buf).await {
105    ///         Ok(Event::Start(_)) => count += 1,
106    ///         Ok(Event::Text(e)) => txt.push(e.decode().unwrap().into_owned()),
107    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
108    ///         Ok(Event::Eof) => break,
109    ///         _ => (),
110    ///     }
111    ///     buf.clear();
112    /// }
113    /// assert_eq!(count, 3);
114    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
115    /// # }) // tokio_test::block_on
116    /// ```
117    ///
118    /// [`read_event_into()`]: Reader::read_event_into
119    pub async fn read_event_into_async<'b>(
120        &mut self,
121        mut buf: &'b mut Vec<u8>,
122    ) -> Result<Event<'b>> {
123        read_event_impl!(
124            self,
125            buf,
126            TokioAdapter(&mut self.reader),
127            read_until_close_async,
128            await
129        )
130    }
131
132    /// An asynchronous version of [`read_to_end_into()`].
133    /// Reads asynchronously until end element is found using provided buffer as
134    /// intermediate storage for events content. This function is supposed to be
135    /// called after you already read a [`Start`] event.
136    ///
137    /// See the documentation of [`read_to_end_into()`] for more information.
138    ///
139    /// # Examples
140    ///
141    /// This example shows, how you can skip XML content after you read the
142    /// start event.
143    ///
144    /// ```
145    /// # tokio_test::block_on(async {
146    /// # use pretty_assertions::assert_eq;
147    /// use quick_xml::events::{BytesStart, Event};
148    /// use quick_xml::reader::Reader;
149    ///
150    /// let mut reader = Reader::from_reader(r#"
151    ///     <outer>
152    ///         <inner>
153    ///             <inner></inner>
154    ///             <inner/>
155    ///             <outer></outer>
156    ///             <outer/>
157    ///         </inner>
158    ///     </outer>
159    /// "#.as_bytes());
160    /// reader.config_mut().trim_text(true);
161    /// let mut buf = Vec::new();
162    ///
163    /// let start = BytesStart::new("outer");
164    /// let end   = start.to_end().into_owned();
165    ///
166    /// // First, we read a start event...
167    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
168    ///
169    /// // ...then, we could skip all events to the corresponding end event.
170    /// // This call will correctly handle nested <outer> elements.
171    /// // Note, however, that this method does not handle namespaces.
172    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
173    ///
174    /// // At the end we should get an Eof event, because we ate the whole XML
175    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
176    /// # }) // tokio_test::block_on
177    /// ```
178    ///
179    /// [`read_to_end_into()`]: Self::read_to_end_into
180    /// [`Start`]: Event::Start
181    pub async fn read_to_end_into_async<'n>(
182        &mut self,
183        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
184        end: QName<'n>,
185        buf: &mut Vec<u8>,
186    ) -> Result<Span> {
187        Ok(read_to_end!(
188            self,
189            end,
190            buf,
191            read_event_into_async,
192            {
193                buf.clear();
194            },
195            await
196        ))
197    }
198
199    /// An asynchronous version of [`read_text_into()`].
200    /// Reads asynchronously until end element is found using provided buffer as
201    /// intermediate storage for events content. This function is supposed to be
202    /// called after you already read a [`Start`] event.
203    ///
204    /// See the documentation of [`read_text_into()`] for more information.
205    ///
206    /// # Examples
207    ///
208    /// This example shows, how you can read a HTML content from your XML document.
209    ///
210    /// ```
211    /// # tokio_test::block_on(async {
212    /// # use pretty_assertions::assert_eq;
213    /// # use std::borrow::Cow;
214    /// use quick_xml::events::{BytesStart, Event};
215    /// use quick_xml::reader::Reader;
216    ///
217    /// let mut reader = Reader::from_reader("
218    ///     <html>
219    ///         <title>This is a HTML text</title>
220    ///         <p>Usual XML rules does not apply inside it
221    ///         <p>For example, elements not needed to be &quot;closed&quot;
222    ///     </html>
223    /// ".as_bytes());
224    /// reader.config_mut().trim_text(true);
225    ///
226    /// let start = BytesStart::new("html");
227    /// let end   = start.to_end().into_owned();
228    ///
229    /// let mut buf = Vec::new();
230    ///
231    /// // First, we read a start event...
232    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
233    /// // ...and disable checking of end names because we expect HTML further...
234    /// reader.config_mut().check_end_names = false;
235    ///
236    /// // ...then, we could read text content until close tag.
237    /// // This call will correctly handle nested <html> elements.
238    /// let text = reader.read_text_into_async(end.name(), &mut buf).await.unwrap();
239    /// let text = text.decode().unwrap();
240    /// assert_eq!(text, r#"
241    ///         <title>This is a HTML text</title>
242    ///         <p>Usual XML rules does not apply inside it
243    ///         <p>For example, elements not needed to be &quot;closed&quot;
244    ///     "#);
245    /// assert!(matches!(text, Cow::Borrowed(_)));
246    ///
247    /// // Now we can enable checks again
248    /// reader.config_mut().check_end_names = true;
249    ///
250    /// // At the end we should get an Eof event, because we ate the whole XML
251    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
252    /// # }) // tokio_test::block_on
253    /// ```
254    ///
255    /// [`read_text_into()`]: Self::read_text_into
256    /// [`Start`]: Event::Start
257    pub async fn read_text_into_async<'n, 'b>(
258        &mut self,
259        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
260        end: QName<'n>,
261        buf: &'b mut Vec<u8>,
262    ) -> Result<BytesText<'b>> {
263        let start = buf.len();
264        let span = read_to_end!(self, end, buf, read_event_into_async, {}, await);
265
266        let len = span.end - span.start;
267        // SAFETY: `buf` may contain not more than isize::MAX bytes and because it is
268        // not cleared when reading event, length of the returned span should fit into
269        // usize (because otherwise we panic at appending to the buffer before that point)
270        let end = start + len as usize;
271
272        Ok(BytesText::wrap(&buf[start..end], self.decoder()))
273    }
274
275    /// Private function to read until `>` is found. This function expects that
276    /// it was called just after encounter a `<` symbol.
277    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
278        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
279    }
280}
281
282////////////////////////////////////////////////////////////////////////////////////////////////////
283
284impl<R: AsyncBufRead + Unpin> NsReader<R> {
285    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
286    /// given buffer.
287    ///
288    /// This method manages namespaces but doesn't resolve them automatically.
289    /// You should call [`resolver().resolve_element()`] if you want to get a namespace.
290    ///
291    /// You also can use [`read_resolved_event_into_async()`] instead if you want
292    /// to resolve namespace as soon as you get an event.
293    ///
294    /// # Examples
295    ///
296    /// ```
297    /// # tokio_test::block_on(async {
298    /// # use pretty_assertions::assert_eq;
299    /// use quick_xml::events::Event;
300    /// use quick_xml::name::{Namespace, ResolveResult::*};
301    /// use quick_xml::reader::NsReader;
302    ///
303    /// let mut reader = NsReader::from_reader(r#"
304    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
305    ///        <y:tag2><!--Test comment-->Test</y:tag2>
306    ///        <y:tag2>Test 2</y:tag2>
307    ///     </x:tag1>
308    /// "#.as_bytes());
309    /// reader.config_mut().trim_text(true);
310    ///
311    /// let mut count = 0;
312    /// let mut buf = Vec::new();
313    /// let mut txt = Vec::new();
314    /// loop {
315    ///     match reader.read_event_into_async(&mut buf).await.unwrap() {
316    ///         Event::Start(e) => {
317    ///             count += 1;
318    ///             let (ns, local) = reader.resolver().resolve_element(e.name());
319    ///             match local.as_ref() {
320    ///                 b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
321    ///                 b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
322    ///                 _ => unreachable!(),
323    ///             }
324    ///         }
325    ///         Event::Text(e) => {
326    ///             txt.push(e.decode().unwrap().into_owned())
327    ///         }
328    ///         Event::Eof => break,
329    ///         _ => (),
330    ///     }
331    ///     buf.clear();
332    /// }
333    /// assert_eq!(count, 3);
334    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
335    /// # }) // tokio_test::block_on
336    /// ```
337    ///
338    /// [`read_event_into()`]: NsReader::read_event_into
339    /// [`resolver().resolve_element()`]: crate::name::NamespaceResolver::resolve_element
340    /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
341    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
342        self.pop();
343        let event = self.reader.read_event_into_async(buf).await;
344        self.process_event(event)
345    }
346
347    /// An asynchronous version of [`read_to_end_into()`].
348    /// Reads asynchronously until end element is found using provided buffer as
349    /// intermediate storage for events content. This function is supposed to be
350    /// called after you already read a [`Start`] event.
351    ///
352    /// See the documentation of [`read_to_end_into()`] for more information.
353    ///
354    /// # Examples
355    ///
356    /// This example shows, how you can skip XML content after you read the
357    /// start event.
358    ///
359    /// ```
360    /// # tokio_test::block_on(async {
361    /// # use pretty_assertions::assert_eq;
362    /// use quick_xml::name::{Namespace, ResolveResult};
363    /// use quick_xml::events::{BytesStart, Event};
364    /// use quick_xml::reader::NsReader;
365    ///
366    /// let mut reader = NsReader::from_reader(r#"
367    ///     <outer xmlns="namespace 1">
368    ///         <inner xmlns="namespace 2">
369    ///             <outer></outer>
370    ///         </inner>
371    ///         <inner>
372    ///             <inner></inner>
373    ///             <inner/>
374    ///             <outer></outer>
375    ///             <p:outer xmlns:p="ns"></p:outer>
376    ///             <outer/>
377    ///         </inner>
378    ///     </outer>
379    /// "#.as_bytes());
380    /// reader.config_mut().trim_text(true);
381    /// let mut buf = Vec::new();
382    ///
383    /// let ns = Namespace(b"namespace 1");
384    /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
385    /// let end   = start.to_end().into_owned();
386    ///
387    /// // First, we read a start event...
388    /// assert_eq!(
389    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
390    ///     (ResolveResult::Bound(ns), Event::Start(start))
391    /// );
392    ///
393    /// // ...then, we could skip all events to the corresponding end event.
394    /// // This call will correctly handle nested <outer> elements.
395    /// // Note, however, that this method does not handle namespaces.
396    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
397    ///
398    /// // At the end we should get an Eof event, because we ate the whole XML
399    /// assert_eq!(
400    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
401    ///     (ResolveResult::Unbound, Event::Eof)
402    /// );
403    /// # }) // tokio_test::block_on
404    /// ```
405    ///
406    /// [`read_to_end_into()`]: Self::read_to_end_into
407    /// [`Start`]: Event::Start
408    pub async fn read_to_end_into_async<'n>(
409        &mut self,
410        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
411        end: QName<'n>,
412        buf: &mut Vec<u8>,
413    ) -> Result<Span> {
414        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
415        // match literally the start name. See `Config::check_end_names` documentation
416        let result = self.reader.read_to_end_into_async(end, buf).await?;
417        // read_to_end_into_async will consume closing tag. Because nobody can access to its
418        // content anymore, we directly pop namespace of the opening tag
419        self.ns_resolver.pop();
420        Ok(result)
421    }
422
423    /// An asynchronous version of [`read_text_into()`].
424    /// Reads asynchronously until end element is found using provided buffer as
425    /// intermediate storage for events content. This function is supposed to be
426    /// called after you already read a [`Start`] event.
427    ///
428    /// See the documentation of [`read_text_into()`] for more information.
429    ///
430    /// # Examples
431    ///
432    /// This example shows, how you can read a HTML content from your XML document.
433    ///
434    /// ```
435    /// # tokio_test::block_on(async {
436    /// # use pretty_assertions::assert_eq;
437    /// # use std::borrow::Cow;
438    /// use quick_xml::events::{BytesStart, Event};
439    /// use quick_xml::reader::NsReader;
440    ///
441    /// let mut reader = NsReader::from_reader("
442    ///     <html>
443    ///         <title>This is a HTML text</title>
444    ///         <p>Usual XML rules does not apply inside it
445    ///         <p>For example, elements not needed to be &quot;closed&quot;
446    ///     </html>
447    /// ".as_bytes());
448    /// reader.config_mut().trim_text(true);
449    ///
450    /// let start = BytesStart::new("html");
451    /// let end   = start.to_end().into_owned();
452    ///
453    /// let mut buf = Vec::new();
454    ///
455    /// // First, we read a start event...
456    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
457    /// // ...and disable checking of end names because we expect HTML further...
458    /// reader.config_mut().check_end_names = false;
459    ///
460    /// // ...then, we could read text content until close tag.
461    /// // This call will correctly handle nested <html> elements.
462    /// let text = reader.read_text_into_async(end.name(), &mut buf).await.unwrap();
463    /// let text = text.decode().unwrap();
464    /// assert_eq!(text, r#"
465    ///         <title>This is a HTML text</title>
466    ///         <p>Usual XML rules does not apply inside it
467    ///         <p>For example, elements not needed to be &quot;closed&quot;
468    ///     "#);
469    /// assert!(matches!(text, Cow::Borrowed(_)));
470    ///
471    /// // Now we can enable checks again
472    /// reader.config_mut().check_end_names = true;
473    ///
474    /// // At the end we should get an Eof event, because we ate the whole XML
475    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
476    /// # }) // tokio_test::block_on
477    /// ```
478    ///
479    /// [`read_text_into()`]: Self::read_text_into
480    /// [`Start`]: Event::Start
481    pub async fn read_text_into_async<'n, 'b>(
482        &mut self,
483        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
484        end: QName<'n>,
485        buf: &'b mut Vec<u8>,
486    ) -> Result<BytesText<'b>> {
487        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
488        // match literally the start name. See `Config::check_end_names` documentation
489        let result = self.reader.read_text_into_async(end, buf).await?;
490        // read_text_into_async will consume closing tag. Because nobody can access to its
491        // content anymore, we directly pop namespace of the opening tag
492        self.ns_resolver.pop();
493        Ok(result)
494    }
495
496    /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
497    /// event into given buffer asynchronously and resolves its namespace (if applicable).
498    ///
499    /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
500    /// For all other events the concept of namespace is not defined, so
501    /// a [`ResolveResult::Unbound`] is returned.
502    ///
503    /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
504    /// which will not automatically resolve namespaces for you.
505    ///
506    /// # Examples
507    ///
508    /// ```
509    /// # tokio_test::block_on(async {
510    /// # use pretty_assertions::assert_eq;
511    /// use quick_xml::events::Event;
512    /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
513    /// use quick_xml::reader::NsReader;
514    ///
515    /// let mut reader = NsReader::from_reader(r#"
516    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
517    ///        <y:tag2><!--Test comment-->Test</y:tag2>
518    ///        <y:tag2>Test 2</y:tag2>
519    ///     </x:tag1>
520    /// "#.as_bytes());
521    /// reader.config_mut().trim_text(true);
522    ///
523    /// let mut count = 0;
524    /// let mut buf = Vec::new();
525    /// let mut txt = Vec::new();
526    /// loop {
527    ///     match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
528    ///         (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
529    ///             count += 1;
530    ///             assert_eq!(e.local_name(), QName(b"tag1").into());
531    ///         }
532    ///         (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
533    ///             count += 1;
534    ///             assert_eq!(e.local_name(), QName(b"tag2").into());
535    ///         }
536    ///         (_, Event::Start(_)) => unreachable!(),
537    ///
538    ///         (_, Event::Text(e)) => {
539    ///             txt.push(e.decode().unwrap().into_owned())
540    ///         }
541    ///         (_, Event::Eof) => break,
542    ///         _ => (),
543    ///     }
544    ///     buf.clear();
545    /// }
546    /// assert_eq!(count, 3);
547    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
548    /// # }) // tokio_test::block_on
549    /// ```
550    ///
551    /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
552    /// [`Start`]: Event::Start
553    /// [`Empty`]: Event::Empty
554    /// [`End`]: Event::End
555    /// [`read_event_into_async()`]: Self::read_event_into_async
556    pub async fn read_resolved_event_into_async<'ns, 'b>(
557        // Name 'ns lifetime, because otherwise we get an error
558        // "implicit elided lifetime not allowed here" on ResolveResult
559        &'ns mut self,
560        buf: &'b mut Vec<u8>,
561    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
562        let event = self.read_event_into_async(buf).await?;
563        Ok(self.resolver().resolve_event(event))
564    }
565}
566
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Instantiates the test suite shared between reader implementations for
    // the async reader.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        1,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time check that the future returned by `read_event_into_async`
    /// is `Send`. Nothing is verified at runtime: the future is never polled.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        // Accepts only `Send` values, so the call below fails to compile
        // if the future is not `Send`
        fn assert_send<T: Send>(_: T) {}

        let source: Vec<u8> = Vec::new();
        let mut event_buf: Vec<u8> = Vec::new();
        let mut xml_reader = Reader::from_reader(BufReader::new(source.as_slice()));

        assert_send(xml_reader.read_event_into_async(&mut event_buf));
    }
}