mongodb/
cursor.rs

1mod common;
2pub(crate) mod session;
3
4#[cfg(test)]
5use std::collections::VecDeque;
6use std::{
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use crate::bson::RawDocument;
12
13#[cfg(test)]
14use crate::bson::RawDocumentBuf;
15use derive_where::derive_where;
16use futures_core::Stream;
17use serde::{de::DeserializeOwned, Deserialize};
18#[cfg(test)]
19use tokio::sync::oneshot;
20
21use crate::{
22    change_stream::event::ResumeToken,
23    client::{options::ServerAddress, AsyncDropToken},
24    cmap::conn::PinnedConnectionHandle,
25    cursor::common::ImplicitClientSessionHandle,
26    error::{Error, Result},
27    Client,
28    ClientSession,
29};
30use common::{kill_cursor, GenericCursor};
31pub(crate) use common::{
32    stream_poll_next,
33    BatchValue,
34    CursorInformation,
35    CursorSpecification,
36    CursorStream,
37    NextInBatchFuture,
38    PinnedConnection,
39};
40
/// A [`Cursor`] streams the results of a query. When a query is made, the returned [`Cursor`] will
/// contain the first batch of results from the server; the individual results will then be returned
/// as the [`Cursor`] is iterated. When the batch is exhausted and if there are more results, the
/// [`Cursor`] will fetch the next batch of documents, and so forth until the results are exhausted.
/// Note that because of this batching, additional network I/O may occur on any given call to
/// `next`. Because of this, a [`Cursor`] iterates over `Result<T>` items rather than
/// simply `T` items.
///
/// The batch size of the `Cursor` can be configured using the options to the method that returns
/// it. For example, setting the `batch_size` field of
/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the
/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find).
///
/// Note that the batch size determines both the number of documents stored in memory by the
/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all
/// results from the server; both of these factors should be taken into account when choosing the
/// optimal batch size.
///
/// [`Cursor`] implements [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html), which means
/// it can be iterated over much in the same way that an `Iterator` can be in synchronous Rust. In
/// order to do so, the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) trait must
/// be imported. Because a [`Cursor`] iterates over a `Result<T>`, it also has access to the
/// potentially more ergonomic functionality provided by
/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html), which can be
/// imported instead of or in addition to
/// [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html). The methods from
/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) are especially useful when
/// used in conjunction with the `?` operator.
///
/// ```rust
/// # use mongodb::{bson::{Document, doc}, Client, error::Result};
/// #
/// # async fn do_stuff() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection::<Document>("bar");
/// #
/// use futures::stream::{StreamExt, TryStreamExt};
///
/// let mut cursor = coll.find(doc! {}).await?;
/// // regular Stream uses next() and iterates over Option<Result<T>>
/// while let Some(doc) = cursor.next().await {
///   println!("{}", doc?)
/// }
/// // regular Stream uses collect() and collects into a Vec<Result<T>>
/// let v: Vec<Result<_>> = cursor.collect().await;
///
/// let mut cursor = coll.find(doc! {}).await?;
/// // TryStream uses try_next() and iterates over Result<Option<T>>
/// while let Some(doc) = cursor.try_next().await? {
///   println!("{}", doc)
/// }
/// // TryStream uses try_collect() and collects into a Result<Vec<T>>
/// let v: Vec<_> = cursor.try_collect().await?;
/// #
/// # Ok(())
/// # }
/// ```
///
/// If a [`Cursor`] is still open when it goes out of scope, it will automatically be closed via an
/// asynchronous [killCursors](https://www.mongodb.com/docs/manual/reference/command/killCursors/) command executed
/// from its [`Drop`](https://doc.rust-lang.org/std/ops/trait.Drop.html) implementation. A cursor that
/// has already been exhausted does not issue `killCursors` when dropped.
#[derive_where(Debug)]
pub struct Cursor<T> {
    // Client the cursor was created from; a clone of it is handed to `kill_cursor` on drop.
    client: Client,
    // Token obtained from `Client::register_async_drop` in `Cursor::new`; passed by `&mut` to
    // `kill_cursor` on drop.
    drop_token: AsyncDropToken,
    // `wrapped_cursor` is an `Option` so that it can be `None` for the `drop` impl for a cursor
    // that's had `with_type` called; in all other circumstances it will be `Some`.
    wrapped_cursor: Option<ImplicitSessionCursor>,
    // Address forwarded to `kill_cursor` on drop; `None` unless set via `set_drop_address`.
    drop_address: Option<ServerAddress>,
    // Test-only one-shot channel that receives `()` after the drop-time `killCursors` has run
    // (see `set_kill_watcher`).
    #[cfg(test)]
    kill_watcher: Option<oneshot::Sender<()>>,
    // Records the item type `T` without storing a value of it; skipped by the derived `Debug`.
    // NOTE(review): `fn() -> T` presumably keeps the cursor's auto traits independent of `T`.
    #[derive_where(skip)]
    _phantom: std::marker::PhantomData<fn() -> T>,
}
115
116impl<T> Cursor<T> {
117    pub(crate) fn new(
118        client: Client,
119        spec: CursorSpecification,
120        session: Option<ClientSession>,
121        pin: Option<PinnedConnectionHandle>,
122    ) -> Self {
123        Self {
124            client: client.clone(),
125            drop_token: client.register_async_drop(),
126            wrapped_cursor: Some(ImplicitSessionCursor::with_implicit_session(
127                client,
128                spec,
129                PinnedConnection::new(pin),
130                ImplicitClientSessionHandle(session),
131            )),
132            drop_address: None,
133            #[cfg(test)]
134            kill_watcher: None,
135            _phantom: Default::default(),
136        }
137    }
138
139    pub(crate) fn post_batch_resume_token(&self) -> Option<&ResumeToken> {
140        self.wrapped_cursor
141            .as_ref()
142            .and_then(|c| c.post_batch_resume_token())
143    }
144
145    pub(crate) fn client(&self) -> &Client {
146        &self.client
147    }
148
149    pub(crate) fn address(&self) -> &ServerAddress {
150        self.wrapped_cursor.as_ref().unwrap().address()
151    }
152
153    pub(crate) fn set_drop_address(&mut self, address: ServerAddress) {
154        self.drop_address = Some(address);
155    }
156
157    pub(crate) fn take_implicit_session(&mut self) -> Option<ClientSession> {
158        self.wrapped_cursor
159            .as_mut()
160            .and_then(|c| c.take_implicit_session())
161    }
162
163    /// Move the cursor forward, potentially triggering requests to the database for more results
164    /// if the local buffer has been exhausted.
165    ///
166    /// This will keep requesting data from the server until either the cursor is exhausted
167    /// or batch with results in it has been received.
168    ///
169    /// The return value indicates whether new results were successfully returned (true) or if
170    /// the cursor has been closed (false).
171    ///
172    /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
173    /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
174    /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
175    ///
176    /// ```
177    /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
178    /// # async fn foo() -> Result<()> {
179    /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
180    /// # let coll = client.database("stuff").collection::<Document>("stuff");
181    /// let mut cursor = coll.find(doc! {}).await?;
182    /// while cursor.advance().await? {
183    ///     println!("{:?}", cursor.current());
184    /// }
185    /// # Ok(())
186    /// # }
187    /// ```
188    pub async fn advance(&mut self) -> Result<bool> {
189        self.wrapped_cursor.as_mut().unwrap().advance().await
190    }
191
192    #[cfg(test)]
193    pub(crate) async fn try_advance(&mut self) -> Result<()> {
194        self.wrapped_cursor
195            .as_mut()
196            .unwrap()
197            .try_advance()
198            .await
199            .map(|_| ())
200    }
201
202    /// Returns a reference to the current result in the cursor.
203    ///
204    /// # Panics
205    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
206    /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
207    /// or without calling [`Cursor::advance`] at all may result in a panic.
208    ///
209    /// ```
210    /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
211    /// # async fn foo() -> Result<()> {
212    /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
213    /// # let coll = client.database("stuff").collection::<Document>("stuff");
214    /// let mut cursor = coll.find(doc! {}).await?;
215    /// while cursor.advance().await? {
216    ///     println!("{:?}", cursor.current());
217    /// }
218    /// # Ok(())
219    /// # }
220    /// ```
221    pub fn current(&self) -> &RawDocument {
222        self.wrapped_cursor.as_ref().unwrap().current().unwrap()
223    }
224
225    /// Whether this cursor has exhausted all of its getMore calls. The cursor may have more
226    /// items remaining in the buffer.
227    pub(crate) fn is_exhausted(&self) -> bool {
228        self.wrapped_cursor.as_ref().unwrap().is_exhausted()
229    }
230
231    /// Returns true if the cursor has any additional items to return and false otherwise.
232    pub fn has_next(&self) -> bool {
233        !self.is_exhausted()
234            || !self
235                .wrapped_cursor
236                .as_ref()
237                .unwrap()
238                .state()
239                .buffer
240                .is_empty()
241    }
242    /// Deserialize the current result to the generic type associated with this cursor.
243    ///
244    /// # Panics
245    /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
246    /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
247    /// true or without calling [`Cursor::advance`] at all may result in a panic.
248    ///
249    /// ```
250    /// # use mongodb::{Client, error::Result, bson::doc};
251    /// # async fn foo() -> Result<()> {
252    /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
253    /// # let db = client.database("foo");
254    /// use serde::Deserialize;
255    ///
256    /// #[derive(Debug, Deserialize)]
257    /// struct Cat<'a> {
258    ///     #[serde(borrow)]
259    ///     name: &'a str
260    /// }
261    ///
262    /// let coll = db.collection::<Cat>("cat");
263    /// let mut cursor = coll.find(doc! {}).await?;
264    /// while cursor.advance().await? {
265    ///     println!("{:?}", cursor.deserialize_current()?);
266    /// }
267    /// # Ok(())
268    /// # }
269    /// ```
270    pub fn deserialize_current<'a>(&'a self) -> Result<T>
271    where
272        T: Deserialize<'a>,
273    {
274        crate::bson_compat::deserialize_from_slice(self.current().as_bytes()).map_err(Error::from)
275    }
276
277    /// Update the type streamed values will be parsed as.
278    pub fn with_type<'a, D>(mut self) -> Cursor<D>
279    where
280        D: Deserialize<'a>,
281    {
282        Cursor {
283            client: self.client.clone(),
284            drop_token: self.drop_token.take(),
285            wrapped_cursor: self.wrapped_cursor.take(),
286            drop_address: self.drop_address.take(),
287            #[cfg(test)]
288            kill_watcher: self.kill_watcher.take(),
289            _phantom: Default::default(),
290        }
291    }
292
293    /// Some tests need to be able to observe the events generated by `killCommand` execution;
294    /// however, because that happens asynchronously on `drop`, the test runner can conclude before
295    /// the event is published.  To fix that, tests can set a "kill watcher" on cursors - a
296    /// one-shot channel with a `()` value pushed after `killCommand` is run that the test can wait
297    /// on.
298    #[cfg(test)]
299    pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
300        assert!(
301            self.kill_watcher.is_none(),
302            "cursor already has a kill_watcher"
303        );
304        self.kill_watcher = Some(tx);
305    }
306
307    #[cfg(test)]
308    pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
309        self.wrapped_cursor.as_ref().unwrap().current_batch()
310    }
311}
312
313impl<T> CursorStream for Cursor<T>
314where
315    T: DeserializeOwned,
316{
317    fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
318        self.wrapped_cursor.as_mut().unwrap().poll_next_in_batch(cx)
319    }
320}
321
322impl<T> Stream for Cursor<T>
323where
324    T: DeserializeOwned,
325{
326    type Item = Result<T>;
327
328    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
329        // This `unwrap` is safe because `wrapped_cursor` is always `Some` outside of `drop`.
330        stream_poll_next(self.wrapped_cursor.as_mut().unwrap(), cx)
331    }
332}
333
334impl<T> Drop for Cursor<T> {
335    fn drop(&mut self) {
336        let wrapped_cursor = match &self.wrapped_cursor {
337            None => return,
338            Some(c) => c,
339        };
340        if wrapped_cursor.is_exhausted() {
341            return;
342        }
343
344        kill_cursor(
345            self.client.clone(),
346            &mut self.drop_token,
347            wrapped_cursor.namespace(),
348            wrapped_cursor.id(),
349            wrapped_cursor.pinned_connection().replicate(),
350            self.drop_address.take(),
351            #[cfg(test)]
352            self.kill_watcher.take(),
353        );
354    }
355}
356
/// A `GenericCursor` that optionally owns its own implicit session.
///
/// This is the cursor type wrapped by `Cursor`. The session (if any) is held in an
/// `ImplicitClientSessionHandle`, as constructed in `Cursor::new`.
type ImplicitSessionCursor = GenericCursor<'static, ImplicitClientSessionHandle>;