Skip to main content

mongodb/sync/
cursor.rs

1use futures_util::stream::StreamExt;
2use serde::de::{Deserialize, DeserializeOwned};
3
4use super::ClientSession;
5use crate::{
6    bson::{Document, RawDocument},
7    error::Result,
8    Cursor as AsyncCursor,
9    SessionCursor as AsyncSessionCursor,
10    SessionCursorStream,
11};
12
13/// A `Cursor` streams the result of a query. When a query is made, a `Cursor` will be returned with
14/// the first batch of results from the server; the documents will be returned as the `Cursor` is
15/// iterated. When the batch is exhausted and if there are more results, the `Cursor` will fetch the
16/// next batch of documents, and so forth until the results are exhausted. Note that because of this
17/// batching, additional network I/O may occur on any given call to `Cursor::next`. Because of this,
18/// a `Cursor` iterates over `Result<Document>` items rather than simply `Document` items.
19///
20/// The batch size of the `Cursor` can be configured using the options to the method that returns
21/// it. For example, setting the `batch_size` field of
22/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the
23/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find).
24///
25/// Note that the batch size determines both the number of documents stored in memory by the
26/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all
27/// results from the server; both of these factors should be taken into account when choosing the
28/// optimal batch size.
29///
30/// A cursor can be used like any other [`Iterator`]. The simplest way is just to iterate over the
31/// documents it yields using a for loop:
32///
33/// ```rust
34/// # use mongodb::{bson::{doc, Document}, sync::Client, error::Result};
35/// #
36/// # fn do_stuff() -> Result<()> {
37/// # let client = Client::with_uri_str("mongodb://example.com")?;
38/// # let coll = client.database("foo").collection::<Document>("bar");
39/// # let mut cursor = coll.find(doc! {}).run()?;
40/// #
41/// for doc in cursor {
42///   println!("{}", doc?)
43/// }
44/// #
45/// # Ok(())
46/// # }
47/// ```
48///
49/// Additionally, all the other methods that an [`Iterator`] has are available on `Cursor` as well.
50/// For instance, if the number of results from a query is known to be small, it might make sense
51/// to collect them into a vector:
52///
53/// ```rust
54/// # use mongodb::{
55/// #     bson::{doc, Document},
56/// #     error::Result,
57/// #     sync::Client,
58/// # };
59/// #
60/// # fn do_stuff() -> Result<()> {
61/// # let client = Client::with_uri_str("mongodb://example.com")?;
62/// # let coll = client.database("foo").collection("bar");
63/// # let cursor = coll.find(doc! { "x": 1 }).run()?;
64/// #
65/// let results: Vec<Result<Document>> = cursor.collect();
66/// # Ok(())
67/// # }
68/// ```
69#[derive(Debug)]
70pub struct Cursor<T> {
71    async_cursor: AsyncCursor<T>,
72}
73
74impl<T> Cursor<T> {
75    pub(crate) fn new(async_cursor: AsyncCursor<T>) -> Self {
76        Self { async_cursor }
77    }
78}
79
80impl<T> Cursor<T> {
81    /// Move the cursor forward, potentially triggering requests to the database for more results
82    /// if the local buffer has been exhausted.
83    ///
84    /// This will keep requesting data from the server until either the cursor is exhausted
85    /// or batch with results in it has been received.
86    ///
87    /// The return value indicates whether new results were successfully returned (true) or if
88    /// the cursor has been closed (false).
89    ///
90    /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
91    /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
92    /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
93    ///
94    /// ```
95    /// # use mongodb::{sync::Client, bson::{Document, doc}, error::Result};
96    /// # fn foo() -> Result<()> {
97    /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
98    /// # let coll = client.database("stuff").collection::<Document>("stuff");
99    /// let mut cursor = coll.find(doc! {}).run()?;
100    /// while cursor.advance()? {
101    ///     println!("{:?}", cursor.deserialize_current()?);
102    /// }
103    /// # Ok(())
104    /// # }
105    /// ```
106    pub fn advance(&mut self) -> Result<bool> {
107        crate::sync::TOKIO_RUNTIME.block_on(self.async_cursor.advance())
108    }
109
110    /// Returns a reference to the current result in the cursor.
111    ///
112    /// # Panics
113    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
114    /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
115    /// or without calling [`Cursor::advance`] at all may result in a panic.
116    ///
117    /// ```
118    /// # use mongodb::{sync::Client, bson::{doc, Document}, error::Result};
119    /// # fn foo() -> Result<()> {
120    /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
121    /// # let coll = client.database("stuff").collection::<Document>("stuff");
122    /// let mut cursor = coll.find(doc! {}).run()?;
123    /// while cursor.advance()? {
124    ///     println!("{:?}", cursor.current());
125    /// }
126    /// # Ok(())
127    /// # }
128    /// ```
129    pub fn current(&self) -> &RawDocument {
130        self.async_cursor.current()
131    }
132
133    /// Returns true if the cursor has any additional items to return and false otherwise.
134    pub fn has_next(&self) -> bool {
135        self.async_cursor.has_next()
136    }
137
138    /// Deserialize the current result to the generic type associated with this cursor.
139    ///
140    /// # Panics
141    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
142    /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
143    /// true or without calling [`Cursor::advance`] at all may result in a panic.
144    ///
145    /// ```
146    /// # use mongodb::{sync::Client, error::Result, bson::doc};
147    /// # fn foo() -> Result<()> {
148    /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
149    /// # let db = client.database("foo");
150    /// use serde::Deserialize;
151    ///
152    /// #[derive(Debug, Deserialize)]
153    /// struct Cat<'a> {
154    ///     #[serde(borrow)]
155    ///     name: &'a str
156    /// }
157    ///
158    /// let coll = db.collection::<Cat>("cat");
159    /// let mut cursor = coll.find(doc! {}).run()?;
160    /// while cursor.advance()? {
161    ///     println!("{:?}", cursor.deserialize_current()?);
162    /// }
163    /// # Ok(())
164    /// # }
165    /// ```
166    pub fn deserialize_current<'a>(&'a self) -> Result<T>
167    where
168        T: Deserialize<'a>,
169    {
170        self.async_cursor.deserialize_current()
171    }
172}
173
174impl<T> Iterator for Cursor<T>
175where
176    T: DeserializeOwned + Unpin + Send + Sync,
177{
178    type Item = Result<T>;
179
180    fn next(&mut self) -> Option<Self::Item> {
181        crate::sync::TOKIO_RUNTIME.block_on(self.async_cursor.next())
182    }
183}
184
185/// A `SessionCursor` is a cursor that was created with a `ClientSession` must be iterated using
186/// one. To iterate, retrieve a [`SessionCursorIter]` using [`SessionCursor::iter`]:
187///
188/// ```rust
189/// # use mongodb::{bson::{doc, Document}, sync::Client, error::Result};
190/// #
191/// # fn do_stuff() -> Result<()> {
192/// # let client = Client::with_uri_str("mongodb://example.com")?;
193/// # let mut session = client.start_session().run()?;
194/// # let coll = client.database("foo").collection::<Document>("bar");
195/// # let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
196/// #
197/// for doc in cursor.iter(&mut session) {
198///   println!("{}", doc?)
199/// }
200/// #
201/// # Ok(())
202/// # }
203/// ```
204#[derive(Debug)]
205pub struct SessionCursor<T> {
206    async_cursor: AsyncSessionCursor<T>,
207}
208
209impl<T> SessionCursor<T> {
210    pub(crate) fn new(async_cursor: AsyncSessionCursor<T>) -> Self {
211        Self { async_cursor }
212    }
213
214    /// Move the cursor forward, potentially triggering requests to the database for more results
215    /// if the local buffer has been exhausted.
216    ///
217    /// This will keep requesting data from the server until either the cursor is exhausted
218    /// or batch with results in it has been received.
219    ///
220    /// The return value indicates whether new results were successfully returned (true) or if
221    /// the cursor has been closed (false).
222    ///
223    /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
224    /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
225    /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
226    ///
227    /// ```
228    /// # use mongodb::{sync::Client, bson::{doc, Document}, error::Result};
229    /// # fn foo() -> Result<()> {
230    /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
231    /// # let mut session = client.start_session().run()?;
232    /// # let coll = client.database("stuff").collection::<Document>("stuff");
233    /// let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
234    /// while cursor.advance(&mut session)? {
235    ///     println!("{:?}", cursor.deserialize_current()?);
236    /// }
237    /// # Ok(())
238    /// # }
239    /// ```
240    pub fn advance(&mut self, session: &mut ClientSession) -> Result<bool> {
241        crate::sync::TOKIO_RUNTIME
242            .block_on(self.async_cursor.advance(&mut session.async_client_session))
243    }
244
245    /// Returns a reference to the current result in the cursor.
246    ///
247    /// # Panics
248    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
249    /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
250    /// or without calling [`Cursor::advance`] at all may result in a panic.
251    ///
252    /// ```
253    /// # use mongodb::{sync::Client, bson::{doc, Document}, error::Result};
254    /// # fn foo() -> Result<()> {
255    /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
256    /// # let mut session = client.start_session().run()?;
257    /// # let coll = client.database("stuff").collection::<Document>("stuff");
258    /// let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
259    /// while cursor.advance(&mut session)? {
260    ///     println!("{:?}", cursor.current());
261    /// }
262    /// # Ok(())
263    /// # }
264    /// ```
265    pub fn current(&self) -> &RawDocument {
266        self.async_cursor.current()
267    }
268
269    /// Deserialize the current result to the generic type associated with this cursor.
270    ///
271    /// # Panics
272    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
273    /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
274    /// true or without calling [`Cursor::advance`] at all may result in a panic.
275    ///
276    /// ```
277    /// # use mongodb::{sync::Client, error::Result, bson::doc};
278    /// # fn foo() -> Result<()> {
279    /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
280    /// # let mut session = client.start_session().run()?;
281    /// # let db = client.database("foo");
282    /// use serde::Deserialize;
283    ///
284    /// #[derive(Debug, Deserialize)]
285    /// struct Cat<'a> {
286    ///     #[serde(borrow)]
287    ///     name: &'a str
288    /// }
289    ///
290    /// let coll = db.collection::<Cat>("cat");
291    /// let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
292    /// while cursor.advance(&mut session)? {
293    ///     println!("{:?}", cursor.deserialize_current()?);
294    /// }
295    /// # Ok(())
296    /// # }
297    /// ```
298    pub fn deserialize_current<'a>(&'a self) -> Result<T>
299    where
300        T: Deserialize<'a>,
301    {
302        self.async_cursor.deserialize_current()
303    }
304}
305
306impl<T> SessionCursor<T>
307where
308    T: DeserializeOwned + Unpin + Send + Sync,
309{
310    /// Retrieves a [`SessionCursorIter`] to iterate this cursor. The session provided must be
311    /// the same session used to create the cursor.
312    pub fn iter<'session>(
313        &mut self,
314        session: &'session mut ClientSession,
315    ) -> SessionCursorIter<'_, 'session, T> {
316        SessionCursorIter {
317            async_stream: self.async_cursor.stream(&mut session.async_client_session),
318        }
319    }
320
321    /// Retrieve the next result from the cursor.
322    /// The session provided must be the same session used to create the cursor.
323    ///
324    /// Use this method when the session needs to be used again between iterations or when the added
325    /// functionality of `Iterator` is not needed.
326    ///
327    /// ```
328    /// # use mongodb::{bson::{doc, Document}, sync::Client};
329    /// # fn foo() -> mongodb::error::Result<()> {
330    /// # let client = Client::with_uri_str("foo")?;
331    /// # let coll = client.database("foo").collection::<Document>("bar");
332    /// # let other_coll = coll.clone();
333    /// # let mut session = client.start_session().run()?;
334    /// let mut cursor = coll.find(doc! { "x": 1 }).session(&mut session).run()?;
335    /// while let Some(doc) = cursor.next(&mut session).transpose()? {
336    ///     other_coll.insert_one(doc).session(&mut session).run()?;
337    /// }
338    /// # Ok::<(), mongodb::error::Error>(())
339    /// # }
340    /// ```
341    pub fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
342        self.iter(session).next()
343    }
344}
345
346/// A handle that borrows a `ClientSession` temporarily for executing getMores or iterating through
347/// the current buffer of a `SessionCursor`.
348///
349/// This updates the buffer of the parent `SessionCursor` when dropped.
350pub struct SessionCursorIter<'cursor, 'session, T = Document>
351where
352    T: DeserializeOwned + Unpin + Send + Sync,
353{
354    async_stream: SessionCursorStream<'cursor, 'session, T>,
355}
356
357impl<T> Iterator for SessionCursorIter<'_, '_, T>
358where
359    T: DeserializeOwned + Unpin + Send + Sync,
360{
361    type Item = Result<T>;
362
363    fn next(&mut self) -> Option<Self::Item> {
364        crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next())
365    }
366}