mongodb/sync/cursor.rs
1use futures_util::stream::StreamExt;
2use serde::de::{Deserialize, DeserializeOwned};
3
4use super::ClientSession;
5use crate::{
6 bson::{Document, RawDocument},
7 error::Result,
8 Cursor as AsyncCursor,
9 SessionCursor as AsyncSessionCursor,
10 SessionCursorStream,
11};
12
13/// A `Cursor` streams the result of a query. When a query is made, a `Cursor` will be returned with
14/// the first batch of results from the server; the documents will be returned as the `Cursor` is
15/// iterated. When the batch is exhausted and if there are more results, the `Cursor` will fetch the
16/// next batch of documents, and so forth until the results are exhausted. Note that because of this
17/// batching, additional network I/O may occur on any given call to `Cursor::next`. Because of this,
18/// a `Cursor` iterates over `Result<Document>` items rather than simply `Document` items.
19///
20/// The batch size of the `Cursor` can be configured using the options to the method that returns
21/// it. For example, setting the `batch_size` field of
22/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the
23/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find).
24///
25/// Note that the batch size determines both the number of documents stored in memory by the
26/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all
27/// results from the server; both of these factors should be taken into account when choosing the
28/// optimal batch size.
29///
30/// A cursor can be used like any other [`Iterator`]. The simplest way is just to iterate over the
31/// documents it yields using a for loop:
32///
33/// ```rust
34/// # use mongodb::{bson::{doc, Document}, sync::Client, error::Result};
35/// #
36/// # fn do_stuff() -> Result<()> {
37/// # let client = Client::with_uri_str("mongodb://example.com")?;
38/// # let coll = client.database("foo").collection::<Document>("bar");
39/// # let mut cursor = coll.find(doc! {}).run()?;
40/// #
41/// for doc in cursor {
42/// println!("{}", doc?)
43/// }
44/// #
45/// # Ok(())
46/// # }
47/// ```
48///
49/// Additionally, all the other methods that an [`Iterator`] has are available on `Cursor` as well.
50/// For instance, if the number of results from a query is known to be small, it might make sense
51/// to collect them into a vector:
52///
53/// ```rust
54/// # use mongodb::{
55/// # bson::{doc, Document},
56/// # error::Result,
57/// # sync::Client,
58/// # };
59/// #
60/// # fn do_stuff() -> Result<()> {
61/// # let client = Client::with_uri_str("mongodb://example.com")?;
62/// # let coll = client.database("foo").collection("bar");
63/// # let cursor = coll.find(doc! { "x": 1 }).run()?;
64/// #
65/// let results: Vec<Result<Document>> = cursor.collect();
66/// # Ok(())
67/// # }
68/// ```
69#[derive(Debug)]
70pub struct Cursor<T> {
71 async_cursor: AsyncCursor<T>,
72}
73
74impl<T> Cursor<T> {
75 pub(crate) fn new(async_cursor: AsyncCursor<T>) -> Self {
76 Self { async_cursor }
77 }
78}
79
80impl<T> Cursor<T> {
81 /// Move the cursor forward, potentially triggering requests to the database for more results
82 /// if the local buffer has been exhausted.
83 ///
84 /// This will keep requesting data from the server until either the cursor is exhausted
85 /// or batch with results in it has been received.
86 ///
87 /// The return value indicates whether new results were successfully returned (true) or if
88 /// the cursor has been closed (false).
89 ///
90 /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
91 /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
92 /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
93 ///
94 /// ```
95 /// # use mongodb::{sync::Client, bson::{Document, doc}, error::Result};
96 /// # fn foo() -> Result<()> {
97 /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
98 /// # let coll = client.database("stuff").collection::<Document>("stuff");
99 /// let mut cursor = coll.find(doc! {}).run()?;
100 /// while cursor.advance()? {
101 /// println!("{:?}", cursor.deserialize_current()?);
102 /// }
103 /// # Ok(())
104 /// # }
105 /// ```
106 pub fn advance(&mut self) -> Result<bool> {
107 crate::sync::TOKIO_RUNTIME.block_on(self.async_cursor.advance())
108 }
109
110 /// Returns a reference to the current result in the cursor.
111 ///
112 /// # Panics
113 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
114 /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
115 /// or without calling [`Cursor::advance`] at all may result in a panic.
116 ///
117 /// ```
118 /// # use mongodb::{sync::Client, bson::{doc, Document}, error::Result};
119 /// # fn foo() -> Result<()> {
120 /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
121 /// # let coll = client.database("stuff").collection::<Document>("stuff");
122 /// let mut cursor = coll.find(doc! {}).run()?;
123 /// while cursor.advance()? {
124 /// println!("{:?}", cursor.current());
125 /// }
126 /// # Ok(())
127 /// # }
128 /// ```
129 pub fn current(&self) -> &RawDocument {
130 self.async_cursor.current()
131 }
132
133 /// Returns true if the cursor has any additional items to return and false otherwise.
134 pub fn has_next(&self) -> bool {
135 self.async_cursor.has_next()
136 }
137
138 /// Deserialize the current result to the generic type associated with this cursor.
139 ///
140 /// # Panics
141 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
142 /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
143 /// true or without calling [`Cursor::advance`] at all may result in a panic.
144 ///
145 /// ```
146 /// # use mongodb::{sync::Client, error::Result, bson::doc};
147 /// # fn foo() -> Result<()> {
148 /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
149 /// # let db = client.database("foo");
150 /// use serde::Deserialize;
151 ///
152 /// #[derive(Debug, Deserialize)]
153 /// struct Cat<'a> {
154 /// #[serde(borrow)]
155 /// name: &'a str
156 /// }
157 ///
158 /// let coll = db.collection::<Cat>("cat");
159 /// let mut cursor = coll.find(doc! {}).run()?;
160 /// while cursor.advance()? {
161 /// println!("{:?}", cursor.deserialize_current()?);
162 /// }
163 /// # Ok(())
164 /// # }
165 /// ```
166 pub fn deserialize_current<'a>(&'a self) -> Result<T>
167 where
168 T: Deserialize<'a>,
169 {
170 self.async_cursor.deserialize_current()
171 }
172}
173
174impl<T> Iterator for Cursor<T>
175where
176 T: DeserializeOwned + Unpin + Send + Sync,
177{
178 type Item = Result<T>;
179
180 fn next(&mut self) -> Option<Self::Item> {
181 crate::sync::TOKIO_RUNTIME.block_on(self.async_cursor.next())
182 }
183}
184
185/// A `SessionCursor` is a cursor that was created with a `ClientSession` must be iterated using
186/// one. To iterate, retrieve a [`SessionCursorIter]` using [`SessionCursor::iter`]:
187///
188/// ```rust
189/// # use mongodb::{bson::{doc, Document}, sync::Client, error::Result};
190/// #
191/// # fn do_stuff() -> Result<()> {
192/// # let client = Client::with_uri_str("mongodb://example.com")?;
193/// # let mut session = client.start_session().run()?;
194/// # let coll = client.database("foo").collection::<Document>("bar");
195/// # let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
196/// #
197/// for doc in cursor.iter(&mut session) {
198/// println!("{}", doc?)
199/// }
200/// #
201/// # Ok(())
202/// # }
203/// ```
204#[derive(Debug)]
205pub struct SessionCursor<T> {
206 async_cursor: AsyncSessionCursor<T>,
207}
208
209impl<T> SessionCursor<T> {
210 pub(crate) fn new(async_cursor: AsyncSessionCursor<T>) -> Self {
211 Self { async_cursor }
212 }
213
214 /// Move the cursor forward, potentially triggering requests to the database for more results
215 /// if the local buffer has been exhausted.
216 ///
217 /// This will keep requesting data from the server until either the cursor is exhausted
218 /// or batch with results in it has been received.
219 ///
220 /// The return value indicates whether new results were successfully returned (true) or if
221 /// the cursor has been closed (false).
222 ///
223 /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
224 /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
225 /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
226 ///
227 /// ```
228 /// # use mongodb::{sync::Client, bson::{doc, Document}, error::Result};
229 /// # fn foo() -> Result<()> {
230 /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
231 /// # let mut session = client.start_session().run()?;
232 /// # let coll = client.database("stuff").collection::<Document>("stuff");
233 /// let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
234 /// while cursor.advance(&mut session)? {
235 /// println!("{:?}", cursor.deserialize_current()?);
236 /// }
237 /// # Ok(())
238 /// # }
239 /// ```
240 pub fn advance(&mut self, session: &mut ClientSession) -> Result<bool> {
241 crate::sync::TOKIO_RUNTIME
242 .block_on(self.async_cursor.advance(&mut session.async_client_session))
243 }
244
245 /// Returns a reference to the current result in the cursor.
246 ///
247 /// # Panics
248 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
249 /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
250 /// or without calling [`Cursor::advance`] at all may result in a panic.
251 ///
252 /// ```
253 /// # use mongodb::{sync::Client, bson::{doc, Document}, error::Result};
254 /// # fn foo() -> Result<()> {
255 /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
256 /// # let mut session = client.start_session().run()?;
257 /// # let coll = client.database("stuff").collection::<Document>("stuff");
258 /// let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
259 /// while cursor.advance(&mut session)? {
260 /// println!("{:?}", cursor.current());
261 /// }
262 /// # Ok(())
263 /// # }
264 /// ```
265 pub fn current(&self) -> &RawDocument {
266 self.async_cursor.current()
267 }
268
269 /// Deserialize the current result to the generic type associated with this cursor.
270 ///
271 /// # Panics
272 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
273 /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
274 /// true or without calling [`Cursor::advance`] at all may result in a panic.
275 ///
276 /// ```
277 /// # use mongodb::{sync::Client, error::Result, bson::doc};
278 /// # fn foo() -> Result<()> {
279 /// # let client = Client::with_uri_str("mongodb://localhost:27017")?;
280 /// # let mut session = client.start_session().run()?;
281 /// # let db = client.database("foo");
282 /// use serde::Deserialize;
283 ///
284 /// #[derive(Debug, Deserialize)]
285 /// struct Cat<'a> {
286 /// #[serde(borrow)]
287 /// name: &'a str
288 /// }
289 ///
290 /// let coll = db.collection::<Cat>("cat");
291 /// let mut cursor = coll.find(doc! {}).session(&mut session).run()?;
292 /// while cursor.advance(&mut session)? {
293 /// println!("{:?}", cursor.deserialize_current()?);
294 /// }
295 /// # Ok(())
296 /// # }
297 /// ```
298 pub fn deserialize_current<'a>(&'a self) -> Result<T>
299 where
300 T: Deserialize<'a>,
301 {
302 self.async_cursor.deserialize_current()
303 }
304}
305
306impl<T> SessionCursor<T>
307where
308 T: DeserializeOwned + Unpin + Send + Sync,
309{
310 /// Retrieves a [`SessionCursorIter`] to iterate this cursor. The session provided must be
311 /// the same session used to create the cursor.
312 pub fn iter<'session>(
313 &mut self,
314 session: &'session mut ClientSession,
315 ) -> SessionCursorIter<'_, 'session, T> {
316 SessionCursorIter {
317 async_stream: self.async_cursor.stream(&mut session.async_client_session),
318 }
319 }
320
321 /// Retrieve the next result from the cursor.
322 /// The session provided must be the same session used to create the cursor.
323 ///
324 /// Use this method when the session needs to be used again between iterations or when the added
325 /// functionality of `Iterator` is not needed.
326 ///
327 /// ```
328 /// # use mongodb::{bson::{doc, Document}, sync::Client};
329 /// # fn foo() -> mongodb::error::Result<()> {
330 /// # let client = Client::with_uri_str("foo")?;
331 /// # let coll = client.database("foo").collection::<Document>("bar");
332 /// # let other_coll = coll.clone();
333 /// # let mut session = client.start_session().run()?;
334 /// let mut cursor = coll.find(doc! { "x": 1 }).session(&mut session).run()?;
335 /// while let Some(doc) = cursor.next(&mut session).transpose()? {
336 /// other_coll.insert_one(doc).session(&mut session).run()?;
337 /// }
338 /// # Ok::<(), mongodb::error::Error>(())
339 /// # }
340 /// ```
341 pub fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
342 self.iter(session).next()
343 }
344}
345
346/// A handle that borrows a `ClientSession` temporarily for executing getMores or iterating through
347/// the current buffer of a `SessionCursor`.
348///
349/// This updates the buffer of the parent `SessionCursor` when dropped.
350pub struct SessionCursorIter<'cursor, 'session, T = Document>
351where
352 T: DeserializeOwned + Unpin + Send + Sync,
353{
354 async_stream: SessionCursorStream<'cursor, 'session, T>,
355}
356
357impl<T> Iterator for SessionCursorIter<'_, '_, T>
358where
359 T: DeserializeOwned + Unpin + Send + Sync,
360{
361 type Item = Result<T>;
362
363 fn next(&mut self) -> Option<Self::Item> {
364 crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next())
365 }
366}