Skip to main content

mongodb/
cursor.rs

1pub(crate) mod common;
2pub mod raw_batch;
3pub(crate) mod session;
4mod stream;
5#[cfg(feature = "sync")]
6pub(crate) mod sync;
7
8use std::task::Poll;
9
10use derive_where::derive_where;
11use futures_core::Stream as AsyncStream;
12use futures_util::stream::StreamExt;
13use serde::{de::DeserializeOwned, Deserialize};
14
15use crate::{
16    bson::RawDocument,
17    cmap::conn::PinnedConnectionHandle,
18    error::Result,
19    Client,
20    ClientSession,
21};
22
23/// A [`Cursor`] streams the result of a query. When a query is made, the returned [`Cursor`] will
24/// contain the first batch of results from the server; the individual results will then be returned
25/// as the [`Cursor`] is iterated. When the batch is exhausted and if there are more results, the
26/// [`Cursor`] will fetch the next batch of documents, and so forth until the results are exhausted.
27/// Note that because of this batching, additional network I/O may occur on any given call to
28/// `next`. Because of this, a [`Cursor`] iterates over `Result<T>` items rather than
29/// simply `T` items.
30///
31/// The batch size of the `Cursor` can be configured using the options to the method that returns
32/// it. For example, setting the `batch_size` field of
33/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the
34/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find).
35///
36/// Note that the batch size determines both the number of documents stored in memory by the
37/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all
38/// results from the server; both of these factors should be taken into account when choosing the
39/// optimal batch size.
40///
41/// [`Cursor`] implements [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html), which means
42/// it can be iterated over much in the same way that an `Iterator` can be in synchronous Rust. In
43/// order to do so, the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) trait must
44/// be imported. Because a [`Cursor`] iterates over a `Result<T>`, it also has access to the
45/// potentially more ergonomic functionality provided by
46/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html), which can be
47/// imported instead of or in addition to
48/// [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html). The methods from
49/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) are especially useful when
50/// used in conjunction with the `?` operator.
51///
52/// ```rust
53/// # use mongodb::{bson::{Document, doc}, Client, error::Result};
54/// #
55/// # async fn do_stuff() -> Result<()> {
56/// # let client = Client::with_uri_str("mongodb://example.com").await?;
57/// # let coll = client.database("foo").collection::<Document>("bar");
58/// #
59/// use futures::stream::{StreamExt, TryStreamExt};
60///
61/// let mut cursor = coll.find(doc! {}).await?;
62/// // regular Stream uses next() and iterates over Option<Result<T>>
63/// while let Some(doc) = cursor.next().await {
64///   println!("{}", doc?)
65/// }
66/// // regular Stream uses collect() and collects into a Vec<Result<T>>
67/// let v: Vec<Result<_>> = cursor.collect().await;
68///
69/// let mut cursor = coll.find(doc! {}).await?;
70/// // TryStream uses try_next() and iterates over Result<Option<T>>
71/// while let Some(doc) = cursor.try_next().await? {
72///   println!("{}", doc)
73/// }
74/// // TryStream uses try_collect() and collects into a Result<Vec<T>>
75/// let v: Vec<_> = cursor.try_collect().await?;
76/// #
77/// # Ok(())
78/// # }
79/// ```
80///
81/// If a [`Cursor`] is still open when it goes out of scope, it will automatically be closed via an
82/// asynchronous [killCursors](https://www.mongodb.com/docs/manual/reference/command/killCursors/) command executed
83/// from its [`Drop`](https://doc.rust-lang.org/std/ops/trait.Drop.html) implementation.
84#[derive_where(Debug)]
85pub struct Cursor<T> {
86    stream: stream::Stream<'static, raw_batch::RawBatchCursor, T>,
87}
88
89impl<T> Cursor<T> {
90    /// Move the cursor forward, potentially triggering requests to the database for more results
91    /// if the local buffer has been exhausted.
92    ///
93    /// This will keep requesting data from the server until either the cursor is exhausted
94    /// or batch with results in it has been received.
95    ///
96    /// The return value indicates whether new results were successfully returned (true) or if
97    /// the cursor has been closed (false).
98    ///
99    /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
100    /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
101    /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
102    ///
103    /// ```
104    /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
105    /// # async fn foo() -> Result<()> {
106    /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
107    /// # let coll = client.database("stuff").collection::<Document>("stuff");
108    /// let mut cursor = coll.find(doc! {}).await?;
109    /// while cursor.advance().await? {
110    ///     println!("{:?}", cursor.current());
111    /// }
112    /// # Ok(())
113    /// # }
114    /// ```
115    pub async fn advance(&mut self) -> Result<bool> {
116        self.stream.buffer_mut().advance().await
117    }
118
119    /// Returns a reference to the current result in the cursor.
120    ///
121    /// # Panics
122    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
123    /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
124    /// or without calling [`Cursor::advance`] at all may result in a panic.
125    ///
126    /// ```
127    /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
128    /// # async fn foo() -> Result<()> {
129    /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
130    /// # let coll = client.database("stuff").collection::<Document>("stuff");
131    /// let mut cursor = coll.find(doc! {}).await?;
132    /// while cursor.advance().await? {
133    ///     println!("{:?}", cursor.current());
134    /// }
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub fn current(&self) -> &RawDocument {
139        self.stream.buffer().current()
140    }
141
142    /// Returns true if the cursor has any additional items to return and false otherwise.
143    pub fn has_next(&self) -> bool {
144        let state = self.stream.buffer();
145        !state.batch().is_empty() || state.raw.has_next()
146    }
147
148    /// Deserialize the current result to the generic type associated with this cursor.
149    ///
150    /// # Panics
151    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
152    /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
153    /// true or without calling [`Cursor::advance`] at all may result in a panic.
154    ///
155    /// ```
156    /// # use mongodb::{Client, error::Result, bson::doc};
157    /// # async fn foo() -> Result<()> {
158    /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
159    /// # let db = client.database("foo");
160    /// use serde::Deserialize;
161    ///
162    /// #[derive(Debug, Deserialize)]
163    /// struct Cat<'a> {
164    ///     #[serde(borrow)]
165    ///     name: &'a str
166    /// }
167    ///
168    /// let coll = db.collection::<Cat>("cat");
169    /// let mut cursor = coll.find(doc! {}).await?;
170    /// while cursor.advance().await? {
171    ///     println!("{:?}", cursor.deserialize_current()?);
172    /// }
173    /// # Ok(())
174    /// # }
175    /// ```
176    pub fn deserialize_current<'a>(&'a self) -> Result<T>
177    where
178        T: Deserialize<'a>,
179    {
180        self.stream.buffer().deserialize_current()
181    }
182
183    /// Update the type streamed values will be parsed as.
184    pub fn with_type<'a, D>(self) -> Cursor<D>
185    where
186        D: Deserialize<'a>,
187    {
188        Cursor {
189            stream: self.stream.with_type(),
190        }
191    }
192
193    pub(crate) fn raw(&self) -> &raw_batch::RawBatchCursor {
194        &self.stream.buffer().raw
195    }
196
197    pub(crate) fn raw_mut(&mut self) -> &mut raw_batch::RawBatchCursor {
198        &mut self.stream.buffer_mut().raw
199    }
200
201    pub(crate) async fn try_advance(&mut self) -> Result<bool> {
202        self.stream.buffer_mut().try_advance().await
203    }
204
205    pub(crate) fn batch(&self) -> &std::collections::VecDeque<crate::bson::RawDocumentBuf> {
206        self.stream.buffer().batch()
207    }
208}
209
210pub(crate) trait NewCursor: Sized {
211    fn generic_new(
212        client: Client,
213        spec: common::CursorSpecification,
214        implicit_session: Option<ClientSession>,
215        pinned: Option<PinnedConnectionHandle>,
216    ) -> Result<Self>;
217}
218
219impl<T> NewCursor for Cursor<T> {
220    fn generic_new(
221        client: Client,
222        spec: common::CursorSpecification,
223        implicit_session: Option<ClientSession>,
224        pinned: Option<PinnedConnectionHandle>,
225    ) -> Result<Self> {
226        let raw = crate::cursor::raw_batch::RawBatchCursor::generic_new(
227            client,
228            spec,
229            implicit_session,
230            pinned,
231        )?;
232        Ok(Self {
233            stream: stream::Stream::new(raw),
234        })
235    }
236}
237
238impl<T> AsyncStream for Cursor<T>
239where
240    T: DeserializeOwned,
241{
242    type Item = Result<T>;
243
244    fn poll_next(
245        mut self: std::pin::Pin<&mut Self>,
246        cx: &mut std::task::Context<'_>,
247    ) -> Poll<Option<Self::Item>> {
248        self.stream.poll_next_unpin(cx)
249    }
250}