mongodb/cursor.rs
1mod common;
2pub(crate) mod session;
3
4#[cfg(test)]
5use std::collections::VecDeque;
6use std::{
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use crate::bson::RawDocument;
12
13#[cfg(test)]
14use crate::bson::RawDocumentBuf;
15use derive_where::derive_where;
16use futures_core::Stream;
17use serde::{de::DeserializeOwned, Deserialize};
18#[cfg(test)]
19use tokio::sync::oneshot;
20
21use crate::{
22 change_stream::event::ResumeToken,
23 client::{options::ServerAddress, AsyncDropToken},
24 cmap::conn::PinnedConnectionHandle,
25 cursor::common::ImplicitClientSessionHandle,
26 error::{Error, Result},
27 Client,
28 ClientSession,
29};
30use common::{kill_cursor, GenericCursor};
31pub(crate) use common::{
32 stream_poll_next,
33 BatchValue,
34 CursorInformation,
35 CursorSpecification,
36 CursorStream,
37 NextInBatchFuture,
38 PinnedConnection,
39};
40
/// A [`Cursor`] streams the result of a query. When a query is made, the returned [`Cursor`] will
/// contain the first batch of results from the server; the individual results will then be returned
/// as the [`Cursor`] is iterated. When the batch is exhausted and if there are more results, the
/// [`Cursor`] will fetch the next batch of documents, and so forth until the results are exhausted.
/// Note that because of this batching, additional network I/O may occur on any given call to
/// `next`. Because of this, a [`Cursor`] iterates over `Result<T>` items rather than
/// simply `T` items.
///
/// The batch size of the `Cursor` can be configured using the options to the method that returns
/// it. For example, setting the `batch_size` field of
/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the
/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find).
///
/// Note that the batch size determines both the number of documents stored in memory by the
/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all
/// results from the server; both of these factors should be taken into account when choosing the
/// optimal batch size.
///
/// [`Cursor`] implements [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html), which means
/// it can be iterated over much in the same way that an `Iterator` can be in synchronous Rust. In
/// order to do so, the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) trait must
/// be imported. Because a [`Cursor`] iterates over a `Result<T>`, it also has access to the
/// potentially more ergonomic functionality provided by
/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html), which can be
/// imported instead of or in addition to
/// [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html). The methods from
/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) are especially useful when
/// used in conjunction with the `?` operator.
///
/// ```rust
/// # use mongodb::{bson::{Document, doc}, Client, error::Result};
/// #
/// # async fn do_stuff() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection::<Document>("bar");
/// #
/// use futures::stream::{StreamExt, TryStreamExt};
///
/// let mut cursor = coll.find(doc! {}).await?;
/// // regular Stream uses next() and iterates over Option<Result<T>>
/// while let Some(doc) = cursor.next().await {
///   println!("{}", doc?)
/// }
/// // regular Stream uses collect() and collects into a Vec<Result<T>>
/// let v: Vec<Result<_>> = cursor.collect().await;
///
/// let mut cursor = coll.find(doc! {}).await?;
/// // TryStream uses try_next() and iterates over Result<Option<T>>
/// while let Some(doc) = cursor.try_next().await? {
///   println!("{}", doc)
/// }
/// // TryStream uses try_collect() and collects into a Result<Vec<T>>
/// let v: Vec<_> = cursor.try_collect().await?;
/// #
/// # Ok(())
/// # }
/// ```
///
/// If a [`Cursor`] is still open when it goes out of scope, it will automatically be closed via an
/// asynchronous [killCursors](https://www.mongodb.com/docs/manual/reference/command/killCursors/) command executed
/// from its [`Drop`](https://doc.rust-lang.org/std/ops/trait.Drop.html) implementation. A cursor that
/// the server has already exhausted is not killed, since there is nothing left to close.
#[derive_where(Debug)]
pub struct Cursor<T> {
    // Client handed to `kill_cursor` when the cursor is dropped while still open; also exposed
    // through `Cursor::client`.
    client: Client,
    // Token passed to `kill_cursor` on drop; `with_type` moves it to the new cursor via `take`.
    drop_token: AsyncDropToken,
    // `wrapped_cursor` is an `Option` so that it can be `None` for the `drop` impl for a cursor
    // that's had `with_type` called; in all other circumstances it will be `Some`.
    wrapped_cursor: Option<ImplicitSessionCursor>,
    // Optional address handed to `kill_cursor` on drop; set via `Cursor::set_drop_address`.
    drop_address: Option<ServerAddress>,
    // Test-only one-shot sender handed to `kill_cursor` on drop so tests can wait for the
    // asynchronous `killCursors` to finish (see `Cursor::set_kill_watcher`).
    #[cfg(test)]
    kill_watcher: Option<oneshot::Sender<()>>,
    // `PhantomData<fn() -> T>` ties the cursor to `T` without storing a `T`, so `T` does not
    // influence auto traits such as `Send`/`Sync` or drop checking. It is skipped in `Debug` so
    // `T` does not need to implement `Debug`.
    #[derive_where(skip)]
    _phantom: std::marker::PhantomData<fn() -> T>,
}
115
116impl<T> Cursor<T> {
117 pub(crate) fn new(
118 client: Client,
119 spec: CursorSpecification,
120 session: Option<ClientSession>,
121 pin: Option<PinnedConnectionHandle>,
122 ) -> Self {
123 Self {
124 client: client.clone(),
125 drop_token: client.register_async_drop(),
126 wrapped_cursor: Some(ImplicitSessionCursor::with_implicit_session(
127 client,
128 spec,
129 PinnedConnection::new(pin),
130 ImplicitClientSessionHandle(session),
131 )),
132 drop_address: None,
133 #[cfg(test)]
134 kill_watcher: None,
135 _phantom: Default::default(),
136 }
137 }
138
139 pub(crate) fn post_batch_resume_token(&self) -> Option<&ResumeToken> {
140 self.wrapped_cursor
141 .as_ref()
142 .and_then(|c| c.post_batch_resume_token())
143 }
144
145 pub(crate) fn client(&self) -> &Client {
146 &self.client
147 }
148
149 pub(crate) fn address(&self) -> &ServerAddress {
150 self.wrapped_cursor.as_ref().unwrap().address()
151 }
152
153 pub(crate) fn set_drop_address(&mut self, address: ServerAddress) {
154 self.drop_address = Some(address);
155 }
156
157 pub(crate) fn take_implicit_session(&mut self) -> Option<ClientSession> {
158 self.wrapped_cursor
159 .as_mut()
160 .and_then(|c| c.take_implicit_session())
161 }
162
163 /// Move the cursor forward, potentially triggering requests to the database for more results
164 /// if the local buffer has been exhausted.
165 ///
166 /// This will keep requesting data from the server until either the cursor is exhausted
167 /// or batch with results in it has been received.
168 ///
169 /// The return value indicates whether new results were successfully returned (true) or if
170 /// the cursor has been closed (false).
171 ///
172 /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
173 /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
174 /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
175 ///
176 /// ```
177 /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
178 /// # async fn foo() -> Result<()> {
179 /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
180 /// # let coll = client.database("stuff").collection::<Document>("stuff");
181 /// let mut cursor = coll.find(doc! {}).await?;
182 /// while cursor.advance().await? {
183 /// println!("{:?}", cursor.current());
184 /// }
185 /// # Ok(())
186 /// # }
187 /// ```
188 pub async fn advance(&mut self) -> Result<bool> {
189 self.wrapped_cursor.as_mut().unwrap().advance().await
190 }
191
192 #[cfg(test)]
193 pub(crate) async fn try_advance(&mut self) -> Result<()> {
194 self.wrapped_cursor
195 .as_mut()
196 .unwrap()
197 .try_advance()
198 .await
199 .map(|_| ())
200 }
201
202 /// Returns a reference to the current result in the cursor.
203 ///
204 /// # Panics
205 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
206 /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
207 /// or without calling [`Cursor::advance`] at all may result in a panic.
208 ///
209 /// ```
210 /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
211 /// # async fn foo() -> Result<()> {
212 /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
213 /// # let coll = client.database("stuff").collection::<Document>("stuff");
214 /// let mut cursor = coll.find(doc! {}).await?;
215 /// while cursor.advance().await? {
216 /// println!("{:?}", cursor.current());
217 /// }
218 /// # Ok(())
219 /// # }
220 /// ```
221 pub fn current(&self) -> &RawDocument {
222 self.wrapped_cursor.as_ref().unwrap().current().unwrap()
223 }
224
225 /// Whether this cursor has exhausted all of its getMore calls. The cursor may have more
226 /// items remaining in the buffer.
227 pub(crate) fn is_exhausted(&self) -> bool {
228 self.wrapped_cursor.as_ref().unwrap().is_exhausted()
229 }
230
231 /// Returns true if the cursor has any additional items to return and false otherwise.
232 pub fn has_next(&self) -> bool {
233 !self.is_exhausted()
234 || !self
235 .wrapped_cursor
236 .as_ref()
237 .unwrap()
238 .state()
239 .buffer
240 .is_empty()
241 }
242 /// Deserialize the current result to the generic type associated with this cursor.
243 ///
244 /// # Panics
245 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
246 /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
247 /// true or without calling [`Cursor::advance`] at all may result in a panic.
248 ///
249 /// ```
250 /// # use mongodb::{Client, error::Result, bson::doc};
251 /// # async fn foo() -> Result<()> {
252 /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
253 /// # let db = client.database("foo");
254 /// use serde::Deserialize;
255 ///
256 /// #[derive(Debug, Deserialize)]
257 /// struct Cat<'a> {
258 /// #[serde(borrow)]
259 /// name: &'a str
260 /// }
261 ///
262 /// let coll = db.collection::<Cat>("cat");
263 /// let mut cursor = coll.find(doc! {}).await?;
264 /// while cursor.advance().await? {
265 /// println!("{:?}", cursor.deserialize_current()?);
266 /// }
267 /// # Ok(())
268 /// # }
269 /// ```
270 pub fn deserialize_current<'a>(&'a self) -> Result<T>
271 where
272 T: Deserialize<'a>,
273 {
274 crate::bson_compat::deserialize_from_slice(self.current().as_bytes()).map_err(Error::from)
275 }
276
277 /// Update the type streamed values will be parsed as.
278 pub fn with_type<'a, D>(mut self) -> Cursor<D>
279 where
280 D: Deserialize<'a>,
281 {
282 Cursor {
283 client: self.client.clone(),
284 drop_token: self.drop_token.take(),
285 wrapped_cursor: self.wrapped_cursor.take(),
286 drop_address: self.drop_address.take(),
287 #[cfg(test)]
288 kill_watcher: self.kill_watcher.take(),
289 _phantom: Default::default(),
290 }
291 }
292
293 /// Some tests need to be able to observe the events generated by `killCommand` execution;
294 /// however, because that happens asynchronously on `drop`, the test runner can conclude before
295 /// the event is published. To fix that, tests can set a "kill watcher" on cursors - a
296 /// one-shot channel with a `()` value pushed after `killCommand` is run that the test can wait
297 /// on.
298 #[cfg(test)]
299 pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
300 assert!(
301 self.kill_watcher.is_none(),
302 "cursor already has a kill_watcher"
303 );
304 self.kill_watcher = Some(tx);
305 }
306
307 #[cfg(test)]
308 pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
309 self.wrapped_cursor.as_ref().unwrap().current_batch()
310 }
311}
312
313impl<T> CursorStream for Cursor<T>
314where
315 T: DeserializeOwned,
316{
317 fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
318 self.wrapped_cursor.as_mut().unwrap().poll_next_in_batch(cx)
319 }
320}
321
322impl<T> Stream for Cursor<T>
323where
324 T: DeserializeOwned,
325{
326 type Item = Result<T>;
327
328 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
329 // This `unwrap` is safe because `wrapped_cursor` is always `Some` outside of `drop`.
330 stream_poll_next(self.wrapped_cursor.as_mut().unwrap(), cx)
331 }
332}
333
334impl<T> Drop for Cursor<T> {
335 fn drop(&mut self) {
336 let wrapped_cursor = match &self.wrapped_cursor {
337 None => return,
338 Some(c) => c,
339 };
340 if wrapped_cursor.is_exhausted() {
341 return;
342 }
343
344 kill_cursor(
345 self.client.clone(),
346 &mut self.drop_token,
347 wrapped_cursor.namespace(),
348 wrapped_cursor.id(),
349 wrapped_cursor.pinned_connection().replicate(),
350 self.drop_address.take(),
351 #[cfg(test)]
352 self.kill_watcher.take(),
353 );
354 }
355}
356
/// A `GenericCursor` that optionally owns its own session.
/// This is to be used by cursors associated with implicit sessions.
// NOTE(review): the `'static` argument presumably means this cursor never borrows a session
// from elsewhere (it owns it through `ImplicitClientSessionHandle`) — confirm against
// `GenericCursor`'s lifetime parameter.
type ImplicitSessionCursor = GenericCursor<'static, ImplicitClientSessionHandle>;