quick_xml/reader/
async_tokio.rs

//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await so reading
//! can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, IllFormedError, Result, SyntaxError};
11use crate::events::{BytesRef, Event};
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{
16    BangType, BinaryStream, NsReader, ParseState, ReadRefResult, ReadTextResult, Reader, Span,
17};
18use crate::utils::is_whitespace;
19
/// A struct for reading XML asynchronously from an [`AsyncBufRead`].
///
/// It is a thin newtype that holds a mutable borrow of the underlying reader `R`.
///
/// Having our own struct allows us to implement anything without risk of name
/// conflicts and does not suffer from the impossibility of having `async` in traits.
struct TokioAdapter<'a, R>(&'a mut R);
25
// All methods of the adapter are generated by the `impl_buffered_source!` macro
// imported from `buffered_reader`, so nothing is written by hand here.
// NOTE(review): `'b` and `0` are presumably the buffer lifetime and the index of
// the tuple field holding the reader, and `async, await` presumably select the
// asynchronous flavour of the generated code -- confirm against the macro definition.
impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
    impl_buffered_source!('b, 0, async, await);
}
29
30////////////////////////////////////////////////////////////////////////////////////////////////////
31
32impl<'r, R> AsyncRead for BinaryStream<'r, R>
33where
34    R: AsyncRead + Unpin,
35{
36    fn poll_read(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39        buf: &mut ReadBuf<'_>,
40    ) -> Poll<io::Result<()>> {
41        let start = buf.remaining();
42        let this = self.get_mut();
43        let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
44
45        // If something was read, update offset
46        if let Poll::Ready(Ok(_)) = poll {
47            let amt = start - buf.remaining();
48            *this.offset += amt as u64;
49        }
50        poll
51    }
52}
53
54impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
55where
56    R: AsyncBufRead + Unpin,
57{
58    #[inline]
59    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
60        Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
61    }
62
63    #[inline]
64    fn consume(self: Pin<&mut Self>, amt: usize) {
65        let this = self.get_mut();
66        this.inner.consume(amt);
67        *this.offset += amt as u64;
68    }
69}
70
71////////////////////////////////////////////////////////////////////////////////////////////////////
72
impl<R: AsyncBufRead + Unpin> Reader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This is the main entry point for reading XML `Event`s when using an async reader.
    ///
    /// See the documentation of [`read_event_into()`] for more information.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::reader::Reader;
    ///
    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
    /// // reader instead of relying on the zero-copy optimizations for reading
    /// // from byte slices, which provides the sync interface anyway.
    /// let mut reader = Reader::from_reader(r#"
    ///     <tag1 att1 = "test">
    ///        <tag2><!--Test comment-->Test</tag2>
    ///        <tag2>Test 2</tag2>
    ///     </tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await {
    ///         Ok(Event::Start(_)) => count += 1,
    ///         Ok(Event::Text(e)) => txt.push(e.decode().unwrap().into_owned()),
    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
    ///         Ok(Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: Reader::read_event_into
    pub async fn read_event_into_async<'b>(
        &mut self,
        mut buf: &'b mut Vec<u8>,
    ) -> Result<Event<'b>> {
        // The reading logic lives in the `read_event_impl!` macro. It is handed an
        // adapter over the inner reader and the name of the async helper defined
        // below; the trailing `await` presumably makes the expansion await its calls.
        read_event_impl!(
            self,
            buf,
            TokioAdapter(&mut self.reader),
            read_until_close_async,
            await
        )
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided
    /// buffer as intermediate storage for the content of events. This function is
    /// supposed to be called after you have already read a [`Start`] event.
    ///
    /// See the documentation of [`read_to_end_into()`] for more information.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::Reader;
    ///
    /// let mut reader = Reader::from_reader(r#"
    ///     <outer>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let start = BytesStart::new("outer");
    /// let end   = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // The value produced by the `read_to_end!` macro is returned as the `Span`.
        // The block argument empties `buf`; presumably the macro runs it around each
        // event read so the buffer does not keep growing -- confirm in the macro.
        Ok(read_to_end!(
            self,
            end,
            buf,
            read_event_into_async,
            {
                buf.clear();
            },
            await
        ))
    }

    /// Private function to read until `>` is found. This function expects that
    /// it was called just after encountering a `<` symbol.
    ///
    /// Its name is passed to `read_event_impl!` by [`Self::read_event_into_async`].
    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
    }
}
205
206////////////////////////////////////////////////////////////////////////////////////////////////////
207
208impl<R: AsyncBufRead + Unpin> NsReader<R> {
209    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
210    /// given buffer.
211    ///
212    /// This method manages namespaces but doesn't resolve them automatically.
213    /// You should call [`resolve_element()`] if you want to get a namespace.
214    ///
215    /// You also can use [`read_resolved_event_into_async()`] instead if you want
216    /// to resolve namespace as soon as you get an event.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// # tokio_test::block_on(async {
222    /// # use pretty_assertions::assert_eq;
223    /// use quick_xml::events::Event;
224    /// use quick_xml::name::{Namespace, ResolveResult::*};
225    /// use quick_xml::reader::NsReader;
226    ///
227    /// let mut reader = NsReader::from_reader(r#"
228    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
229    ///        <y:tag2><!--Test comment-->Test</y:tag2>
230    ///        <y:tag2>Test 2</y:tag2>
231    ///     </x:tag1>
232    /// "#.as_bytes());
233    /// reader.config_mut().trim_text(true);
234    ///
235    /// let mut count = 0;
236    /// let mut buf = Vec::new();
237    /// let mut txt = Vec::new();
238    /// loop {
239    ///     match reader.read_event_into_async(&mut buf).await.unwrap() {
240    ///         Event::Start(e) => {
241    ///             count += 1;
242    ///             let (ns, local) = reader.resolve_element(e.name());
243    ///             match local.as_ref() {
244    ///                 b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
245    ///                 b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
246    ///                 _ => unreachable!(),
247    ///             }
248    ///         }
249    ///         Event::Text(e) => {
250    ///             txt.push(e.decode().unwrap().into_owned())
251    ///         }
252    ///         Event::Eof => break,
253    ///         _ => (),
254    ///     }
255    ///     buf.clear();
256    /// }
257    /// assert_eq!(count, 3);
258    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
259    /// # }) // tokio_test::block_on
260    /// ```
261    ///
262    /// [`read_event_into()`]: NsReader::read_event_into
263    /// [`resolve_element()`]: Self::resolve_element
264    /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
265    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
266        self.pop();
267        let event = self.reader.read_event_into_async(buf).await;
268        self.process_event(event)
269    }
270
271    /// An asynchronous version of [`read_to_end_into()`].
272    /// Reads asynchronously until end element is found using provided buffer as
273    /// intermediate storage for events content. This function is supposed to be
274    /// called after you already read a [`Start`] event.
275    ///
276    /// See the documentation of [`read_to_end_into()`] for more information.
277    ///
278    /// # Examples
279    ///
280    /// This example shows, how you can skip XML content after you read the
281    /// start event.
282    ///
283    /// ```
284    /// # tokio_test::block_on(async {
285    /// # use pretty_assertions::assert_eq;
286    /// use quick_xml::name::{Namespace, ResolveResult};
287    /// use quick_xml::events::{BytesStart, Event};
288    /// use quick_xml::reader::NsReader;
289    ///
290    /// let mut reader = NsReader::from_reader(r#"
291    ///     <outer xmlns="namespace 1">
292    ///         <inner xmlns="namespace 2">
293    ///             <outer></outer>
294    ///         </inner>
295    ///         <inner>
296    ///             <inner></inner>
297    ///             <inner/>
298    ///             <outer></outer>
299    ///             <p:outer xmlns:p="ns"></p:outer>
300    ///             <outer/>
301    ///         </inner>
302    ///     </outer>
303    /// "#.as_bytes());
304    /// reader.config_mut().trim_text(true);
305    /// let mut buf = Vec::new();
306    ///
307    /// let ns = Namespace(b"namespace 1");
308    /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
309    /// let end   = start.to_end().into_owned();
310    ///
311    /// // First, we read a start event...
312    /// assert_eq!(
313    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
314    ///     (ResolveResult::Bound(ns), Event::Start(start))
315    /// );
316    ///
317    /// // ...then, we could skip all events to the corresponding end event.
318    /// // This call will correctly handle nested <outer> elements.
319    /// // Note, however, that this method does not handle namespaces.
320    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
321    ///
322    /// // At the end we should get an Eof event, because we ate the whole XML
323    /// assert_eq!(
324    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
325    ///     (ResolveResult::Unbound, Event::Eof)
326    /// );
327    /// # }) // tokio_test::block_on
328    /// ```
329    ///
330    /// [`read_to_end_into()`]: Self::read_to_end_into
331    /// [`Start`]: Event::Start
332    pub async fn read_to_end_into_async<'n>(
333        &mut self,
334        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
335        end: QName<'n>,
336        buf: &mut Vec<u8>,
337    ) -> Result<Span> {
338        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
339        // match literally the start name. See `Config::check_end_names` documentation
340        self.reader.read_to_end_into_async(end, buf).await
341    }
342
343    /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
344    /// event into given buffer asynchronously and resolves its namespace (if applicable).
345    ///
346    /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
347    /// For all other events the concept of namespace is not defined, so
348    /// a [`ResolveResult::Unbound`] is returned.
349    ///
350    /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
351    /// which will not automatically resolve namespaces for you.
352    ///
353    /// # Examples
354    ///
355    /// ```
356    /// # tokio_test::block_on(async {
357    /// # use pretty_assertions::assert_eq;
358    /// use quick_xml::events::Event;
359    /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
360    /// use quick_xml::reader::NsReader;
361    ///
362    /// let mut reader = NsReader::from_reader(r#"
363    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
364    ///        <y:tag2><!--Test comment-->Test</y:tag2>
365    ///        <y:tag2>Test 2</y:tag2>
366    ///     </x:tag1>
367    /// "#.as_bytes());
368    /// reader.config_mut().trim_text(true);
369    ///
370    /// let mut count = 0;
371    /// let mut buf = Vec::new();
372    /// let mut txt = Vec::new();
373    /// loop {
374    ///     match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
375    ///         (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
376    ///             count += 1;
377    ///             assert_eq!(e.local_name(), QName(b"tag1").into());
378    ///         }
379    ///         (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
380    ///             count += 1;
381    ///             assert_eq!(e.local_name(), QName(b"tag2").into());
382    ///         }
383    ///         (_, Event::Start(_)) => unreachable!(),
384    ///
385    ///         (_, Event::Text(e)) => {
386    ///             txt.push(e.decode().unwrap().into_owned())
387    ///         }
388    ///         (_, Event::Eof) => break,
389    ///         _ => (),
390    ///     }
391    ///     buf.clear();
392    /// }
393    /// assert_eq!(count, 3);
394    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
395    /// # }) // tokio_test::block_on
396    /// ```
397    ///
398    /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
399    /// [`Start`]: Event::Start
400    /// [`Empty`]: Event::Empty
401    /// [`End`]: Event::End
402    /// [`read_event_into_async()`]: Self::read_event_into_async
403    pub async fn read_resolved_event_into_async<'ns, 'b>(
404        // Name 'ns lifetime, because otherwise we get an error
405        // "implicit elided lifetime not allowed here" on ResolveResult
406        &'ns mut self,
407        buf: &'b mut Vec<u8>,
408    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
409        let event = self.read_event_into_async(buf).await;
410        self.resolve_event(event)
411    }
412}
413
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Expands the `check!` macro shared by the reader tests for the async methods;
    // each generated test is presumably annotated with `#[tokio::test]`.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time guard: the future returned by `read_event_into_async` must
    /// be `Send`. The future is never awaited and nothing is asserted at runtime.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        // Accepts only `Send` values, so a non-`Send` future fails to compile
        fn assert_send<T: Send>(_: T) {}

        let source: Vec<u8> = Vec::new();
        let mut event_buf: Vec<u8> = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(source.as_slice()));

        assert_send(reader.read_event_into_async(&mut event_buf));
    }
}