mongodb/cursor.rs
1pub(crate) mod common;
2pub mod raw_batch;
3pub(crate) mod session;
4mod stream;
5#[cfg(feature = "sync")]
6pub(crate) mod sync;
7
8use std::task::Poll;
9
10use derive_where::derive_where;
11use futures_core::Stream as AsyncStream;
12use futures_util::stream::StreamExt;
13use serde::{de::DeserializeOwned, Deserialize};
14
15use crate::{
16 bson::RawDocument,
17 cmap::conn::PinnedConnectionHandle,
18 error::Result,
19 Client,
20 ClientSession,
21};
22
23/// A [`Cursor`] streams the result of a query. When a query is made, the returned [`Cursor`] will
24/// contain the first batch of results from the server; the individual results will then be returned
25/// as the [`Cursor`] is iterated. When the batch is exhausted and if there are more results, the
26/// [`Cursor`] will fetch the next batch of documents, and so forth until the results are exhausted.
27/// Note that because of this batching, additional network I/O may occur on any given call to
28/// `next`. Because of this, a [`Cursor`] iterates over `Result<T>` items rather than
29/// simply `T` items.
30///
31/// The batch size of the `Cursor` can be configured using the options to the method that returns
32/// it. For example, setting the `batch_size` field of
33/// [`FindOptions`](options/struct.FindOptions.html) will set the batch size of the
34/// `Cursor` returned by [`Collection::find`](struct.Collection.html#method.find).
35///
36/// Note that the batch size determines both the number of documents stored in memory by the
37/// `Cursor` at a given time as well as the total number of network round-trips needed to fetch all
38/// results from the server; both of these factors should be taken into account when choosing the
39/// optimal batch size.
40///
41/// [`Cursor`] implements [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html), which means
42/// it can be iterated over much in the same way that an `Iterator` can be in synchronous Rust. In
43/// order to do so, the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) trait must
44/// be imported. Because a [`Cursor`] iterates over a `Result<T>`, it also has access to the
45/// potentially more ergonomic functionality provided by
46/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html), which can be
47/// imported instead of or in addition to
48/// [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html). The methods from
49/// [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) are especially useful when
50/// used in conjunction with the `?` operator.
51///
52/// ```rust
53/// # use mongodb::{bson::{Document, doc}, Client, error::Result};
54/// #
55/// # async fn do_stuff() -> Result<()> {
56/// # let client = Client::with_uri_str("mongodb://example.com").await?;
57/// # let coll = client.database("foo").collection::<Document>("bar");
58/// #
59/// use futures::stream::{StreamExt, TryStreamExt};
60///
61/// let mut cursor = coll.find(doc! {}).await?;
62/// // regular Stream uses next() and iterates over Option<Result<T>>
63/// while let Some(doc) = cursor.next().await {
64/// println!("{}", doc?)
65/// }
66/// // regular Stream uses collect() and collects into a Vec<Result<T>>
67/// let v: Vec<Result<_>> = cursor.collect().await;
68///
69/// let mut cursor = coll.find(doc! {}).await?;
70/// // TryStream uses try_next() and iterates over Result<Option<T>>
71/// while let Some(doc) = cursor.try_next().await? {
72/// println!("{}", doc)
73/// }
74/// // TryStream uses try_collect() and collects into a Result<Vec<T>>
75/// let v: Vec<_> = cursor.try_collect().await?;
76/// #
77/// # Ok(())
78/// # }
79/// ```
80///
81/// If a [`Cursor`] is still open when it goes out of scope, it will automatically be closed via an
82/// asynchronous [killCursors](https://www.mongodb.com/docs/manual/reference/command/killCursors/) command executed
83/// from its [`Drop`](https://doc.rust-lang.org/std/ops/trait.Drop.html) implementation.
84#[derive_where(Debug)]
85pub struct Cursor<T> {
86 stream: stream::Stream<'static, raw_batch::RawBatchCursor, T>,
87}
88
89impl<T> Cursor<T> {
90 /// Move the cursor forward, potentially triggering requests to the database for more results
91 /// if the local buffer has been exhausted.
92 ///
93 /// This will keep requesting data from the server until either the cursor is exhausted
94 /// or batch with results in it has been received.
95 ///
96 /// The return value indicates whether new results were successfully returned (true) or if
97 /// the cursor has been closed (false).
98 ///
99 /// Note: [`Cursor::current`] and [`Cursor::deserialize_current`] must only be called after
100 /// [`Cursor::advance`] returned `Ok(true)`. It is an error to call either of them without
101 /// calling [`Cursor::advance`] first or after [`Cursor::advance`] returns an error / false.
102 ///
103 /// ```
104 /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
105 /// # async fn foo() -> Result<()> {
106 /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
107 /// # let coll = client.database("stuff").collection::<Document>("stuff");
108 /// let mut cursor = coll.find(doc! {}).await?;
109 /// while cursor.advance().await? {
110 /// println!("{:?}", cursor.current());
111 /// }
112 /// # Ok(())
113 /// # }
114 /// ```
115 pub async fn advance(&mut self) -> Result<bool> {
116 self.stream.buffer_mut().advance().await
117 }
118
119 /// Returns a reference to the current result in the cursor.
120 ///
121 /// # Panics
122 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::current`] can be
123 /// invoked. Calling [`Cursor::current`] after [`Cursor::advance`] does not return true
124 /// or without calling [`Cursor::advance`] at all may result in a panic.
125 ///
126 /// ```
127 /// # use mongodb::{Client, bson::{Document, doc}, error::Result};
128 /// # async fn foo() -> Result<()> {
129 /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
130 /// # let coll = client.database("stuff").collection::<Document>("stuff");
131 /// let mut cursor = coll.find(doc! {}).await?;
132 /// while cursor.advance().await? {
133 /// println!("{:?}", cursor.current());
134 /// }
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub fn current(&self) -> &RawDocument {
139 self.stream.buffer().current()
140 }
141
142 /// Returns true if the cursor has any additional items to return and false otherwise.
143 pub fn has_next(&self) -> bool {
144 let state = self.stream.buffer();
145 !state.batch().is_empty() || state.raw.has_next()
146 }
147
148 /// Deserialize the current result to the generic type associated with this cursor.
149 ///
150 /// # Panics
151 /// [`Cursor::advance`] must return `Ok(true)` before [`Cursor::deserialize_current`] can be
152 /// invoked. Calling [`Cursor::deserialize_current`] after [`Cursor::advance`] does not return
153 /// true or without calling [`Cursor::advance`] at all may result in a panic.
154 ///
155 /// ```
156 /// # use mongodb::{Client, error::Result, bson::doc};
157 /// # async fn foo() -> Result<()> {
158 /// # let client = Client::with_uri_str("mongodb://localhost:27017").await?;
159 /// # let db = client.database("foo");
160 /// use serde::Deserialize;
161 ///
162 /// #[derive(Debug, Deserialize)]
163 /// struct Cat<'a> {
164 /// #[serde(borrow)]
165 /// name: &'a str
166 /// }
167 ///
168 /// let coll = db.collection::<Cat>("cat");
169 /// let mut cursor = coll.find(doc! {}).await?;
170 /// while cursor.advance().await? {
171 /// println!("{:?}", cursor.deserialize_current()?);
172 /// }
173 /// # Ok(())
174 /// # }
175 /// ```
176 pub fn deserialize_current<'a>(&'a self) -> Result<T>
177 where
178 T: Deserialize<'a>,
179 {
180 self.stream.buffer().deserialize_current()
181 }
182
183 /// Update the type streamed values will be parsed as.
184 pub fn with_type<'a, D>(self) -> Cursor<D>
185 where
186 D: Deserialize<'a>,
187 {
188 Cursor {
189 stream: self.stream.with_type(),
190 }
191 }
192
193 pub(crate) fn raw(&self) -> &raw_batch::RawBatchCursor {
194 &self.stream.buffer().raw
195 }
196
197 pub(crate) fn raw_mut(&mut self) -> &mut raw_batch::RawBatchCursor {
198 &mut self.stream.buffer_mut().raw
199 }
200
201 pub(crate) async fn try_advance(&mut self) -> Result<bool> {
202 self.stream.buffer_mut().try_advance().await
203 }
204
205 pub(crate) fn batch(&self) -> &std::collections::VecDeque<crate::bson::RawDocumentBuf> {
206 self.stream.buffer().batch()
207 }
208}
209
210pub(crate) trait NewCursor: Sized {
211 fn generic_new(
212 client: Client,
213 spec: common::CursorSpecification,
214 implicit_session: Option<ClientSession>,
215 pinned: Option<PinnedConnectionHandle>,
216 ) -> Result<Self>;
217}
218
219impl<T> NewCursor for Cursor<T> {
220 fn generic_new(
221 client: Client,
222 spec: common::CursorSpecification,
223 implicit_session: Option<ClientSession>,
224 pinned: Option<PinnedConnectionHandle>,
225 ) -> Result<Self> {
226 let raw = crate::cursor::raw_batch::RawBatchCursor::generic_new(
227 client,
228 spec,
229 implicit_session,
230 pinned,
231 )?;
232 Ok(Self {
233 stream: stream::Stream::new(raw),
234 })
235 }
236}
237
238impl<T> AsyncStream for Cursor<T>
239where
240 T: DeserializeOwned,
241{
242 type Item = Result<T>;
243
244 fn poll_next(
245 mut self: std::pin::Pin<&mut Self>,
246 cx: &mut std::task::Context<'_>,
247 ) -> Poll<Option<Self::Item>> {
248 self.stream.poll_next_unpin(cx)
249 }
250}