Skip to main content

mongodb/action/
watch.rs

1use std::{marker::PhantomData, time::Duration};
2
3use crate::{
4    bson::{Bson, Document, Timestamp},
5    operation::OperationTarget,
6};
7use serde::de::DeserializeOwned;
8
9use super::{
10    action_impl,
11    deeplink,
12    export_doc,
13    option_setters,
14    options_doc,
15    ExplicitSession,
16    ImplicitSession,
17};
18use crate::{
19    change_stream::{
20        event::{ChangeStreamEvent, ResumeToken},
21        options::{ChangeStreamOptions, FullDocumentBeforeChangeType, FullDocumentType},
22        session::SessionChangeStream,
23        ChangeStream,
24    },
25    collation::Collation,
26    error::Result,
27    options::ReadConcern,
28    selection_criteria::SelectionCriteria,
29    Client,
30    ClientSession,
31    Collection,
32    Database,
33};
34
35impl Client {
36    /// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
37    /// stream does not observe changes from system collections or the "config", "local" or
38    /// "admin" databases.
39    ///
40    /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
41    /// streams.
42    ///
43    /// Change streams require either a "majority" read concern or no read
44    /// concern. Anything else will cause a server error.
45    ///
46    /// Note that using a `$project` stage to remove any of the `_id` `operationType` or `ns` fields
47    /// will cause an error. The driver requires these fields to support resumability. For
48    /// more information on resumability, see the documentation for
49    /// [`ChangeStream`](change_stream/struct.ChangeStream.html)
50    ///
51    /// If the pipeline alters the structure of the returned events, the parsed type will need to be
52    /// changed via [`ChangeStream::with_type`].
53    ///
54    /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
55    /// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
56    /// [`ClientSession`] has been provided.
57    #[deeplink]
58    #[options_doc(watch)]
59    pub fn watch(&self) -> Watch<'_> {
60        Watch::new_cluster(self)
61    }
62}
63
64impl Database {
65    /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
66    /// for all changes in this database. The stream does not observe changes from system
67    /// collections and cannot be started on "config", "local" or "admin" databases.
68    ///
69    /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
70    /// streams.
71    ///
72    /// Change streams require either a "majority" read concern or no read
73    /// concern. Anything else will cause a server error.
74    ///
75    /// Note that using a `$project` stage to remove any of the `_id`, `operationType` or `ns`
76    /// fields will cause an error. The driver requires these fields to support resumability. For
77    /// more information on resumability, see the documentation for
78    /// [`ChangeStream`](change_stream/struct.ChangeStream.html).
79    ///
80    /// If the pipeline alters the structure of the returned events, the parsed type will need to be
81    /// changed via [`ChangeStream::with_type`].
82    ///
83    /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
84    /// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
85    /// [`ClientSession`] has been provided.
86    #[deeplink]
87    #[options_doc(watch)]
88    pub fn watch(&self) -> Watch<'_> {
89        Watch::new(self.client(), self.into())
90    }
91}
92
93impl<T> Collection<T>
94where
95    T: Send + Sync,
96{
97    /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
98    /// for all changes in this collection. A
99    /// [`ChangeStream`](change_stream/struct.ChangeStream.html) cannot be started on system
100    /// collections.
101    ///
102    /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
103    /// streams.
104    ///
105    /// Change streams require either a "majority" read concern or no read concern. Anything else
106    /// will cause a server error.
107    ///
108    /// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<T>>>`] or
109    /// d[`Result<SessionChangeStream<ChangeStreamEvent<T>>>`] if a
110    /// [`ClientSession`] has been provided.
111    #[deeplink]
112    #[options_doc(watch)]
113    pub fn watch(&self) -> Watch<'_, T> {
114        Watch::new(self.client(), self.into())
115    }
116}
117
118#[cfg(feature = "sync")]
119impl crate::sync::Client {
120    /// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
121    /// stream does not observe changes from system collections or the "config", "local" or
122    /// "admin" databases.
123    ///
124    /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
125    /// streams.
126    ///
127    /// Change streams require either a "majority" read concern or no read
128    /// concern. Anything else will cause a server error.
129    #[options_doc(watch, "run")]
130    pub fn watch(&self) -> Watch<'_> {
131        self.async_client.watch()
132    }
133}
134
135#[cfg(feature = "sync")]
136impl crate::sync::Database {
137    /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
138    /// for all changes in this database. The stream does not observe changes from system
139    /// collections and cannot be started on "config", "local" or "admin" databases.
140    ///
141    /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
142    /// streams.
143    ///
144    /// Change streams require either a "majority" read concern or no read
145    /// concern. Anything else will cause a server error.
146    #[options_doc(watch, "run")]
147    pub fn watch(&self) -> Watch<'_> {
148        self.async_database.watch()
149    }
150}
151
152#[cfg(feature = "sync")]
153impl<T> crate::sync::Collection<T>
154where
155    T: Send + Sync,
156{
157    /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events
158    /// for all changes in this collection. A
159    /// [`ChangeStream`](change_stream/struct.ChangeStream.html) cannot be started on system
160    /// collections.
161    ///
162    /// See the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/) on change
163    /// streams.
164    ///
165    /// Change streams require either a "majority" read concern or no read concern. Anything else
166    /// will cause a server error.
167    #[options_doc(watch, "run")]
168    pub fn watch(&self) -> Watch<'_, T> {
169        self.async_collection.watch()
170    }
171}
172
173/// Starts a new [`ChangeStream`] that receives events for all changes in a given scope.  Create by
174/// calling [`Client::watch`], [`Database::watch`], or [`Collection::watch`].
175#[must_use]
176pub struct Watch<'a, T = Document, S = ImplicitSession> {
177    client: &'a Client,
178    target: OperationTarget,
179    pipeline: Vec<Document>,
180    options: Option<ChangeStreamOptions>,
181    session: S,
182    cluster: bool,
183    phantom: PhantomData<fn() -> T>,
184}
185
186impl<'a, T> Watch<'a, T, ImplicitSession> {
187    fn new(client: &'a Client, target: OperationTarget) -> Self {
188        Self {
189            client,
190            target,
191            pipeline: vec![],
192            options: None,
193            session: ImplicitSession,
194            cluster: false,
195            phantom: PhantomData,
196        }
197    }
198
199    fn new_cluster(client: &'a Client) -> Self {
200        Self {
201            client,
202            target: OperationTarget::admin(client),
203            pipeline: vec![],
204            options: None,
205            session: ImplicitSession,
206            cluster: true,
207            phantom: PhantomData,
208        }
209    }
210}
211
212#[option_setters(crate::change_stream::options::ChangeStreamOptions, skip = [resume_after, all_changes_for_cluster])]
213#[export_doc(watch, extra = [session])]
214impl<S> Watch<'_, S> {
215    /// Apply an aggregation pipeline to the change stream.
216    ///
217    /// Note that using a `$project` stage to remove any of the `_id`, `operationType` or `ns`
218    /// fields will cause an error. The driver requires these fields to support resumability. For
219    /// more information on resumability, see the documentation for
220    /// [`ChangeStream`](change_stream/struct.ChangeStream.html)
221    ///
222    /// If the pipeline alters the structure of the returned events, the parsed type will need to be
223    /// changed via [`ChangeStream::with_type`].
224    pub fn pipeline(mut self, value: impl IntoIterator<Item = Document>) -> Self {
225        self.pipeline = value.into_iter().collect();
226        self
227    }
228
229    /// Specifies the logical starting point for the new change stream. Note that if a watched
230    /// collection is dropped and recreated or newly renamed, `start_after` should be set instead.
231    /// `resume_after` and `start_after` cannot be set simultaneously.
232    ///
233    /// For more information on resuming a change stream see the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-after)
234    pub fn resume_after(mut self, value: impl Into<Option<ResumeToken>>) -> Self {
235        // Implemented manually to accept `impl Into<Option<ResumeToken>>` so the output of
236        // `ChangeStream::resume_token()` can be passed in directly.
237        self.options().resume_after = value.into();
238        self
239    }
240}
241
242impl<'a, T> Watch<'a, T, ImplicitSession> {
243    /// Use the provided ['ClientSession'].
244    pub fn session<'s>(
245        self,
246        session: impl Into<&'s mut ClientSession>,
247    ) -> Watch<'a, T, ExplicitSession<'s>> {
248        Watch {
249            client: self.client,
250            target: self.target,
251            pipeline: self.pipeline,
252            options: self.options,
253            session: ExplicitSession(session.into()),
254            cluster: self.cluster,
255            phantom: PhantomData,
256        }
257    }
258}
259
260#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<T>>)]
261impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ImplicitSession> {
262    type Future = WatchFuture;
263
264    async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<T>>> {
265        if self.cluster {
266            self.options
267                .get_or_insert_with(Default::default)
268                .all_changes_for_cluster = Some(true);
269        }
270        self.client
271            .execute_watch(self.pipeline, self.options, self.target, None)
272            .await
273    }
274}
275
276#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<T>>)]
277impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ExplicitSession<'a>> {
278    type Future = WatchSessionFuture;
279
280    async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<T>>> {
281        if self.cluster {
282            self.options
283                .get_or_insert_with(Default::default)
284                .all_changes_for_cluster = Some(true);
285        }
286        self.client
287            .execute_watch_with_session(
288                self.pipeline,
289                self.options,
290                self.target,
291                None,
292                self.session.0,
293            )
294            .await
295    }
296}