mongodb/sync/change_stream.rs
1use futures_util::stream::StreamExt;
2use serde::de::DeserializeOwned;
3
4use crate::{
5 change_stream::{
6 event::ResumeToken,
7 session::SessionChangeStream as AsyncSessionChangeStream,
8 ChangeStream as AsyncChangeStream,
9 },
10 error::Result,
11};
12
13use super::ClientSession;
14
15/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
16/// deployment. `ChangeStream` instances should be created with method `watch` against the relevant
17/// target.
18///
19/// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
20/// stream of events. This is done automatically when the `ChangeStream` encounters certain
21/// ["resumable"](https://specifications.readthedocs.io/en/latest/change-streams/change-streams/#resumable-error)
22/// errors, such as transient network failures. It can also be done manually by passing
23/// a [`ResumeToken`] retrieved from a past event into either the
24/// [`resume_after`](crate::action::Watch::resume_after) or
25/// [`start_after`](crate::action::Watch::start_after) (4.2+) options used to create
26/// the `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
27/// explicitly opt out of resumability.
28///
29/// A `ChangeStream` can be iterated like any other [`Iterator`]:
30///
31/// ```
32/// # use mongodb::{sync::Client, error::Result, bson::doc,
33/// # change_stream::event::ChangeStreamEvent};
34/// #
35/// # fn func() -> Result<()> {
36/// # let client = Client::with_uri_str("mongodb://example.com")?;
37/// # let coll = client.database("foo").collection("bar");
38/// let mut change_stream = coll.watch().run()?;
39/// coll.insert_one(doc! { "x": 1 }).run()?;
40/// for event in change_stream {
41/// let event = event?;
42/// println!("operation performed: {:?}, document: {:?}", event.operation_type, event.full_document);
43/// // operation performed: Insert, document: Some(Document({"x": Int32(1)}))
44/// }
45/// #
46/// # Ok(())
47/// # }
48/// ```
49///
50/// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams) for more
51/// details. Also see the documentation on [usage recommendations](https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/).
52pub struct ChangeStream<T>
53where
54 T: DeserializeOwned + Unpin + Send + Sync,
55{
56 async_stream: AsyncChangeStream<T>,
57}
58
59impl<T> ChangeStream<T>
60where
61 T: DeserializeOwned + Unpin + Send + Sync,
62{
63 pub(crate) fn new(async_stream: AsyncChangeStream<T>) -> Self {
64 Self { async_stream }
65 }
66
67 /// Returns the cached resume token that can be used to resume after the most recently returned
68 /// change.
69 ///
70 /// See the documentation
71 /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
72 /// information on change stream resume tokens.
73 pub fn resume_token(&self) -> Option<ResumeToken> {
74 self.async_stream.resume_token()
75 }
76
77 /// Update the type streamed values will be parsed as.
78 pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
79 ChangeStream {
80 async_stream: self.async_stream.with_type(),
81 }
82 }
83
84 /// Returns whether the change stream will continue to receive events.
85 pub fn is_alive(&self) -> bool {
86 self.async_stream.is_alive()
87 }
88
89 /// Retrieves the next result from the change stream, if any.
90 ///
91 /// Where calling `Iterator::next` will internally loop until a change document is received,
92 /// this will make at most one request and return `None` if the returned document batch is
93 /// empty. This method should be used when storing the resume token in order to ensure the
94 /// most up to date token is received, e.g.
95 ///
96 /// ```
97 /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
98 /// # fn func() -> Result<()> {
99 /// # let client = Client::with_uri_str("mongodb://example.com")?;
100 /// # let coll: Collection<Document> = client.database("foo").collection("bar");
101 /// let mut change_stream = coll.watch().run()?;
102 /// let mut resume_token = None;
103 /// while change_stream.is_alive() {
104 /// if let Some(event) = change_stream.next_if_any()? {
105 /// // process event
106 /// }
107 /// resume_token = change_stream.resume_token();
108 /// }
109 /// #
110 /// # Ok(())
111 /// # }
112 /// ```
113 pub fn next_if_any(&mut self) -> Result<Option<T>> {
114 crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next_if_any())
115 }
116}
117
118impl<T> Iterator for ChangeStream<T>
119where
120 T: DeserializeOwned + Unpin + Send + Sync,
121{
122 type Item = Result<T>;
123
124 fn next(&mut self) -> Option<Self::Item> {
125 crate::sync::TOKIO_RUNTIME.block_on(self.async_stream.next())
126 }
127}
128
129/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
130/// be iterated using one. To iterate, use [`SessionChangeStream::next`]:
131///
132/// ```
133/// # use mongodb::{bson::Document, sync::Client, error::Result};
134/// #
135/// # async fn do_stuff() -> Result<()> {
136/// # let client = Client::with_uri_str("mongodb://example.com")?;
137/// # let mut session = client.start_session().run()?;
138/// # let coll = client.database("foo").collection::<Document>("bar");
139/// #
140/// let mut cs = coll.watch().session(&mut session).run()?;
141/// while let Some(event) = cs.next(&mut session)? {
142/// println!("{:?}", event)
143/// }
144/// #
145/// # Ok(())
146/// # }
147/// ```
148pub struct SessionChangeStream<T>
149where
150 T: DeserializeOwned + Unpin,
151{
152 async_stream: AsyncSessionChangeStream<T>,
153}
154
155impl<T> SessionChangeStream<T>
156where
157 T: DeserializeOwned + Unpin + Send + Sync,
158{
159 pub(crate) fn new(async_stream: AsyncSessionChangeStream<T>) -> Self {
160 Self { async_stream }
161 }
162
163 /// Returns the cached resume token that can be used to resume after the most recently returned
164 /// change.
165 ///
166 /// See the documentation
167 /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
168 /// information on change stream resume tokens.
169 pub fn resume_token(&self) -> Option<ResumeToken> {
170 self.async_stream.resume_token()
171 }
172
173 /// Update the type streamed values will be parsed as.
174 pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
175 SessionChangeStream {
176 async_stream: self.async_stream.with_type(),
177 }
178 }
179
180 /// Retrieve the next result from the change stream.
181 /// The session provided must be the same session used to create the change stream.
182 ///
183 /// ```
184 /// # use bson::{doc, Document};
185 /// # use mongodb::sync::Client;
186 /// # fn main() {
187 /// # async {
188 /// # let client = Client::with_uri_str("foo")?;
189 /// # let coll = client.database("foo").collection::<Document>("bar");
190 /// # let other_coll = coll.clone();
191 /// # let mut session = client.start_session().run()?;
192 /// let mut cs = coll.watch().session(&mut session).run()?;
193 /// while let Some(event) = cs.next(&mut session)? {
194 /// let id = bson::to_bson(&event.id)?;
195 /// other_coll.insert_one(doc! { "id": id }).session(&mut session).run()?;
196 /// }
197 /// # Ok::<(), mongodb::error::Error>(())
198 /// # };
199 /// # }
200 /// ```
201 pub fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
202 crate::sync::TOKIO_RUNTIME
203 .block_on(self.async_stream.next(&mut session.async_client_session))
204 }
205
206 /// Returns whether the change stream will continue to receive events.
207 pub fn is_alive(&self) -> bool {
208 self.async_stream.is_alive()
209 }
210
211 /// Retrieve the next result from the change stream, if any.
212 ///
213 /// Where calling `next` will internally loop until a change document is received,
214 /// this will make at most one request and return `None` if the returned document batch is
215 /// empty. This method should be used when storing the resume token in order to ensure the
216 /// most up to date token is received, e.g.
217 ///
218 /// ```
219 /// # use mongodb::{bson::Document, sync::{Client, Collection}, error::Result};
220 /// # async fn func() -> Result<()> {
221 /// # let client = Client::with_uri_str("mongodb://example.com")?;
222 /// # let coll: Collection<Document> = client.database("foo").collection("bar");
223 /// # let mut session = client.start_session().run()?;
224 /// let mut change_stream = coll.watch().session(&mut session).run()?;
225 /// let mut resume_token = None;
226 /// while change_stream.is_alive() {
227 /// if let Some(event) = change_stream.next_if_any(&mut session)? {
228 /// // process event
229 /// }
230 /// resume_token = change_stream.resume_token();
231 /// }
232 /// #
233 /// # Ok(())
234 /// # }
235 /// ```
236 pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
237 crate::sync::TOKIO_RUNTIME.block_on(
238 self.async_stream
239 .next_if_any(&mut session.async_client_session),
240 )
241 }
242}