Skip to main content

mongodb/
change_stream.rs

1//! Contains the functionality for change streams.
2pub(crate) mod common;
3pub mod event;
4pub(crate) mod options;
5pub mod session;
6
7#[cfg(test)]
8use std::collections::VecDeque;
9use std::{
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use crate::error::Error;
15use derive_where::derive_where;
16use futures_core::{future::BoxFuture, Stream};
17use futures_util::FutureExt;
18use serde::de::DeserializeOwned;
19#[cfg(test)]
20use tokio::sync::oneshot;
21
22use crate::{change_stream::event::ResumeToken, error::Result, Cursor};
23use common::{ChangeStreamData, WatchArgs};
24
25/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
26/// deployment. `ChangeStream` instances should be created with method `watch` against the relevant
27/// target.
28///
29/// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
30/// stream of events. This is done automatically when the `ChangeStream` encounters certain
31/// ["resumable"](https://specifications.readthedocs.io/en/latest/change-streams/change-streams/#resumable-error)
32/// errors, such as transient network failures. It can also be done manually by passing
33/// a [`ResumeToken`] retrieved from a past event into either the
34/// [`resume_after`](crate::action::Watch::resume_after) or
35/// [`start_after`](crate::action::Watch::start_after) (4.2+) options used to create the
36/// `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
37/// explicitly opt out of resumability.
38///
39/// A `ChangeStream` can be iterated like any other [`Stream`]:
40///
41/// ```
42/// # use futures::stream::StreamExt;
43/// # use mongodb::{Client, error::Result, bson::doc,
44/// # change_stream::event::ChangeStreamEvent};
45/// # use tokio::task;
46/// #
47/// # async fn func() -> Result<()> {
48/// # let client = Client::with_uri_str("mongodb://example.com").await?;
49/// # let coll = client.database("foo").collection("bar");
50/// let mut change_stream = coll.watch().await?;
51/// let coll_ref = coll.clone();
52/// task::spawn(async move {
53///     coll_ref.insert_one(doc! { "x": 1 }).await;
54/// });
55/// while let Some(event) = change_stream.next().await.transpose()? {
56///     println!("operation performed: {:?}, document: {:?}", event.operation_type, event.full_document);
57///     // operation performed: Insert, document: Some(Document({"x": Int32(1)}))
58/// }
59/// #
60/// # Ok(())
61/// # }
62/// ```
63///
64/// If a [`ChangeStream`] is still open when it goes out of scope, it will automatically be closed
65/// via an asynchronous [killCursors](https://www.mongodb.com/docs/manual/reference/command/killCursors/) command executed
66/// from its [`Drop`](https://doc.rust-lang.org/std/ops/trait.Drop.html) implementation.
67///
68/// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams) for more
69/// details. Also see the documentation on [usage recommendations](https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/).
70#[derive_where(Debug)]
71pub struct ChangeStream<T>
72where
73    T: DeserializeOwned,
74{
75    inner: StreamState<T>,
76}
77
78impl<T> ChangeStream<T>
79where
80    T: DeserializeOwned,
81{
82    pub(crate) fn new(cursor: Cursor<()>, args: WatchArgs, data: ChangeStreamData) -> Self {
83        Self {
84            inner: StreamState::Idle(CursorWrapper::new(cursor, args, data)),
85        }
86    }
87
88    /// Returns the cached resume token that can be used to resume after the most recently returned
89    /// change.
90    ///
91    /// See the documentation
92    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
93    /// information on change stream resume tokens.
94    pub fn resume_token(&self) -> Option<ResumeToken> {
95        self.inner.state().data.resume_token.clone()
96    }
97
98    /// Update the type streamed values will be parsed as.
99    pub fn with_type<D: DeserializeOwned>(self) -> ChangeStream<D> {
100        ChangeStream {
101            inner: StreamState::Idle(self.inner.take_state()),
102        }
103    }
104
105    /// Returns whether the change stream will continue to receive events.
106    pub fn is_alive(&self) -> bool {
107        !self.inner.state().cursor.raw().is_exhausted()
108    }
109
110    /// Retrieves the next result from the change stream, if any.
111    ///
112    /// Where calling `Stream::next` will internally loop until a change document is received,
113    /// this will make at most one request and return `None` if the returned document batch is
114    /// empty.  This method should be used when storing the resume token in order to ensure the
115    /// most up to date token is received, e.g.
116    ///
117    /// ```
118    /// # use mongodb::{Client, Collection, bson::Document, error::Result};
119    /// # async fn func() -> Result<()> {
120    /// # let client = Client::with_uri_str("mongodb://example.com").await?;
121    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
122    /// let mut change_stream = coll.watch().await?;
123    /// let mut resume_token = None;
124    /// while change_stream.is_alive() {
125    ///     if let Some(event) = change_stream.next_if_any().await? {
126    ///         // process event
127    ///     }
128    ///     resume_token = change_stream.resume_token();
129    /// }
130    /// #
131    /// # Ok(())
132    /// # }
133    /// ```
134    pub async fn next_if_any(&mut self) -> Result<Option<T>> {
135        self.inner.state_mut().next_if_any(&mut ()).await
136    }
137
138    #[cfg(test)]
139    pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
140        self.inner.state_mut().cursor.raw_mut().set_kill_watcher(tx);
141    }
142
143    #[cfg(test)]
144    pub(crate) fn current_batch(&self) -> &VecDeque<crate::bson::RawDocumentBuf> {
145        self.inner.state().cursor.batch()
146    }
147
148    #[cfg(test)]
149    pub(crate) fn client(&self) -> &crate::Client {
150        self.inner.state().cursor.raw().client()
151    }
152}
153
154impl<T> Stream for ChangeStream<T>
155where
156    T: DeserializeOwned,
157{
158    type Item = Result<T>;
159
160    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161        Pin::new(&mut self.inner).poll_next(cx)
162    }
163}
164
/// `common::CursorWrapper` specialized to the `Cursor<()>` that backs this module's
/// `ChangeStream`.
type CursorWrapper = common::CursorWrapper<Cursor<()>>;
166
167// This is almost entirely the same as `crate::cursor::stream::Stream`.  However, making a generic
168// version to underlie both has the side effect of changing the variance on `T` from covariant to
169// invariant, which breaks cursor zero-copy deserialization :(
170#[derive_where(Debug)]
171enum StreamState<T: DeserializeOwned> {
172    Idle(CursorWrapper),
173    Polling,
174    Next(#[derive_where(skip)] BoxFuture<'static, NextDone<T>>),
175}
176
/// Output of one in-flight `next_if_any` request: returns the cursor wrapper to the state
/// machine along with the outcome of that request.
struct NextDone<T> {
    /// The cursor wrapper that was moved into the request future, handed back so the stream
    /// can return to the idle state.
    state: CursorWrapper,
    /// `Ok(Some(_))` for a received value, `Ok(None)` when the request yielded none, or the
    /// error the request failed with.
    out: Result<Option<T>>,
}
181
182impl<T: DeserializeOwned> StreamState<T> {
183    fn state(&self) -> &CursorWrapper {
184        match self {
185            Self::Idle(st) => st,
186            _ => panic!("invalid change stream state access"),
187        }
188    }
189
190    fn state_mut(&mut self) -> &mut CursorWrapper {
191        match self {
192            Self::Idle(st) => st,
193            _ => panic!("invalid change stream state access"),
194        }
195    }
196
197    fn take_state(self) -> CursorWrapper {
198        match self {
199            Self::Idle(st) => st,
200            _ => panic!("invalid change stream state access"),
201        }
202    }
203}
204
205impl<T: DeserializeOwned> Stream for StreamState<T> {
206    type Item = Result<T>;
207
208    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
209        loop {
210            match std::mem::replace(&mut *self, StreamState::Polling) {
211                StreamState::Idle(mut state) => {
212                    *self = StreamState::Next(
213                        async move {
214                            let out = state.next_if_any(&mut ()).await;
215                            NextDone { state, out }
216                        }
217                        .boxed(),
218                    );
219                    continue;
220                }
221                StreamState::Next(mut fut) => match fut.poll_unpin(cx) {
222                    Poll::Pending => {
223                        *self = StreamState::Next(fut);
224                        return Poll::Pending;
225                    }
226                    Poll::Ready(NextDone { state, out }) => {
227                        *self = StreamState::Idle(state);
228                        match out {
229                            Ok(Some(v)) => return Poll::Ready(Some(Ok(v))),
230                            Ok(None) => continue,
231                            Err(e) => return Poll::Ready(Some(Err(e))),
232                        }
233                    }
234                },
235                StreamState::Polling => {
236                    return Poll::Ready(Some(Err(Error::internal(
237                        "attempt to poll change stream already in polling state",
238                    ))))
239                }
240            }
241        }
242    }
243}
244
245impl common::InnerCursor for Cursor<()> {
246    type Session = ();
247
248    async fn try_advance(&mut self, _session: &mut Self::Session) -> Result<bool> {
249        self.try_advance().await
250    }
251
252    fn get_resume_token(&self) -> Result<Option<ResumeToken>> {
253        common::get_resume_token(self.batch(), self.raw().post_batch_resume_token())
254    }
255
256    fn current(&self) -> &crate::bson::RawDocument {
257        self.current()
258    }
259
260    async fn execute_watch(
261        &mut self,
262        args: WatchArgs,
263        mut data: ChangeStreamData,
264        _session: &mut Self::Session,
265    ) -> Result<(Self, WatchArgs)> {
266        data.implicit_session = self.raw_mut().take_implicit_session();
267        let new_stream: ChangeStream<event::ChangeStreamEvent<()>> = self
268            .raw()
269            .client()
270            .execute_watch(args.pipeline, args.options, args.target, Some(data))
271            .await?;
272        let new_wrapper = new_stream.inner.take_state();
273        Ok((new_wrapper.cursor, new_wrapper.args))
274    }
275
276    fn set_drop_address(&mut self, from: &Self) {
277        self.raw_mut()
278            .set_drop_address(from.raw().address().clone());
279    }
280}