mongodb/action/watch.rs
1use std::{marker::PhantomData, time::Duration};
2
3use crate::{
4 bson::{Bson, Document, Timestamp},
5 operation::OperationTarget,
6};
7use serde::de::DeserializeOwned;
8
9use super::{
10 action_impl,
11 deeplink,
12 export_doc,
13 option_setters,
14 options_doc,
15 ExplicitSession,
16 ImplicitSession,
17};
18use crate::{
19 change_stream::{
20 event::{ChangeStreamEvent, ResumeToken},
21 options::{ChangeStreamOptions, FullDocumentBeforeChangeType, FullDocumentType},
22 session::SessionChangeStream,
23 ChangeStream,
24 },
25 collation::Collation,
26 error::Result,
27 options::ReadConcern,
28 selection_criteria::SelectionCriteria,
29 Client,
30 ClientSession,
31 Collection,
32 Database,
33};
34
35impl Client {
36 /// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
37 /// stream does not observe changes from system collections or the "config", "local" or
38 /// "admin" databases.
39 ///
40 /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
41 /// streams.
42 ///
43 /// Change streams require either a "majority" read concern or no read
44 /// concern. Anything else will cause a server error.
45 ///
46 /// Note that using a `$project` stage to remove any of the `_id` `operationType` or `ns` fields
47 /// will cause an error. The driver requires these fields to support resumability. For
48 /// more information on resumability, see the documentation for
49 /// [`ChangeStream`](change_stream/struct.ChangeStream.html)
50 ///
51 /// If the pipeline alters the structure of the returned events, the parsed type will need to be
52 /// changed via [`ChangeStream::with_type`].
53 ///
54 /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
55 /// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
56 /// [`ClientSession`] has been provided.
57 #[deeplink]
58 #[options_doc(watch)]
59 pub fn watch(&self) -> Watch<'_> {
60 Watch::new_cluster(self)
61 }
62}
63
64impl Database {
65 /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
66 /// for all changes in this database. The stream does not observe changes from system
67 /// collections and cannot be started on "config", "local" or "admin" databases.
68 ///
69 /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
70 /// streams.
71 ///
72 /// Change streams require either a "majority" read concern or no read
73 /// concern. Anything else will cause a server error.
74 ///
75 /// Note that using a `$project` stage to remove any of the `_id`, `operationType` or `ns`
76 /// fields will cause an error. The driver requires these fields to support resumability. For
77 /// more information on resumability, see the documentation for
78 /// [`ChangeStream`](change_stream/struct.ChangeStream.html).
79 ///
80 /// If the pipeline alters the structure of the returned events, the parsed type will need to be
81 /// changed via [`ChangeStream::with_type`].
82 ///
83 /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
84 /// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
85 /// [`ClientSession`] has been provided.
86 #[deeplink]
87 #[options_doc(watch)]
88 pub fn watch(&self) -> Watch<'_> {
89 Watch::new(self.client(), self.into())
90 }
91}
92
93impl<T> Collection<T>
94where
95 T: Send + Sync,
96{
97 /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
98 /// for all changes in this collection. A
99 /// [`ChangeStream`](change_stream/struct.ChangeStream.html) cannot be started on system
100 /// collections.
101 ///
102 /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
103 /// streams.
104 ///
105 /// Change streams require either a "majority" read concern or no read concern. Anything else
106 /// will cause a server error.
107 ///
108 /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<T>>>`] or
109 /// d[`Result<SessionChangeStream<ChangeStreamEvent<T>>>`] if a
110 /// [`ClientSession`] has been provided.
111 #[deeplink]
112 #[options_doc(watch)]
113 pub fn watch(&self) -> Watch<'_, T> {
114 Watch::new(self.client(), self.into())
115 }
116}
117
118#[cfg(feature = "sync")]
119impl crate::sync::Client {
120 /// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
121 /// stream does not observe changes from system collections or the "config", "local" or
122 /// "admin" databases.
123 ///
124 /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
125 /// streams.
126 ///
127 /// Change streams require either a "majority" read concern or no read
128 /// concern. Anything else will cause a server error.
129 #[options_doc(watch, "run")]
130 pub fn watch(&self) -> Watch<'_> {
131 self.async_client.watch()
132 }
133}
134
135#[cfg(feature = "sync")]
136impl crate::sync::Database {
137 /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
138 /// for all changes in this database. The stream does not observe changes from system
139 /// collections and cannot be started on "config", "local" or "admin" databases.
140 ///
141 /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
142 /// streams.
143 ///
144 /// Change streams require either a "majority" read concern or no read
145 /// concern. Anything else will cause a server error.
146 #[options_doc(watch, "run")]
147 pub fn watch(&self) -> Watch<'_> {
148 self.async_database.watch()
149 }
150}
151
152#[cfg(feature = "sync")]
153impl<T> crate::sync::Collection<T>
154where
155 T: Send + Sync,
156{
157 /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
158 /// for all changes in this collection. A
159 /// [`ChangeStream`](change_stream/struct.ChangeStream.html) cannot be started on system
160 /// collections.
161 ///
162 /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
163 /// streams.
164 ///
165 /// Change streams require either a "majority" read concern or no read concern. Anything else
166 /// will cause a server error.
167 #[options_doc(watch, "run")]
168 pub fn watch(&self) -> Watch<'_, T> {
169 self.async_collection.watch()
170 }
171}
172
173/// Starts a new [`ChangeStream`] that receives events for all changes in a given scope. Create by
174/// calling [`Client::watch`], [`Database::watch`], or [`Collection::watch`].
175#[must_use]
176pub struct Watch<'a, T = Document, S = ImplicitSession> {
177 client: &'a Client,
178 target: OperationTarget,
179 pipeline: Vec<Document>,
180 options: Option<ChangeStreamOptions>,
181 session: S,
182 cluster: bool,
183 phantom: PhantomData<fn() -> T>,
184}
185
186impl<'a, T> Watch<'a, T, ImplicitSession> {
187 fn new(client: &'a Client, target: OperationTarget) -> Self {
188 Self {
189 client,
190 target,
191 pipeline: vec![],
192 options: None,
193 session: ImplicitSession,
194 cluster: false,
195 phantom: PhantomData,
196 }
197 }
198
199 fn new_cluster(client: &'a Client) -> Self {
200 Self {
201 client,
202 target: OperationTarget::admin(client),
203 pipeline: vec![],
204 options: None,
205 session: ImplicitSession,
206 cluster: true,
207 phantom: PhantomData,
208 }
209 }
210}
211
212#[option_setters(crate::change_stream::options::ChangeStreamOptions, skip = [resume_after, all_changes_for_cluster])]
213#[export_doc(watch, extra = [session])]
214impl<S> Watch<'_, S> {
215 /// Apply an aggregation pipeline to the change stream.
216 ///
217 /// Note that using a `$project` stage to remove any of the `_id`, `operationType` or `ns`
218 /// fields will cause an error. The driver requires these fields to support resumability. For
219 /// more information on resumability, see the documentation for
220 /// [`ChangeStream`](change_stream/struct.ChangeStream.html)
221 ///
222 /// If the pipeline alters the structure of the returned events, the parsed type will need to be
223 /// changed via [`ChangeStream::with_type`].
224 pub fn pipeline(mut self, value: impl IntoIterator<Item = Document>) -> Self {
225 self.pipeline = value.into_iter().collect();
226 self
227 }
228
229 /// Specifies the logical starting point for the new change stream. Note that if a watched
230 /// collection is dropped and recreated or newly renamed, `start_after` should be set instead.
231 /// `resume_after` and `start_after` cannot be set simultaneously.
232 ///
233 /// For more information on resuming a change stream see the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-after)
234 pub fn resume_after(mut self, value: impl Into<Option<ResumeToken>>) -> Self {
235 // Implemented manually to accept `impl Into<Option<ResumeToken>>` so the output of
236 // `ChangeStream::resume_token()` can be passed in directly.
237 self.options().resume_after = value.into();
238 self
239 }
240}
241
242impl<'a, T> Watch<'a, T, ImplicitSession> {
243 /// Use the provided ['ClientSession'].
244 pub fn session<'s>(
245 self,
246 session: impl Into<&'s mut ClientSession>,
247 ) -> Watch<'a, T, ExplicitSession<'s>> {
248 Watch {
249 client: self.client,
250 target: self.target,
251 pipeline: self.pipeline,
252 options: self.options,
253 session: ExplicitSession(session.into()),
254 cluster: self.cluster,
255 phantom: PhantomData,
256 }
257 }
258}
259
260#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<T>>)]
261impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ImplicitSession> {
262 type Future = WatchFuture;
263
264 async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<T>>> {
265 if self.cluster {
266 self.options
267 .get_or_insert_with(Default::default)
268 .all_changes_for_cluster = Some(true);
269 }
270 self.client
271 .execute_watch(self.pipeline, self.options, self.target, None)
272 .await
273 }
274}
275
276#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<T>>)]
277impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ExplicitSession<'a>> {
278 type Future = WatchSessionFuture;
279
280 async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<T>>> {
281 if self.cluster {
282 self.options
283 .get_or_insert_with(Default::default)
284 .all_changes_for_cluster = Some(true);
285 }
286 self.client
287 .execute_watch_with_session(
288 self.pipeline,
289 self.options,
290 self.target,
291 None,
292 self.session.0,
293 )
294 .await
295 }
296}