mongodb/change_stream.rs
1//! Contains the functionality for change streams.
2pub(crate) mod common;
3pub mod event;
4pub(crate) mod options;
5pub mod session;
6
7#[cfg(test)]
8use std::collections::VecDeque;
9use std::{
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use crate::error::Error;
15use derive_where::derive_where;
16use futures_core::{future::BoxFuture, Stream};
17use futures_util::FutureExt;
18use serde::de::DeserializeOwned;
19#[cfg(test)]
20use tokio::sync::oneshot;
21
22use crate::{change_stream::event::ResumeToken, error::Result, Cursor};
23use common::{ChangeStreamData, WatchArgs};
24
25/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
26/// deployment. `ChangeStream` instances should be created with method `watch` against the relevant
27/// target.
28///
29/// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
30/// stream of events. This is done automatically when the `ChangeStream` encounters certain
31/// ["resumable"](https://specifications.readthedocs.io/en/latest/change-streams/change-streams/#resumable-error)
32/// errors, such as transient network failures. It can also be done manually by passing
33/// a [`ResumeToken`] retrieved from a past event into either the
34/// [`resume_after`](crate::action::Watch::resume_after) or
35/// [`start_after`](crate::action::Watch::start_after) (4.2+) options used to create the
36/// `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
37/// explicitly opt out of resumability.
38///
39/// A `ChangeStream` can be iterated like any other [`Stream`]:
40///
41/// ```
42/// # use futures::stream::StreamExt;
43/// # use mongodb::{Client, error::Result, bson::doc,
44/// # change_stream::event::ChangeStreamEvent};
45/// # use tokio::task;
46/// #
47/// # async fn func() -> Result<()> {
48/// # let client = Client::with_uri_str("mongodb://example.com").await?;
49/// # let coll = client.database("foo").collection("bar");
50/// let mut change_stream = coll.watch().await?;
51/// let coll_ref = coll.clone();
52/// task::spawn(async move {
53/// coll_ref.insert_one(doc! { "x": 1 }).await;
54/// });
55/// while let Some(event) = change_stream.next().await.transpose()? {
56/// println!("operation performed: {:?}, document: {:?}", event.operation_type, event.full_document);
57/// // operation performed: Insert, document: Some(Document({"x": Int32(1)}))
58/// }
59/// #
60/// # Ok(())
61/// # }
62/// ```
63///
64/// If a [`ChangeStream`] is still open when it goes out of scope, it will automatically be closed
65/// via an asynchronous [killCursors](https://www.mongodb.com/docs/manual/reference/command/killCursors/) command executed
66/// from its [`Drop`](https://doc.rust-lang.org/std/ops/trait.Drop.html) implementation.
67///
68/// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams) for more
69/// details. Also see the documentation on [usage recommendations](https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/).
#[derive_where(Debug)]
pub struct ChangeStream<T>
where
    T: DeserializeOwned,
{
    // Polling state machine. It either holds the cursor wrapper directly (idle) or the in-flight
    // request that temporarily owns it; `Stream::poll_next` is delegated to this field.
    inner: StreamState<T>,
}
77
78impl<T> ChangeStream<T>
79where
80 T: DeserializeOwned,
81{
82 pub(crate) fn new(cursor: Cursor<()>, args: WatchArgs, data: ChangeStreamData) -> Self {
83 Self {
84 inner: StreamState::Idle(CursorWrapper::new(cursor, args, data)),
85 }
86 }
87
88 /// Returns the cached resume token that can be used to resume after the most recently returned
89 /// change.
90 ///
91 /// See the documentation
92 /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
93 /// information on change stream resume tokens.
94 pub fn resume_token(&self) -> Option<ResumeToken> {
95 self.inner.state().data.resume_token.clone()
96 }
97
98 /// Update the type streamed values will be parsed as.
99 pub fn with_type<D: DeserializeOwned>(self) -> ChangeStream<D> {
100 ChangeStream {
101 inner: StreamState::Idle(self.inner.take_state()),
102 }
103 }
104
105 /// Returns whether the change stream will continue to receive events.
106 pub fn is_alive(&self) -> bool {
107 !self.inner.state().cursor.raw().is_exhausted()
108 }
109
110 /// Retrieves the next result from the change stream, if any.
111 ///
112 /// Where calling `Stream::next` will internally loop until a change document is received,
113 /// this will make at most one request and return `None` if the returned document batch is
114 /// empty. This method should be used when storing the resume token in order to ensure the
115 /// most up to date token is received, e.g.
116 ///
117 /// ```
118 /// # use mongodb::{Client, Collection, bson::Document, error::Result};
119 /// # async fn func() -> Result<()> {
120 /// # let client = Client::with_uri_str("mongodb://example.com").await?;
121 /// # let coll: Collection<Document> = client.database("foo").collection("bar");
122 /// let mut change_stream = coll.watch().await?;
123 /// let mut resume_token = None;
124 /// while change_stream.is_alive() {
125 /// if let Some(event) = change_stream.next_if_any().await? {
126 /// // process event
127 /// }
128 /// resume_token = change_stream.resume_token();
129 /// }
130 /// #
131 /// # Ok(())
132 /// # }
133 /// ```
134 pub async fn next_if_any(&mut self) -> Result<Option<T>> {
135 self.inner.state_mut().next_if_any(&mut ()).await
136 }
137
138 #[cfg(test)]
139 pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
140 self.inner.state_mut().cursor.raw_mut().set_kill_watcher(tx);
141 }
142
143 #[cfg(test)]
144 pub(crate) fn current_batch(&self) -> &VecDeque<crate::bson::RawDocumentBuf> {
145 self.inner.state().cursor.batch()
146 }
147
148 #[cfg(test)]
149 pub(crate) fn client(&self) -> &crate::Client {
150 self.inner.state().cursor.raw().client()
151 }
152}
153
154impl<T> Stream for ChangeStream<T>
155where
156 T: DeserializeOwned,
157{
158 type Item = Result<T>;
159
160 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 Pin::new(&mut self.inner).poll_next(cx)
162 }
163}
164
// The shared change stream cursor wrapper, specialized to `Cursor<()>` (whose
// `InnerCursor::Session` is `()`, see the impl at the bottom of this file).
type CursorWrapper = common::CursorWrapper<Cursor<()>>;
166
// This is almost entirely the same as `crate::cursor::stream::Stream`. However, making a generic
// version to underlie both has the side effect of changing the variance on `T` from covariant to
// invariant, which breaks cursor zero-copy deserialization :(
#[derive_where(Debug)]
enum StreamState<T: DeserializeOwned> {
    // No request in flight; the cursor wrapper is directly accessible.
    Idle(CursorWrapper),
    // Placeholder swapped in by `poll_next` while it takes ownership of the previous state.
    // `poll_next` reports an internal error if it ever finds this state on entry.
    Polling,
    // A `next_if_any` request is in flight; the boxed future owns the cursor wrapper and hands
    // it back inside `NextDone` when it resolves.
    Next(#[derive_where(skip)] BoxFuture<'static, NextDone<T>>),
}
176
// Output of the future stored in `StreamState::Next`.
struct NextDone<T> {
    // The cursor wrapper, returned so the stream can go back to `StreamState::Idle`.
    state: CursorWrapper,
    // Result of the `next_if_any` call made on `state`.
    out: Result<Option<T>>,
}
181
182impl<T: DeserializeOwned> StreamState<T> {
183 fn state(&self) -> &CursorWrapper {
184 match self {
185 Self::Idle(st) => st,
186 _ => panic!("invalid change stream state access"),
187 }
188 }
189
190 fn state_mut(&mut self) -> &mut CursorWrapper {
191 match self {
192 Self::Idle(st) => st,
193 _ => panic!("invalid change stream state access"),
194 }
195 }
196
197 fn take_state(self) -> CursorWrapper {
198 match self {
199 Self::Idle(st) => st,
200 _ => panic!("invalid change stream state access"),
201 }
202 }
203}
204
205impl<T: DeserializeOwned> Stream for StreamState<T> {
206 type Item = Result<T>;
207
208 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
209 loop {
210 match std::mem::replace(&mut *self, StreamState::Polling) {
211 StreamState::Idle(mut state) => {
212 *self = StreamState::Next(
213 async move {
214 let out = state.next_if_any(&mut ()).await;
215 NextDone { state, out }
216 }
217 .boxed(),
218 );
219 continue;
220 }
221 StreamState::Next(mut fut) => match fut.poll_unpin(cx) {
222 Poll::Pending => {
223 *self = StreamState::Next(fut);
224 return Poll::Pending;
225 }
226 Poll::Ready(NextDone { state, out }) => {
227 *self = StreamState::Idle(state);
228 match out {
229 Ok(Some(v)) => return Poll::Ready(Some(Ok(v))),
230 Ok(None) => continue,
231 Err(e) => return Poll::Ready(Some(Err(e))),
232 }
233 }
234 },
235 StreamState::Polling => {
236 return Poll::Ready(Some(Err(Error::internal(
237 "attempt to poll change stream already in polling state",
238 ))))
239 }
240 }
241 }
242 }
243}
244
245impl common::InnerCursor for Cursor<()> {
246 type Session = ();
247
248 async fn try_advance(&mut self, _session: &mut Self::Session) -> Result<bool> {
249 self.try_advance().await
250 }
251
252 fn get_resume_token(&self) -> Result<Option<ResumeToken>> {
253 common::get_resume_token(self.batch(), self.raw().post_batch_resume_token())
254 }
255
256 fn current(&self) -> &crate::bson::RawDocument {
257 self.current()
258 }
259
260 async fn execute_watch(
261 &mut self,
262 args: WatchArgs,
263 mut data: ChangeStreamData,
264 _session: &mut Self::Session,
265 ) -> Result<(Self, WatchArgs)> {
266 data.implicit_session = self.raw_mut().take_implicit_session();
267 let new_stream: ChangeStream<event::ChangeStreamEvent<()>> = self
268 .raw()
269 .client()
270 .execute_watch(args.pipeline, args.options, args.target, Some(data))
271 .await?;
272 let new_wrapper = new_stream.inner.take_state();
273 Ok((new_wrapper.cursor, new_wrapper.args))
274 }
275
276 fn set_drop_address(&mut self, from: &Self) {
277 self.raw_mut()
278 .set_drop_address(from.raw().address().clone());
279 }
280}