Skip to main content

mongodb/sync/
change_stream.rs

1use futures_util::stream::StreamExt;
2use serde::de::DeserializeOwned;
3
4use crate::{
5    change_stream::{
6        event::ResumeToken,
7        session::SessionChangeStream as AsyncSessionChangeStream,
8        ChangeStream as AsyncChangeStream,
9    },
10    error::Result,
11};
12
13use super::ClientSession;
14
15/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
16/// deployment. `ChangeStream` instances should be created with method `watch` against the relevant
17/// target.
18///
19/// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
20/// stream of events. This is done automatically when the `ChangeStream` encounters certain
21/// ["resumable"](https://specifications.readthedocs.io/en/latest/change-streams/change-streams/#resumable-error)
22/// errors, such as transient network failures. It can also be done manually by passing
23/// a [`ResumeToken`] retrieved from a past event into either the
24/// [`resume_after`](crate::action::Watch::resume_after) or
25/// [`start_after`](crate::action::Watch::start_after) options used to create
26/// the `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
27/// explicitly opt out of resumability.
28///
29/// A `ChangeStream` can be iterated like any other [`Iterator`]:
30///
31/// ```
32/// # use mongodb::{sync::Client, error::Result, bson::doc,
33/// # change_stream::event::ChangeStreamEvent};
34/// #
35/// # fn func() -> Result<()> {
36/// # let client = Client::with_uri_str("mongodb://example.com")?;
37/// # let coll = client.database("foo").collection("bar");
38/// let mut change_stream = coll.watch().run()?;
39/// coll.insert_one(doc! { "x": 1 }).run()?;
40/// for event in change_stream {
41///     let event = event?;
42///     println!("operation performed: {:?}, document: {:?}", event.operation_type, event.full_document);
43///     // operation performed: Insert, document: Some(Document({"x": Int32(1)}))
44/// }
45/// #
46/// # Ok(())
47/// # }
48/// ```
49///
50/// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams) for more
51/// details. Also see the documentation on [usage recommendations](https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/).
52pub struct ChangeStream<T>
53where
54    T: DeserializeOwned + Unpin + Send + Sync,
55{
56    async_stream: AsyncChangeStream<T>,
57}
58
59impl<T> ChangeStream<T>
60where
61    T: DeserializeOwned + Unpin + Send + Sync,
62{
63    pub(crate) fn new(async_stream: AsyncChangeStream<T>) -> Self {
64        Self { async_stream }
65    }
66
67    /// Returns the cached resume token that can be used to resume after the most recently returned
68    /// change.
69    ///
70    /// See the documentation
71    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
72    /// information on change stream resume tokens.
73    pub fn resume_token(&self) -> Option<ResumeToken> {
74        self.async_stream.resume_token()
75    }
76
77    /// Update the type streamed values will be parsed as.
78    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
79        ChangeStream {
80            async_stream: self.async_stream.with_type(),
81        }
82    }
83
84    /// Returns whether the change stream will continue to receive events.
85    pub fn is_alive(&self) -> bool {
86        self.async_stream.is_alive()
87    }
88
89    /// Retrieves the next result from the change stream, if any.
90    ///
91    /// Where calling `Iterator::next` will internally loop until a change document is received,
92    /// this will make at most one request and return `None` if the returned document batch is
93    /// empty.  This method should be used when storing the resume token in order to ensure the
94    /// most up to date token is received, e.g.
95    ///
96    /// ```
97    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
98    /// # fn func() -> Result<()> {
99    /// # let client = Client::with_uri_str("mongodb://example.com")?;
100    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
101    /// let mut change_stream = coll.watch().run()?;
102    /// let mut resume_token = None;
103    /// while change_stream.is_alive() {
104    ///     if let Some(event) = change_stream.next_if_any()? {
105    ///         // process event
106    ///     }
107    ///     resume_token = change_stream.resume_token();
108    /// }
109    /// #
110    /// # Ok(())
111    /// # }
112    /// ```
113    pub fn next_if_any(&mut self) -> Result<Option<T>> {
114        crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next_if_any())
115    }
116}
117
118impl<T> Iterator for ChangeStream<T>
119where
120    T: DeserializeOwned + Unpin + Send + Sync,
121{
122    type Item = Result<T>;
123
124    fn next(&mut self) -> Option<Self::Item> {
125        crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next())
126    }
127}
128
129/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
130/// be iterated using one. To iterate, use [`SessionChangeStream::next`]:
131///
132/// ```
133/// # use mongodb::{bson::Document, sync::Client, error::Result};
134/// #
135/// # async fn do_stuff() -> Result<()> {
136/// # let client = Client::with_uri_str("mongodb://example.com")?;
137/// # let mut session = client.start_session().run()?;
138/// # let coll = client.database("foo").collection::<Document>("bar");
139/// #
140/// let mut cs = coll.watch().session(&mut session).run()?;
141/// while let Some(event) = cs.next(&mut session)? {
142///     println!("{:?}", event)
143/// }
144/// #
145/// # Ok(())
146/// # }
147/// ```
148pub struct SessionChangeStream<T>
149where
150    T: DeserializeOwned + Unpin,
151{
152    async_stream: AsyncSessionChangeStream<T>,
153}
154
155impl<T> SessionChangeStream<T>
156where
157    T: DeserializeOwned + Unpin + Send + Sync,
158{
159    pub(crate) fn new(async_stream: AsyncSessionChangeStream<T>) -> Self {
160        Self { async_stream }
161    }
162
163    /// Returns the cached resume token that can be used to resume after the most recently returned
164    /// change.
165    ///
166    /// See the documentation
167    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
168    /// information on change stream resume tokens.
169    pub fn resume_token(&self) -> Option<ResumeToken> {
170        self.async_stream.resume_token()
171    }
172
173    /// Update the type streamed values will be parsed as.
174    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
175        SessionChangeStream {
176            async_stream: self.async_stream.with_type(),
177        }
178    }
179
180    /// Retrieve the next result from the change stream.
181    /// The session provided must be the same session used to create the change stream.
182    ///
183    /// ```
184    /// # use mongodb::{bson::{self, doc, Document}, sync::Client};
185    /// # fn main() {
186    /// # async {
187    /// # let client = Client::with_uri_str("foo")?;
188    /// # let coll = client.database("foo").collection::<Document>("bar");
189    /// # let other_coll = coll.clone();
190    /// # let mut session = client.start_session().run()?;
191    /// let mut cs = coll.watch().session(&mut session).run()?;
192    /// while let Some(event) = cs.next(&mut session)? {
193    ///     let id = bson::serialize_to_bson(&event.id)?;
194    ///     other_coll.insert_one(doc! { "id": id }).session(&mut session).run()?;
195    /// }
196    /// # Ok::<(), mongodb::error::Error>(())
197    /// # };
198    /// # }
199    /// ```
200    pub fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
201        crate::sync::TOKIO_RUNTIME
202            .block_on(self.async_stream.next(&mut session.async_client_session))
203    }
204
205    /// Returns whether the change stream will continue to receive events.
206    pub fn is_alive(&self) -> bool {
207        self.async_stream.is_alive()
208    }
209
210    /// Retrieve the next result from the change stream, if any.
211    ///
212    /// Where calling `next` will internally loop until a change document is received,
213    /// this will make at most one request and return `None` if the returned document batch is
214    /// empty.  This method should be used when storing the resume token in order to ensure the
215    /// most up to date token is received, e.g.
216    ///
217    /// ```
218    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
219    /// # async fn func() -> Result<()> {
220    /// # let client = Client::with_uri_str("mongodb://example.com")?;
221    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
222    /// # let mut session = client.start_session().run()?;
223    /// let mut change_stream = coll.watch().session(&mut session).run()?;
224    /// let mut resume_token = None;
225    /// while change_stream.is_alive() {
226    ///     if let Some(event) = change_stream.next_if_any(&mut session)? {
227    ///         // process event
228    ///     }
229    ///     resume_token = change_stream.resume_token();
230    /// }
231    /// #
232    /// # Ok(())
233    /// # }
234    /// ```
235    pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
236        crate::sync::TOKIO_RUNTIME.block_on(
237            self.async_stream
238                .next_if_any(&mut session.async_client_session),
239        )
240    }
241}