mongodb/change_stream/options.rs
1//! Contains options for ChangeStreams.
2use macro_magic::export_tokens;
3use serde::{Deserialize, Serialize};
4use serde_with::skip_serializing_none;
5use std::time::Duration;
6use typed_builder::TypedBuilder;
7
8use crate::{
9 bson::{Bson, Timestamp},
10 change_stream::event::ResumeToken,
11 collation::Collation,
12 concern::ReadConcern,
13 options::AggregateOptions,
14 selection_criteria::SelectionCriteria,
15};
16
17/// These are the valid options that can be passed to the `watch` method for creating a
18/// [`ChangeStream`](crate::change_stream::ChangeStream).
19#[skip_serializing_none]
20#[derive(Clone, Debug, Default, Deserialize, Serialize, TypedBuilder)]
21#[builder(field_defaults(default, setter(into)))]
22#[serde(rename_all = "camelCase")]
23#[non_exhaustive]
24#[export_tokens]
25pub struct ChangeStreamOptions {
26 #[rustfmt::skip]
27 /// Configures how the
28 /// [`ChangeStreamEvent::full_document`](crate::change_stream::event::ChangeStreamEvent::full_document)
29 /// field will be populated. By default, the field will be empty for updates.
30 pub full_document: Option<FullDocumentType>,
31
32 /// Configures how the
33 /// [`ChangeStreamEvent::full_document_before_change`](
34 /// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field will be
35 /// populated. By default, the field will be empty for updates.
36 pub full_document_before_change: Option<FullDocumentBeforeChangeType>,
37
38 /// Specifies the logical starting point for the new change stream. Note that if a watched
39 /// collection is dropped and recreated or newly renamed, `start_after` should be set instead.
40 /// `resume_after` and `start_after` cannot be set simultaneously.
41 ///
42 /// For more information on resuming a change stream see the documentation [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-after)
43 pub resume_after: Option<ResumeToken>,
44
45 /// The change stream will only provide changes that occurred at or after the specified
46 /// timestamp. Any command run against the server will return an operation time that can be
47 /// used here.
48 pub start_at_operation_time: Option<Timestamp>,
49
50 /// Takes a resume token and starts a new change stream returning the first notification after
51 /// the token. This will allow users to watch collections that have been dropped and
52 /// recreated or newly renamed collections without missing any notifications.
53 ///
54 /// This feature is only available on MongoDB 4.2+.
55 ///
56 /// See the documentation [here](https://www.mongodb.com/docs/master/changeStreams/#change-stream-start-after) for more
57 /// information.
58 pub start_after: Option<ResumeToken>,
59
60 /// If `true`, the change stream will monitor all changes for the given cluster.
61 #[builder(setter(skip))]
62 pub(crate) all_changes_for_cluster: Option<bool>,
63
64 /// The maximum amount of time for the server to wait on new documents to satisfy a change
65 /// stream query.
66 #[serde(skip_serializing)]
67 pub max_await_time: Option<Duration>,
68
69 /// The number of documents to return per batch.
70 #[serde(skip_serializing)]
71 pub batch_size: Option<u32>,
72
73 /// Specifies a collation.
74 #[serde(skip_serializing)]
75 pub collation: Option<Collation>,
76
77 /// The read concern to use for the operation.
78 ///
79 /// If none is specified, the read concern defined on the object executing this operation will
80 /// be used.
81 #[serde(skip_serializing)]
82 pub read_concern: Option<ReadConcern>,
83
84 /// The criteria used to select a server for this operation.
85 ///
86 /// If none is specified, the selection criteria defined on the object executing this operation
87 /// will be used.
88 #[serde(skip_serializing)]
89 pub selection_criteria: Option<SelectionCriteria>,
90
91 /// Enables the server to send the 'expanded' list of change stream events.
92 pub show_expanded_events: Option<bool>,
93
94 /// Tags the query with an arbitrary [`Bson`] value to help trace the operation through the
95 /// database profiler, currentOp and logs.
96 ///
97 /// The comment can be any [`Bson`] value on server versions 4.4+. On lower server versions,
98 /// the comment must be a [`Bson::String`] value.
99 pub comment: Option<Bson>,
100}
101
102impl ChangeStreamOptions {
103 pub(crate) fn aggregate_options(&self) -> AggregateOptions {
104 AggregateOptions::builder()
105 .batch_size(self.batch_size)
106 .collation(self.collation.clone())
107 .max_await_time(self.max_await_time)
108 .read_concern(self.read_concern.clone())
109 .selection_criteria(self.selection_criteria.clone())
110 .comment(self.comment.clone())
111 .build()
112 }
113}
114
115/// Describes the modes for configuring the
116/// [`ChangeStreamEvent::full_document`](
117/// crate::change_stream::event::ChangeStreamEvent::full_document) field.
118#[derive(Clone, Debug, Deserialize, Serialize)]
119#[serde(rename_all = "camelCase")]
120#[non_exhaustive]
121pub enum FullDocumentType {
122 /// The field will be populated with a copy of the entire document that was updated.
123 UpdateLookup,
124
125 /// The field will be populated for replace and update change events if the post-image for this
126 /// event is available.
127 WhenAvailable,
128
129 /// The same behavior as `WhenAvailable` except that an error is raised if the post-image is
130 /// not available.
131 Required,
132
133 /// User-defined other types for forward compatibility.
134 Other(String),
135}
136
137/// Describes the modes for configuring the
138/// [`ChangeStreamEvent::full_document_before_change`](
139/// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field.
140#[derive(Clone, Debug, Deserialize, Serialize)]
141#[serde(rename_all = "camelCase")]
142#[non_exhaustive]
143pub enum FullDocumentBeforeChangeType {
144 /// The field will be populated for replace, update, and delete change events if the pre-image
145 /// for this event is available.
146 WhenAvailable,
147
148 /// The same behavior as `WhenAvailable` except that an error is raised if the pre-image is
149 /// not available.
150 Required,
151
152 /// Do not send a value.
153 Off,
154
155 /// User-defined other types for forward compatibility.
156 Other(String),
157}