// mongodb/sync/change_stream.rs

use futures_util::stream::StreamExt;
use serde::de::DeserializeOwned;

use crate::{
    change_stream::{
        event::ResumeToken,
        session::SessionChangeStream as AsyncSessionChangeStream,
        ChangeStream as AsyncChangeStream,
    },
    error::Result,
};

use super::ClientSession;

/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
/// deployment. `ChangeStream` instances should be created with the `watch` method on the relevant
/// target.
///
/// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
/// stream of events. This is done automatically when the `ChangeStream` encounters certain
/// ["resumable"](https://specifications.readthedocs.io/en/latest/change-streams/change-streams/#resumable-error)
/// errors, such as transient network failures. It can also be done manually by passing
/// a [`ResumeToken`] retrieved from a past event into either the
/// [`resume_after`](crate::action::Watch::resume_after) or
/// [`start_after`](crate::action::Watch::start_after) (4.2+) options used to create
/// the `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
/// explicitly opt out of resumability.
///
/// A `ChangeStream` can be iterated like any other [`Iterator`]:
///
/// ```
/// # use mongodb::{sync::Client, error::Result, bson::doc,
/// # change_stream::event::ChangeStreamEvent};
/// #
/// # fn func() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com")?;
/// # let coll = client.database("foo").collection("bar");
/// let mut change_stream = coll.watch().run()?;
/// coll.insert_one(doc! { "x": 1 }).run()?;
/// for event in change_stream {
///     let event = event?;
///     println!("operation performed: {:?}, document: {:?}", event.operation_type, event.full_document);
///     // operation performed: Insert, document: Some(Document({"x": Int32(1)}))
/// }
/// #
/// # Ok(())
/// # }
/// ```
///
/// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams) for more
/// details. Also see the documentation on [usage recommendations](https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/).
pub struct ChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    async_stream: AsyncChangeStream<T>,
}

impl<T> ChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    pub(crate) fn new(async_stream: AsyncChangeStream<T>) -> Self {
        Self { async_stream }
    }

    /// Returns the cached resume token that can be used to resume after the most recently returned
    /// change.
    ///
    /// See the documentation
    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
    /// information on change stream resume tokens.
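    ///
    /// For example, a sketch of capturing the cached token and resuming a fresh stream from it
    /// with the [`resume_after`](crate::action::Watch::resume_after) option (persisting the token
    /// between program runs is elided here):
    ///
    /// ```
    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
    /// # fn func() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
    /// let mut change_stream = coll.watch().run()?;
    /// if let Some(event) = change_stream.next_if_any()? {
    ///     // process event
    /// }
    /// // Resume a new stream after the last event this stream observed.
    /// if let Some(token) = change_stream.resume_token() {
    ///     let resumed = coll.watch().resume_after(token).run()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```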
    pub fn resume_token(&self) -> Option<ResumeToken> {
        self.async_stream.resume_token()
    }

    /// Update the type streamed values will be parsed as.
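    ///
    /// For example, a sketch that re-parses events into a custom type; `SlimEvent` and its field
    /// are illustrative only, not part of the driver:
    ///
    /// ```
    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
    /// # use serde::Deserialize;
    /// # fn func() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
    /// #[derive(Deserialize)]
    /// struct SlimEvent {
    ///     #[serde(rename = "operationType")]
    ///     operation_type: String,
    /// }
    ///
    /// let change_stream = coll.watch().run()?.with_type::<SlimEvent>();
    /// for event in change_stream {
    ///     println!("operation: {}", event?.operation_type);
    /// }
    /// # Ok(())
    /// # }
    /// ```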
    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
        ChangeStream {
            async_stream: self.async_stream.with_type(),
        }
    }

    /// Returns whether the change stream will continue to receive events.
    pub fn is_alive(&self) -> bool {
        self.async_stream.is_alive()
    }

    /// Retrieves the next result from the change stream, if any.
    ///
    /// Whereas calling `Iterator::next` will internally loop until a change document is received,
    /// this method will make at most one request and return `None` if the returned document batch
    /// is empty. This method should be used when storing the resume token in order to ensure the
    /// most up-to-date token is received, e.g.
    ///
    /// ```
    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
    /// # fn func() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
    /// let mut change_stream = coll.watch().run()?;
    /// let mut resume_token = None;
    /// while change_stream.is_alive() {
    ///     if let Some(event) = change_stream.next_if_any()? {
    ///         // process event
    ///     }
    ///     resume_token = change_stream.resume_token();
    /// }
    /// #
    /// # Ok(())
    /// # }
    /// ```
    pub fn next_if_any(&mut self) -> Result<Option<T>> {
        crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next_if_any())
    }
}

impl<T> Iterator for ChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    type Item = Result<T>;

    fn next(&mut self) -> Option<Self::Item> {
        crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next())
    }
}

/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] and must
/// be iterated using that session. To iterate, use [`SessionChangeStream::next`]:
///
/// ```
/// # use mongodb::{bson::Document, sync::Client, error::Result};
/// #
/// # fn do_stuff() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com")?;
/// # let mut session = client.start_session().run()?;
/// # let coll = client.database("foo").collection::<Document>("bar");
/// #
/// let mut cs = coll.watch().session(&mut session).run()?;
/// while let Some(event) = cs.next(&mut session)? {
///     println!("{:?}", event)
/// }
/// #
/// # Ok(())
/// # }
/// ```
pub struct SessionChangeStream<T>
where
    T: DeserializeOwned + Unpin,
{
    async_stream: AsyncSessionChangeStream<T>,
}

impl<T> SessionChangeStream<T>
where
    T: DeserializeOwned + Unpin + Send + Sync,
{
    pub(crate) fn new(async_stream: AsyncSessionChangeStream<T>) -> Self {
        Self { async_stream }
    }

    /// Returns the cached resume token that can be used to resume after the most recently returned
    /// change.
    ///
    /// See the documentation
    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
    /// information on change stream resume tokens.
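    ///
    /// For example, a sketch of resuming from the cached token via the
    /// [`start_after`](crate::action::Watch::start_after) option (using `start_after` rather than
    /// `resume_after` here is purely illustrative):
    ///
    /// ```
    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
    /// # fn func() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
    /// # let mut session = client.start_session().run()?;
    /// let mut cs = coll.watch().session(&mut session).run()?;
    /// if let Some(event) = cs.next_if_any(&mut session)? {
    ///     // process event
    /// }
    /// if let Some(token) = cs.resume_token() {
    ///     let resumed = coll.watch().start_after(token).session(&mut session).run()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```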
    pub fn resume_token(&self) -> Option<ResumeToken> {
        self.async_stream.resume_token()
    }

    /// Update the type streamed values will be parsed as.
    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
        SessionChangeStream {
            async_stream: self.async_stream.with_type(),
        }
    }

    /// Retrieve the next result from the change stream.
    /// The session provided must be the same session used to create the change stream.
    ///
    /// ```
    /// # use bson::{doc, Document};
    /// # use mongodb::{error::Result, sync::Client};
    /// # fn func() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll = client.database("foo").collection::<Document>("bar");
    /// # let other_coll = coll.clone();
    /// # let mut session = client.start_session().run()?;
    /// let mut cs = coll.watch().session(&mut session).run()?;
    /// while let Some(event) = cs.next(&mut session)? {
    ///     let id = bson::to_bson(&event.id)?;
    ///     other_coll.insert_one(doc! { "id": id }).session(&mut session).run()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
        crate::sync::TOKIO_RUNTIME
            .block_on(self.async_stream.next(&mut session.async_client_session))
    }

    /// Returns whether the change stream will continue to receive events.
    pub fn is_alive(&self) -> bool {
        self.async_stream.is_alive()
    }

    /// Retrieve the next result from the change stream, if any.
    ///
    /// Whereas calling `next` will internally loop until a change document is received, this
    /// method will make at most one request and return `None` if the returned document batch is
    /// empty. This method should be used when storing the resume token in order to ensure the
    /// most up-to-date token is received, e.g.
    ///
    /// ```
    /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
    /// # fn func() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll: Collection<Document> = client.database("foo").collection("bar");
    /// # let mut session = client.start_session().run()?;
    /// let mut change_stream = coll.watch().session(&mut session).run()?;
    /// let mut resume_token = None;
    /// while change_stream.is_alive() {
    ///     if let Some(event) = change_stream.next_if_any(&mut session)? {
    ///         // process event
    ///     }
    ///     resume_token = change_stream.resume_token();
    /// }
    /// #
    /// # Ok(())
    /// # }
    /// ```
    pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
        crate::sync::TOKIO_RUNTIME.block_on(
            self.async_stream
                .next_if_any(&mut session.async_client_session),
        )
    }
}