mongodb/change_stream/
options.rs

1//! Contains options for ChangeStreams.
2use macro_magic::export_tokens;
3use serde::{Deserialize, Serialize};
4use serde_with::skip_serializing_none;
5use std::time::Duration;
6use typed_builder::TypedBuilder;
7
8use crate::{
9    bson::{Bson, Timestamp},
10    change_stream::event::ResumeToken,
11    collation::Collation,
12    concern::ReadConcern,
13    options::AggregateOptions,
14    selection_criteria::SelectionCriteria,
15};
16
17/// These are the valid options that can be passed to the `watch` method for creating a
18/// [`ChangeStream`](crate::change_stream::ChangeStream).
19#[skip_serializing_none]
20#[derive(Clone, Debug, Default, Deserialize, Serialize, TypedBuilder)]
21#[builder(field_defaults(default, setter(into)))]
22#[serde(rename_all = "camelCase")]
23#[non_exhaustive]
24#[export_tokens]
25pub struct ChangeStreamOptions {
26    #[rustfmt::skip]
27    /// Configures how the
28    /// [`ChangeStreamEvent::full_document`](crate::change_stream::event::ChangeStreamEvent::full_document)
29    /// field will be populated. By default, the field will be empty for updates.
30    pub full_document: Option<FullDocumentType>,
31
32    /// Configures how the
33    /// [`ChangeStreamEvent::full_document_before_change`](
34    /// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field will be
35    /// populated.  By default, the field will be empty for updates.
36    pub full_document_before_change: Option<FullDocumentBeforeChangeType>,
37
38    /// Specifies the logical starting point for the new change stream. Note that if a watched
39    /// collection is dropped and recreated or newly renamed, `start_after` should be set instead.
40    /// `resume_after` and `start_after` cannot be set simultaneously.
41    ///
42    /// For more information on resuming a change stream see the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-after)
43    pub resume_after: Option<ResumeToken>,
44
45    /// The change stream will only provide changes that occurred at or after the specified
46    /// timestamp. Any command run against the server will return an operation time that can be
47    /// used here.
48    pub start_at_operation_time: Option<Timestamp>,
49
50    /// Takes a resume token and starts a new change stream returning the first notification after
51    /// the token. This will allow users to watch collections that have been dropped and
52    /// recreated or newly renamed collections without missing any notifications.
53    ///
54    /// This feature is only available on MongoDB 4.2+.
55    ///
56    /// See the documentation [here](https://www.mongodb.com/docs/master/changeStreams/#change-stream-start-after) for more
57    /// information.
58    pub start_after: Option<ResumeToken>,
59
60    /// If `true`, the change stream will monitor all changes for the given cluster.
61    #[builder(setter(skip))]
62    pub(crate) all_changes_for_cluster: Option<bool>,
63
64    /// The maximum amount of time for the server to wait on new documents to satisfy a change
65    /// stream query.
66    #[serde(skip_serializing)]
67    pub max_await_time: Option<Duration>,
68
69    /// The number of documents to return per batch.
70    #[serde(skip_serializing)]
71    pub batch_size: Option<u32>,
72
73    /// Specifies a collation.
74    #[serde(skip_serializing)]
75    pub collation: Option<Collation>,
76
77    /// The read concern to use for the operation.
78    ///
79    /// If none is specified, the read concern defined on the object executing this operation will
80    /// be used.
81    #[serde(skip_serializing)]
82    pub read_concern: Option<ReadConcern>,
83
84    /// The criteria used to select a server for this operation.
85    ///
86    /// If none is specified, the selection criteria defined on the object executing this operation
87    /// will be used.
88    #[serde(skip_serializing)]
89    pub selection_criteria: Option<SelectionCriteria>,
90
91    /// Enables the server to send the 'expanded' list of change stream events.
92    pub show_expanded_events: Option<bool>,
93
94    /// Tags the query with an arbitrary [`Bson`] value to help trace the operation through the
95    /// database profiler, currentOp and logs.
96    ///
97    /// The comment can be any [`Bson`] value on server versions 4.4+. On lower server versions,
98    /// the comment must be a [`Bson::String`] value.
99    pub comment: Option<Bson>,
100}
101
102impl ChangeStreamOptions {
103    pub(crate) fn aggregate_options(&self) -> AggregateOptions {
104        AggregateOptions::builder()
105            .batch_size(self.batch_size)
106            .collation(self.collation.clone())
107            .max_await_time(self.max_await_time)
108            .read_concern(self.read_concern.clone())
109            .selection_criteria(self.selection_criteria.clone())
110            .comment(self.comment.clone())
111            .build()
112    }
113}
114
115/// Describes the modes for configuring the
116/// [`ChangeStreamEvent::full_document`](
117/// crate::change_stream::event::ChangeStreamEvent::full_document) field.
118#[derive(Clone, Debug, Deserialize, Serialize)]
119#[serde(rename_all = "camelCase")]
120#[non_exhaustive]
121pub enum FullDocumentType {
122    /// The field will be populated with a copy of the entire document that was updated.
123    UpdateLookup,
124
125    /// The field will be populated for replace and update change events if the post-image for this
126    /// event is available.
127    WhenAvailable,
128
129    /// The same behavior as `WhenAvailable` except that an error is raised if the post-image is
130    /// not available.
131    Required,
132
133    /// User-defined other types for forward compatibility.
134    Other(String),
135}
136
137/// Describes the modes for configuring the
138/// [`ChangeStreamEvent::full_document_before_change`](
139/// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field.
140#[derive(Clone, Debug, Deserialize, Serialize)]
141#[serde(rename_all = "camelCase")]
142#[non_exhaustive]
143pub enum FullDocumentBeforeChangeType {
144    /// The field will be populated for replace, update, and delete change events if the pre-image
145    /// for this event is available.
146    WhenAvailable,
147
148    /// The same behavior as `WhenAvailable` except that an error is raised if the pre-image is
149    /// not available.
150    Required,
151
152    /// Do not send a value.
153    Off,
154
155    /// User-defined other types for forward compatibility.
156    Other(String),
157}