mongodb/change_stream/
event.rs

1//! Contains the types related to a `ChangeStream` event.
2#[cfg(test)]
3use std::convert::TryInto;
4
5use crate::{cursor::CursorSpecification, options::ChangeStreamOptions};
6
7#[cfg(test)]
8use bson::Bson;
9use bson::{DateTime, Document, RawBson, RawDocumentBuf, Timestamp};
10use serde::{Deserialize, Serialize};
11
12/// An opaque token used for resuming an interrupted
13/// [`ChangeStream`](crate::change_stream::ChangeStream).
14///
15/// When starting a new change stream,
16/// [`crate::action::Watch::start_after`] and
17/// [`crate::action::Watch::resume_after`] fields can be specified
18/// with instances of `ResumeToken`.
19///
20/// See the documentation
21/// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
22/// information on resume tokens.
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
24pub struct ResumeToken(pub(crate) RawBson);
25
26impl ResumeToken {
27    pub(crate) fn initial(
28        options: Option<&ChangeStreamOptions>,
29        spec: &CursorSpecification,
30    ) -> Option<ResumeToken> {
31        match &spec.post_batch_resume_token {
32            // Token from initial response from `aggregate`
33            Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
34            // Token from options passed to `watch`
35            _ => options
36                .and_then(|o| o.start_after.as_ref().or(o.resume_after.as_ref()))
37                .cloned(),
38        }
39    }
40
41    pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
42        doc.map(|doc| ResumeToken(RawBson::Document(doc)))
43    }
44
45    #[cfg(test)]
46    pub(crate) fn parsed(self) -> std::result::Result<Bson, bson::raw::Error> {
47        self.0.try_into()
48    }
49}
50
51/// A `ChangeStreamEvent` represents a
52/// [change event](https://www.mongodb.com/docs/manual/reference/change-events/) in the associated change stream.
53#[derive(Debug, Serialize, Deserialize, PartialEq)]
54#[serde(rename_all = "camelCase")]
55#[non_exhaustive]
56pub struct ChangeStreamEvent<T> {
57    /// An opaque token for use when resuming an interrupted `ChangeStream`.
58    ///
59    /// See the documentation
60    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for
61    /// more information on resume tokens.
62    ///
63    /// Also see the documentation on [resuming a change
64    /// stream](https://www.mongodb.com/docs/manual/changeStreams/#resume-a-change-stream).
65    #[serde(rename = "_id")]
66    pub id: ResumeToken,
67
68    /// Describes the type of operation represented in this change notification.
69    pub operation_type: OperationType,
70
71    /// Identifies the collection or database on which the event occurred.
72    pub ns: Option<ChangeNamespace>,
73
74    /// The new name for the `ns` collection.  Only included for `OperationType::Rename`.
75    pub to: Option<ChangeNamespace>,
76
77    /// The identifier for the session associated with the transaction.
78    /// Only present if the operation is part of a multi-document transaction.
79    pub lsid: Option<Document>,
80
81    /// Together with the lsid, a number that helps uniquely identify a transaction.
82    /// Only present if the operation is part of a multi-document transaction.
83    pub txn_number: Option<i64>,
84
85    /// A `Document` that contains the `_id` of the document created or modified by the `insert`,
86    /// `replace`, `delete`, `update` operations (i.e. CRUD operations). For sharded collections,
87    /// also displays the full shard key for the document. The `_id` field is not repeated if it is
88    /// already a part of the shard key.
89    pub document_key: Option<Document>,
90
91    /// A description of the fields that were updated or removed by the update operation.
92    /// Only specified if `operation_type` is `OperationType::Update`.
93    pub update_description: Option<UpdateDescription>,
94
95    /// The cluster time at which the change occurred.
96    pub cluster_time: Option<Timestamp>,
97
98    /// The wall time from the mongod that the change event originated from.
99    pub wall_time: Option<DateTime>,
100
101    /// The `Document` created or modified by the `insert`, `replace`, `delete`, `update`
102    /// operations (i.e. CRUD operations).
103    ///
104    /// For `insert` and `replace` operations, this represents the new document created by the
105    /// operation.  For `delete` operations, this field is `None`.
106    ///
107    /// For `update` operations, this field only appears if you configured the change stream with
108    /// [`full_document`](crate::action::Watch::full_document) set to
109    /// [`UpdateLookup`](crate::options::FullDocumentType::UpdateLookup). This field then
110    /// represents the most current majority-committed version of the document modified by the
111    /// update operation.
112    pub full_document: Option<T>,
113
114    /// Contains the pre-image of the modified or deleted document if the pre-image is available
115    /// for the change event and either `Required` or `WhenAvailable` was specified for the
116    /// [`full_document_before_change`](
117    /// crate::action::Watch::full_document_before_change) option when creating the
118    /// change stream. If `WhenAvailable` was specified but the pre-image is unavailable, this
119    /// will be explicitly set to `None`.
120    pub full_document_before_change: Option<T>,
121}
122
123/// Describes which fields have been updated or removed from a document.
124#[derive(Debug, Serialize, Deserialize, PartialEq)]
125#[serde(rename_all = "camelCase")]
126#[non_exhaustive]
127pub struct UpdateDescription {
128    /// A `Document` containing key:value pairs of names of the fields that were changed, and the
129    /// new value for those fields.
130    pub updated_fields: Document,
131
132    /// An array of field names that were removed from the `Document`.
133    pub removed_fields: Vec<String>,
134
135    /// Arrays that were truncated in the `Document`.
136    pub truncated_arrays: Option<Vec<TruncatedArray>>,
137
138    /// When an update event reports changes involving ambiguous fields, the disambiguatedPaths
139    /// document provides the path key with an array listing each path component.
140    /// Note: The disambiguatedPaths field is only available on change streams started with the
141    /// showExpandedEvents option
142    pub disambiguated_paths: Option<Document>,
143}
144
145/// Describes an array that has been truncated.
146#[derive(Debug, Serialize, Deserialize, PartialEq)]
147#[serde(rename_all = "camelCase")]
148#[non_exhaustive]
149pub struct TruncatedArray {
150    /// The field path of the array.
151    pub field: String,
152
153    /// The new size of the array.
154    pub new_size: i32,
155}
156
/// The type of operation that a given change notification represents.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum OperationType {
    /// See [insert-event](https://www.mongodb.com/docs/manual/reference/change-events/#insert-event)
    Insert,

    /// See [update-event](https://www.mongodb.com/docs/manual/reference/change-events/#update-event)
    Update,

    /// See [replace-event](https://www.mongodb.com/docs/manual/reference/change-events/#replace-event)
    Replace,

    /// See [delete-event](https://www.mongodb.com/docs/manual/reference/change-events/#delete-event)
    Delete,

    /// See [drop-event](https://www.mongodb.com/docs/manual/reference/change-events/#drop-event)
    Drop,

    /// See [rename-event](https://www.mongodb.com/docs/manual/reference/change-events/#rename-event)
    Rename,

    /// See [dropdatabase-event](https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event)
    DropDatabase,

    /// See [invalidate-event](https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event)
    Invalidate,

    /// Catch-all for event types that have no dedicated variant; carries the operation type
    /// name as a string.
    Other(String),
}
188
189#[derive(Serialize, Deserialize)]
190#[serde(rename_all = "camelCase")]
191enum OperationTypeHelper {
192    Insert,
193    Update,
194    Replace,
195    Delete,
196    Drop,
197    Rename,
198    DropDatabase,
199    Invalidate,
200}
201
202#[derive(Serialize, Deserialize)]
203#[serde(untagged)]
204enum OperationTypeWrapper<'a> {
205    Known(OperationTypeHelper),
206    Unknown(&'a str),
207}
208
209impl<'a> From<&'a OperationType> for OperationTypeWrapper<'a> {
210    fn from(src: &'a OperationType) -> Self {
211        match src {
212            OperationType::Insert => Self::Known(OperationTypeHelper::Insert),
213            OperationType::Update => Self::Known(OperationTypeHelper::Update),
214            OperationType::Replace => Self::Known(OperationTypeHelper::Replace),
215            OperationType::Delete => Self::Known(OperationTypeHelper::Delete),
216            OperationType::Drop => Self::Known(OperationTypeHelper::Drop),
217            OperationType::Rename => Self::Known(OperationTypeHelper::Rename),
218            OperationType::DropDatabase => Self::Known(OperationTypeHelper::DropDatabase),
219            OperationType::Invalidate => Self::Known(OperationTypeHelper::Invalidate),
220            OperationType::Other(s) => Self::Unknown(s),
221        }
222    }
223}
224
225impl From<OperationTypeWrapper<'_>> for OperationType {
226    fn from(src: OperationTypeWrapper) -> Self {
227        match src {
228            OperationTypeWrapper::Known(h) => match h {
229                OperationTypeHelper::Insert => Self::Insert,
230                OperationTypeHelper::Update => Self::Update,
231                OperationTypeHelper::Replace => Self::Replace,
232                OperationTypeHelper::Delete => Self::Delete,
233                OperationTypeHelper::Drop => Self::Drop,
234                OperationTypeHelper::Rename => Self::Rename,
235                OperationTypeHelper::DropDatabase => Self::DropDatabase,
236                OperationTypeHelper::Invalidate => Self::Invalidate,
237            },
238            OperationTypeWrapper::Unknown(s) => Self::Other(s.to_string()),
239        }
240    }
241}
242
243impl<'de> Deserialize<'de> for OperationType {
244    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
245    where
246        D: serde::Deserializer<'de>,
247    {
248        OperationTypeWrapper::deserialize(deserializer).map(OperationType::from)
249    }
250}
251
252impl Serialize for OperationType {
253    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254    where
255        S: serde::Serializer,
256    {
257        OperationTypeWrapper::serialize(&self.into(), serializer)
258    }
259}
260
261/// Identifies the collection or database on which an event occurred.
262#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
263#[non_exhaustive]
264pub struct ChangeNamespace {
265    /// The name of the database in which the change occurred.
266    pub db: String,
267
268    /// The name of the collection in which the change occurred.
269    pub coll: Option<String>,
270}