mongodb/change_stream/
event.rs

1//! Contains the types related to a `ChangeStream` event.
2#[cfg(test)]
3use std::convert::TryInto;
4
5use crate::{cursor::CursorSpecification, options::ChangeStreamOptions};
6
7use crate::bson::{DateTime, Document, RawBson, RawDocumentBuf, Timestamp};
8#[cfg(test)]
9use crate::{bson::Bson, bson_compat::RawError};
10use serde::{Deserialize, Serialize};
11
12/// An opaque token used for resuming an interrupted
13/// [`ChangeStream`](crate::change_stream::ChangeStream).
14///
15/// When starting a new change stream,
16/// [`crate::action::Watch::start_after`] and
17/// [`crate::action::Watch::resume_after`] fields can be specified
18/// with instances of `ResumeToken`.
19///
20/// See the documentation
21/// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
22/// information on resume tokens.
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
24pub struct ResumeToken(pub(crate) RawBson);
25
26impl ResumeToken {
27    pub(crate) fn initial(
28        options: Option<&ChangeStreamOptions>,
29        spec: &CursorSpecification,
30    ) -> Option<ResumeToken> {
31        match &spec.post_batch_resume_token {
32            // Token from initial response from `aggregate`
33            Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
34            // Token from options passed to `watch`
35            _ => options
36                .and_then(|o| o.start_after.as_ref().or(o.resume_after.as_ref()))
37                .cloned(),
38        }
39    }
40
41    pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
42        doc.map(|doc| ResumeToken(RawBson::Document(doc)))
43    }
44
45    #[cfg(test)]
46    pub(crate) fn parsed(self) -> std::result::Result<Bson, RawError> {
47        self.0.try_into()
48    }
49}
50
51/// A `ChangeStreamEvent` represents a
52/// [change event](https://www.mongodb.com/docs/manual/reference/change-events/) in the associated change stream.
53#[derive(Debug, Serialize, Deserialize, PartialEq)]
54#[serde(rename_all = "camelCase")]
55#[non_exhaustive]
56pub struct ChangeStreamEvent<T> {
57    /// An opaque token for use when resuming an interrupted `ChangeStream`.
58    ///
59    /// See the documentation
60    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for
61    /// more information on resume tokens.
62    ///
63    /// Also see the documentation on [resuming a change
64    /// stream](https://www.mongodb.com/docs/manual/changeStreams/#resume-a-change-stream).
65    #[serde(rename = "_id")]
66    pub id: ResumeToken,
67
68    /// Describes the type of operation represented in this change notification.
69    pub operation_type: OperationType,
70
71    /// Identifies the collection or database on which the event occurred.
72    pub ns: Option<ChangeNamespace>,
73
74    /// The type of the newly created object.  Only included for `OperationType::Create`.
75    pub ns_type: Option<ChangeNamespaceType>,
76
77    /// The new name for the `ns` collection.  Only included for `OperationType::Rename`.
78    pub to: Option<ChangeNamespace>,
79
80    /// The identifier for the session associated with the transaction.
81    /// Only present if the operation is part of a multi-document transaction.
82    pub lsid: Option<Document>,
83
84    /// Together with the lsid, a number that helps uniquely identify a transaction.
85    /// Only present if the operation is part of a multi-document transaction.
86    pub txn_number: Option<i64>,
87
88    /// A `Document` that contains the `_id` of the document created or modified by the `insert`,
89    /// `replace`, `delete`, `update` operations (i.e. CRUD operations). For sharded collections,
90    /// also displays the full shard key for the document. The `_id` field is not repeated if it is
91    /// already a part of the shard key.
92    pub document_key: Option<Document>,
93
94    /// A description of the fields that were updated or removed by the update operation.
95    /// Only specified if `operation_type` is `OperationType::Update`.
96    pub update_description: Option<UpdateDescription>,
97
98    /// The cluster time at which the change occurred.
99    pub cluster_time: Option<Timestamp>,
100
101    /// The wall time from the mongod that the change event originated from.
102    pub wall_time: Option<DateTime>,
103
104    /// The `Document` created or modified by the `insert`, `replace`, `delete`, `update`
105    /// operations (i.e. CRUD operations).
106    ///
107    /// For `insert` and `replace` operations, this represents the new document created by the
108    /// operation.  For `delete` operations, this field is `None`.
109    ///
110    /// For `update` operations, this field only appears if you configured the change stream with
111    /// [`full_document`](crate::action::Watch::full_document) set to
112    /// [`UpdateLookup`](crate::options::FullDocumentType::UpdateLookup). This field then
113    /// represents the most current majority-committed version of the document modified by the
114    /// update operation.
115    pub full_document: Option<T>,
116
117    /// Contains the pre-image of the modified or deleted document if the pre-image is available
118    /// for the change event and either `Required` or `WhenAvailable` was specified for the
119    /// [`full_document_before_change`](
120    /// crate::action::Watch::full_document_before_change) option when creating the
121    /// change stream. If `WhenAvailable` was specified but the pre-image is unavailable, this
122    /// will be explicitly set to `None`.
123    pub full_document_before_change: Option<T>,
124}
125
126/// Describes which fields have been updated or removed from a document.
127#[derive(Debug, Serialize, Deserialize, PartialEq)]
128#[serde(rename_all = "camelCase")]
129#[non_exhaustive]
130pub struct UpdateDescription {
131    /// A `Document` containing key:value pairs of names of the fields that were changed, and the
132    /// new value for those fields.
133    pub updated_fields: Document,
134
135    /// An array of field names that were removed from the `Document`.
136    pub removed_fields: Vec<String>,
137
138    /// Arrays that were truncated in the `Document`.
139    pub truncated_arrays: Option<Vec<TruncatedArray>>,
140
141    /// When an update event reports changes involving ambiguous fields, the disambiguatedPaths
142    /// document provides the path key with an array listing each path component.
143    /// Note: The disambiguatedPaths field is only available on change streams started with the
144    /// showExpandedEvents option
145    pub disambiguated_paths: Option<Document>,
146}
147
148/// Describes an array that has been truncated.
149#[derive(Debug, Serialize, Deserialize, PartialEq)]
150#[serde(rename_all = "camelCase")]
151#[non_exhaustive]
152pub struct TruncatedArray {
153    /// The field path of the array.
154    pub field: String,
155
156    /// The new size of the array.
157    pub new_size: i32,
158}
159
160/// The operation type represented in a given change notification.
161#[derive(Debug, Clone, PartialEq, Eq)]
162#[non_exhaustive]
163pub enum OperationType {
164    /// See [insert-event](https://www.mongodb.com/docs/manual/reference/change-events/#insert-event)
165    Insert,
166
167    /// See [update-event](https://www.mongodb.com/docs/manual/reference/change-events/#update-event)
168    Update,
169
170    /// See [replace-event](https://www.mongodb.com/docs/manual/reference/change-events/#replace-event)
171    Replace,
172
173    /// See [delete-event](https://www.mongodb.com/docs/manual/reference/change-events/#delete-event)
174    Delete,
175
176    /// See [drop-event](https://www.mongodb.com/docs/manual/reference/change-events/#drop-event)
177    Drop,
178
179    /// See [rename-event](https://www.mongodb.com/docs/manual/reference/change-events/#rename-event)
180    Rename,
181
182    /// See [dropdatabase-event](https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event)
183    DropDatabase,
184
185    /// See [invalidate-event](https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event)
186    Invalidate,
187
188    /// A catch-all for future event types.
189    Other(String),
190}
191
192#[derive(Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194enum OperationTypeHelper {
195    Insert,
196    Update,
197    Replace,
198    Delete,
199    Drop,
200    Rename,
201    DropDatabase,
202    Invalidate,
203}
204
205#[derive(Serialize, Deserialize)]
206#[serde(untagged)]
207enum OperationTypeWrapper<'a> {
208    Known(OperationTypeHelper),
209    Unknown(&'a str),
210}
211
212impl<'a> From<&'a OperationType> for OperationTypeWrapper<'a> {
213    fn from(src: &'a OperationType) -> Self {
214        match src {
215            OperationType::Insert => Self::Known(OperationTypeHelper::Insert),
216            OperationType::Update => Self::Known(OperationTypeHelper::Update),
217            OperationType::Replace => Self::Known(OperationTypeHelper::Replace),
218            OperationType::Delete => Self::Known(OperationTypeHelper::Delete),
219            OperationType::Drop => Self::Known(OperationTypeHelper::Drop),
220            OperationType::Rename => Self::Known(OperationTypeHelper::Rename),
221            OperationType::DropDatabase => Self::Known(OperationTypeHelper::DropDatabase),
222            OperationType::Invalidate => Self::Known(OperationTypeHelper::Invalidate),
223            OperationType::Other(s) => Self::Unknown(s),
224        }
225    }
226}
227
228impl From<OperationTypeWrapper<'_>> for OperationType {
229    fn from(src: OperationTypeWrapper) -> Self {
230        match src {
231            OperationTypeWrapper::Known(h) => match h {
232                OperationTypeHelper::Insert => Self::Insert,
233                OperationTypeHelper::Update => Self::Update,
234                OperationTypeHelper::Replace => Self::Replace,
235                OperationTypeHelper::Delete => Self::Delete,
236                OperationTypeHelper::Drop => Self::Drop,
237                OperationTypeHelper::Rename => Self::Rename,
238                OperationTypeHelper::DropDatabase => Self::DropDatabase,
239                OperationTypeHelper::Invalidate => Self::Invalidate,
240            },
241            OperationTypeWrapper::Unknown(s) => Self::Other(s.to_string()),
242        }
243    }
244}
245
246impl<'de> Deserialize<'de> for OperationType {
247    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
248    where
249        D: serde::Deserializer<'de>,
250    {
251        OperationTypeWrapper::deserialize(deserializer).map(OperationType::from)
252    }
253}
254
255impl Serialize for OperationType {
256    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
257    where
258        S: serde::Serializer,
259    {
260        OperationTypeWrapper::serialize(&self.into(), serializer)
261    }
262}
263
264/// Identifies the collection or database on which an event occurred.
265#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
266#[non_exhaustive]
267pub struct ChangeNamespace {
268    /// The name of the database in which the change occurred.
269    pub db: String,
270
271    /// The name of the collection in which the change occurred.
272    pub coll: Option<String>,
273}
274
275/// Identifies the type of object for a `create` event.
276#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
277#[non_exhaustive]
278pub enum ChangeNamespaceType {
279    /// A collection with no special options set.
280    Collection,
281    /// A timeseries collection.
282    Timeseries,
283    /// A view collection.
284    View,
285    /// Forward compatibility fallthrough.
286    #[serde(untagged)]
287    Other(String),
288}