mongodb/change_stream/event.rs
1//! Contains the types related to a `ChangeStream` event.
2#[cfg(test)]
3use std::convert::TryInto;
4
5use crate::{cursor::CursorSpecification, options::ChangeStreamOptions};
6
7use crate::bson::{DateTime, Document, RawBson, RawDocumentBuf, Timestamp};
8#[cfg(test)]
9use crate::{bson::Bson, bson_compat::RawError};
10use serde::{Deserialize, Serialize};
11
12/// An opaque token used for resuming an interrupted
13/// [`ChangeStream`](crate::change_stream::ChangeStream).
14///
15/// When starting a new change stream,
16/// [`crate::action::Watch::start_after`] and
17/// [`crate::action::Watch::resume_after`] fields can be specified
18/// with instances of `ResumeToken`.
19///
20/// See the documentation
21/// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
22/// information on resume tokens.
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
24pub struct ResumeToken(pub(crate) RawBson);
25
26impl ResumeToken {
27 pub(crate) fn initial(
28 options: Option<&ChangeStreamOptions>,
29 spec: &CursorSpecification,
30 ) -> Option<ResumeToken> {
31 match &spec.post_batch_resume_token {
32 // Token from initial response from `aggregate`
33 Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
34 // Token from options passed to `watch`
35 _ => options
36 .and_then(|o| o.start_after.as_ref().or(o.resume_after.as_ref()))
37 .cloned(),
38 }
39 }
40
41 pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
42 doc.map(|doc| ResumeToken(RawBson::Document(doc)))
43 }
44
45 #[cfg(test)]
46 pub(crate) fn parsed(self) -> std::result::Result<Bson, RawError> {
47 self.0.try_into()
48 }
49}
50
51/// A `ChangeStreamEvent` represents a
52/// [change event](https://www.mongodb.com/docs/manual/reference/change-events/) in the associated change stream.
53#[derive(Debug, Serialize, Deserialize, PartialEq)]
54#[serde(rename_all = "camelCase")]
55#[non_exhaustive]
56pub struct ChangeStreamEvent<T> {
57 /// An opaque token for use when resuming an interrupted `ChangeStream`.
58 ///
59 /// See the documentation
60 /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for
61 /// more information on resume tokens.
62 ///
63 /// Also see the documentation on [resuming a change
64 /// stream](https://www.mongodb.com/docs/manual/changeStreams/#resume-a-change-stream).
65 #[serde(rename = "_id")]
66 pub id: ResumeToken,
67
68 /// Describes the type of operation represented in this change notification.
69 pub operation_type: OperationType,
70
71 /// Identifies the collection or database on which the event occurred.
72 pub ns: Option<ChangeNamespace>,
73
74 /// The type of the newly created object. Only included for `OperationType::Create`.
75 pub ns_type: Option<ChangeNamespaceType>,
76
77 /// The new name for the `ns` collection. Only included for `OperationType::Rename`.
78 pub to: Option<ChangeNamespace>,
79
80 /// The identifier for the session associated with the transaction.
81 /// Only present if the operation is part of a multi-document transaction.
82 pub lsid: Option<Document>,
83
84 /// Together with the lsid, a number that helps uniquely identify a transaction.
85 /// Only present if the operation is part of a multi-document transaction.
86 pub txn_number: Option<i64>,
87
88 /// A `Document` that contains the `_id` of the document created or modified by the `insert`,
89 /// `replace`, `delete`, `update` operations (i.e. CRUD operations). For sharded collections,
90 /// also displays the full shard key for the document. The `_id` field is not repeated if it is
91 /// already a part of the shard key.
92 pub document_key: Option<Document>,
93
94 /// A description of the fields that were updated or removed by the update operation.
95 /// Only specified if `operation_type` is `OperationType::Update`.
96 pub update_description: Option<UpdateDescription>,
97
98 /// The cluster time at which the change occurred.
99 pub cluster_time: Option<Timestamp>,
100
101 /// The wall time from the mongod that the change event originated from.
102 pub wall_time: Option<DateTime>,
103
104 /// The `Document` created or modified by the `insert`, `replace`, `delete`, `update`
105 /// operations (i.e. CRUD operations).
106 ///
107 /// For `insert` and `replace` operations, this represents the new document created by the
108 /// operation. For `delete` operations, this field is `None`.
109 ///
110 /// For `update` operations, this field only appears if you configured the change stream with
111 /// [`full_document`](crate::action::Watch::full_document) set to
112 /// [`UpdateLookup`](crate::options::FullDocumentType::UpdateLookup). This field then
113 /// represents the most current majority-committed version of the document modified by the
114 /// update operation.
115 pub full_document: Option<T>,
116
117 /// Contains the pre-image of the modified or deleted document if the pre-image is available
118 /// for the change event and either `Required` or `WhenAvailable` was specified for the
119 /// [`full_document_before_change`](
120 /// crate::action::Watch::full_document_before_change) option when creating the
121 /// change stream. If `WhenAvailable` was specified but the pre-image is unavailable, this
122 /// will be explicitly set to `None`.
123 pub full_document_before_change: Option<T>,
124}
125
126/// Describes which fields have been updated or removed from a document.
127#[derive(Debug, Serialize, Deserialize, PartialEq)]
128#[serde(rename_all = "camelCase")]
129#[non_exhaustive]
130pub struct UpdateDescription {
131 /// A `Document` containing key:value pairs of names of the fields that were changed, and the
132 /// new value for those fields.
133 pub updated_fields: Document,
134
135 /// An array of field names that were removed from the `Document`.
136 pub removed_fields: Vec<String>,
137
138 /// Arrays that were truncated in the `Document`.
139 pub truncated_arrays: Option<Vec<TruncatedArray>>,
140
141 /// When an update event reports changes involving ambiguous fields, the disambiguatedPaths
142 /// document provides the path key with an array listing each path component.
143 /// Note: The disambiguatedPaths field is only available on change streams started with the
144 /// showExpandedEvents option
145 pub disambiguated_paths: Option<Document>,
146}
147
148/// Describes an array that has been truncated.
149#[derive(Debug, Serialize, Deserialize, PartialEq)]
150#[serde(rename_all = "camelCase")]
151#[non_exhaustive]
152pub struct TruncatedArray {
153 /// The field path of the array.
154 pub field: String,
155
156 /// The new size of the array.
157 pub new_size: i32,
158}
159
/// The kind of operation a change notification represents.
///
/// Operation types without a dedicated variant are carried verbatim in
/// [`OperationType::Other`].
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum OperationType {
    /// An insert; see [insert-event](https://www.mongodb.com/docs/manual/reference/change-events/#insert-event)
    Insert,

    /// An update; see [update-event](https://www.mongodb.com/docs/manual/reference/change-events/#update-event)
    Update,

    /// A replace; see [replace-event](https://www.mongodb.com/docs/manual/reference/change-events/#replace-event)
    Replace,

    /// A delete; see [delete-event](https://www.mongodb.com/docs/manual/reference/change-events/#delete-event)
    Delete,

    /// A drop; see [drop-event](https://www.mongodb.com/docs/manual/reference/change-events/#drop-event)
    Drop,

    /// A rename; see [rename-event](https://www.mongodb.com/docs/manual/reference/change-events/#rename-event)
    Rename,

    /// A database drop; see [dropdatabase-event](https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event)
    DropDatabase,

    /// An invalidation; see [invalidate-event](https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event)
    Invalidate,

    /// Catch-all for event types that have no dedicated variant (including future ones).
    Other(String),
}
191
192#[derive(Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194enum OperationTypeHelper {
195 Insert,
196 Update,
197 Replace,
198 Delete,
199 Drop,
200 Rename,
201 DropDatabase,
202 Invalidate,
203}
204
205#[derive(Serialize, Deserialize)]
206#[serde(untagged)]
207enum OperationTypeWrapper<'a> {
208 Known(OperationTypeHelper),
209 Unknown(&'a str),
210}
211
212impl<'a> From<&'a OperationType> for OperationTypeWrapper<'a> {
213 fn from(src: &'a OperationType) -> Self {
214 match src {
215 OperationType::Insert => Self::Known(OperationTypeHelper::Insert),
216 OperationType::Update => Self::Known(OperationTypeHelper::Update),
217 OperationType::Replace => Self::Known(OperationTypeHelper::Replace),
218 OperationType::Delete => Self::Known(OperationTypeHelper::Delete),
219 OperationType::Drop => Self::Known(OperationTypeHelper::Drop),
220 OperationType::Rename => Self::Known(OperationTypeHelper::Rename),
221 OperationType::DropDatabase => Self::Known(OperationTypeHelper::DropDatabase),
222 OperationType::Invalidate => Self::Known(OperationTypeHelper::Invalidate),
223 OperationType::Other(s) => Self::Unknown(s),
224 }
225 }
226}
227
228impl From<OperationTypeWrapper<'_>> for OperationType {
229 fn from(src: OperationTypeWrapper) -> Self {
230 match src {
231 OperationTypeWrapper::Known(h) => match h {
232 OperationTypeHelper::Insert => Self::Insert,
233 OperationTypeHelper::Update => Self::Update,
234 OperationTypeHelper::Replace => Self::Replace,
235 OperationTypeHelper::Delete => Self::Delete,
236 OperationTypeHelper::Drop => Self::Drop,
237 OperationTypeHelper::Rename => Self::Rename,
238 OperationTypeHelper::DropDatabase => Self::DropDatabase,
239 OperationTypeHelper::Invalidate => Self::Invalidate,
240 },
241 OperationTypeWrapper::Unknown(s) => Self::Other(s.to_string()),
242 }
243 }
244}
245
246impl<'de> Deserialize<'de> for OperationType {
247 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
248 where
249 D: serde::Deserializer<'de>,
250 {
251 OperationTypeWrapper::deserialize(deserializer).map(OperationType::from)
252 }
253}
254
255impl Serialize for OperationType {
256 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
257 where
258 S: serde::Serializer,
259 {
260 OperationTypeWrapper::serialize(&self.into(), serializer)
261 }
262}
263
264/// Identifies the collection or database on which an event occurred.
265#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
266#[non_exhaustive]
267pub struct ChangeNamespace {
268 /// The name of the database in which the change occurred.
269 pub db: String,
270
271 /// The name of the collection in which the change occurred.
272 pub coll: Option<String>,
273}
274
275/// Identifies the type of object for a `create` event.
276#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
277#[non_exhaustive]
278pub enum ChangeNamespaceType {
279 /// A collection with no special options set.
280 Collection,
281 /// A timeseries collection.
282 Timeseries,
283 /// A view collection.
284 View,
285 /// Forward compatibility fallthrough.
286 #[serde(untagged)]
287 Other(String),
288}