mongodb/change_stream/event.rs
1//! Contains the types related to a `ChangeStream` event.
2#[cfg(test)]
3use std::convert::TryInto;
4
5use crate::{cursor::CursorSpecification, options::ChangeStreamOptions};
6
7#[cfg(test)]
8use bson::Bson;
9use bson::{DateTime, Document, RawBson, RawDocumentBuf, Timestamp};
10use serde::{Deserialize, Serialize};
11
12/// An opaque token used for resuming an interrupted
13/// [`ChangeStream`](crate::change_stream::ChangeStream).
14///
15/// When starting a new change stream,
16/// [`crate::action::Watch::start_after`] and
17/// [`crate::action::Watch::resume_after`] fields can be specified
18/// with instances of `ResumeToken`.
19///
20/// See the documentation
21/// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for more
22/// information on resume tokens.
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
24pub struct ResumeToken(pub(crate) RawBson);
25
26impl ResumeToken {
27 pub(crate) fn initial(
28 options: Option<&ChangeStreamOptions>,
29 spec: &CursorSpecification,
30 ) -> Option<ResumeToken> {
31 match &spec.post_batch_resume_token {
32 // Token from initial response from `aggregate`
33 Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
34 // Token from options passed to `watch`
35 _ => options
36 .and_then(|o| o.start_after.as_ref().or(o.resume_after.as_ref()))
37 .cloned(),
38 }
39 }
40
41 pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
42 doc.map(|doc| ResumeToken(RawBson::Document(doc)))
43 }
44
45 #[cfg(test)]
46 pub(crate) fn parsed(self) -> std::result::Result<Bson, bson::raw::Error> {
47 self.0.try_into()
48 }
49}
50
51/// A `ChangeStreamEvent` represents a
52/// [change event](https://www.mongodb.com/docs/manual/reference/change-events/) in the associated change stream.
53#[derive(Debug, Serialize, Deserialize, PartialEq)]
54#[serde(rename_all = "camelCase")]
55#[non_exhaustive]
56pub struct ChangeStreamEvent<T> {
57 /// An opaque token for use when resuming an interrupted `ChangeStream`.
58 ///
59 /// See the documentation
60 /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-token) for
61 /// more information on resume tokens.
62 ///
63 /// Also see the documentation on [resuming a change
64 /// stream](https://www.mongodb.com/docs/manual/changeStreams/#resume-a-change-stream).
65 #[serde(rename = "_id")]
66 pub id: ResumeToken,
67
68 /// Describes the type of operation represented in this change notification.
69 pub operation_type: OperationType,
70
71 /// Identifies the collection or database on which the event occurred.
72 pub ns: Option<ChangeNamespace>,
73
74 /// The new name for the `ns` collection. Only included for `OperationType::Rename`.
75 pub to: Option<ChangeNamespace>,
76
77 /// The identifier for the session associated with the transaction.
78 /// Only present if the operation is part of a multi-document transaction.
79 pub lsid: Option<Document>,
80
81 /// Together with the lsid, a number that helps uniquely identify a transaction.
82 /// Only present if the operation is part of a multi-document transaction.
83 pub txn_number: Option<i64>,
84
85 /// A `Document` that contains the `_id` of the document created or modified by the `insert`,
86 /// `replace`, `delete`, `update` operations (i.e. CRUD operations). For sharded collections,
87 /// also displays the full shard key for the document. The `_id` field is not repeated if it is
88 /// already a part of the shard key.
89 pub document_key: Option<Document>,
90
91 /// A description of the fields that were updated or removed by the update operation.
92 /// Only specified if `operation_type` is `OperationType::Update`.
93 pub update_description: Option<UpdateDescription>,
94
95 /// The cluster time at which the change occurred.
96 pub cluster_time: Option<Timestamp>,
97
98 /// The wall time from the mongod that the change event originated from.
99 pub wall_time: Option<DateTime>,
100
101 /// The `Document` created or modified by the `insert`, `replace`, `delete`, `update`
102 /// operations (i.e. CRUD operations).
103 ///
104 /// For `insert` and `replace` operations, this represents the new document created by the
105 /// operation. For `delete` operations, this field is `None`.
106 ///
107 /// For `update` operations, this field only appears if you configured the change stream with
108 /// [`full_document`](crate::action::Watch::full_document) set to
109 /// [`UpdateLookup`](crate::options::FullDocumentType::UpdateLookup). This field then
110 /// represents the most current majority-committed version of the document modified by the
111 /// update operation.
112 pub full_document: Option<T>,
113
114 /// Contains the pre-image of the modified or deleted document if the pre-image is available
115 /// for the change event and either `Required` or `WhenAvailable` was specified for the
116 /// [`full_document_before_change`](
117 /// crate::action::Watch::full_document_before_change) option when creating the
118 /// change stream. If `WhenAvailable` was specified but the pre-image is unavailable, this
119 /// will be explicitly set to `None`.
120 pub full_document_before_change: Option<T>,
121}
122
123/// Describes which fields have been updated or removed from a document.
124#[derive(Debug, Serialize, Deserialize, PartialEq)]
125#[serde(rename_all = "camelCase")]
126#[non_exhaustive]
127pub struct UpdateDescription {
128 /// A `Document` containing key:value pairs of names of the fields that were changed, and the
129 /// new value for those fields.
130 pub updated_fields: Document,
131
132 /// An array of field names that were removed from the `Document`.
133 pub removed_fields: Vec<String>,
134
135 /// Arrays that were truncated in the `Document`.
136 pub truncated_arrays: Option<Vec<TruncatedArray>>,
137
138 /// When an update event reports changes involving ambiguous fields, the disambiguatedPaths
139 /// document provides the path key with an array listing each path component.
140 /// Note: The disambiguatedPaths field is only available on change streams started with the
141 /// showExpandedEvents option
142 pub disambiguated_paths: Option<Document>,
143}
144
145/// Describes an array that has been truncated.
146#[derive(Debug, Serialize, Deserialize, PartialEq)]
147#[serde(rename_all = "camelCase")]
148#[non_exhaustive]
149pub struct TruncatedArray {
150 /// The field path of the array.
151 pub field: String,
152
153 /// The new size of the array.
154 pub new_size: i32,
155}
156
157/// The operation type represented in a given change notification.
158#[derive(Debug, Clone, PartialEq, Eq)]
159#[non_exhaustive]
160pub enum OperationType {
161 /// See [insert-event](https://www.mongodb.com/docs/manual/reference/change-events/#insert-event)
162 Insert,
163
164 /// See [update-event](https://www.mongodb.com/docs/manual/reference/change-events/#update-event)
165 Update,
166
167 /// See [replace-event](https://www.mongodb.com/docs/manual/reference/change-events/#replace-event)
168 Replace,
169
170 /// See [delete-event](https://www.mongodb.com/docs/manual/reference/change-events/#delete-event)
171 Delete,
172
173 /// See [drop-event](https://www.mongodb.com/docs/manual/reference/change-events/#drop-event)
174 Drop,
175
176 /// See [rename-event](https://www.mongodb.com/docs/manual/reference/change-events/#rename-event)
177 Rename,
178
179 /// See [dropdatabase-event](https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event)
180 DropDatabase,
181
182 /// See [invalidate-event](https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event)
183 Invalidate,
184
185 /// A catch-all for future event types.
186 Other(String),
187}
188
189#[derive(Serialize, Deserialize)]
190#[serde(rename_all = "camelCase")]
191enum OperationTypeHelper {
192 Insert,
193 Update,
194 Replace,
195 Delete,
196 Drop,
197 Rename,
198 DropDatabase,
199 Invalidate,
200}
201
202#[derive(Serialize, Deserialize)]
203#[serde(untagged)]
204enum OperationTypeWrapper<'a> {
205 Known(OperationTypeHelper),
206 Unknown(&'a str),
207}
208
209impl<'a> From<&'a OperationType> for OperationTypeWrapper<'a> {
210 fn from(src: &'a OperationType) -> Self {
211 match src {
212 OperationType::Insert => Self::Known(OperationTypeHelper::Insert),
213 OperationType::Update => Self::Known(OperationTypeHelper::Update),
214 OperationType::Replace => Self::Known(OperationTypeHelper::Replace),
215 OperationType::Delete => Self::Known(OperationTypeHelper::Delete),
216 OperationType::Drop => Self::Known(OperationTypeHelper::Drop),
217 OperationType::Rename => Self::Known(OperationTypeHelper::Rename),
218 OperationType::DropDatabase => Self::Known(OperationTypeHelper::DropDatabase),
219 OperationType::Invalidate => Self::Known(OperationTypeHelper::Invalidate),
220 OperationType::Other(s) => Self::Unknown(s),
221 }
222 }
223}
224
225impl From<OperationTypeWrapper<'_>> for OperationType {
226 fn from(src: OperationTypeWrapper) -> Self {
227 match src {
228 OperationTypeWrapper::Known(h) => match h {
229 OperationTypeHelper::Insert => Self::Insert,
230 OperationTypeHelper::Update => Self::Update,
231 OperationTypeHelper::Replace => Self::Replace,
232 OperationTypeHelper::Delete => Self::Delete,
233 OperationTypeHelper::Drop => Self::Drop,
234 OperationTypeHelper::Rename => Self::Rename,
235 OperationTypeHelper::DropDatabase => Self::DropDatabase,
236 OperationTypeHelper::Invalidate => Self::Invalidate,
237 },
238 OperationTypeWrapper::Unknown(s) => Self::Other(s.to_string()),
239 }
240 }
241}
242
243impl<'de> Deserialize<'de> for OperationType {
244 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
245 where
246 D: serde::Deserializer<'de>,
247 {
248 OperationTypeWrapper::deserialize(deserializer).map(OperationType::from)
249 }
250}
251
252impl Serialize for OperationType {
253 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254 where
255 S: serde::Serializer,
256 {
257 OperationTypeWrapper::serialize(&self.into(), serializer)
258 }
259}
260
261/// Identifies the collection or database on which an event occurred.
262#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
263#[non_exhaustive]
264pub struct ChangeNamespace {
265 /// The name of the database in which the change occurred.
266 pub db: String,
267
268 /// The name of the collection in which the change occurred.
269 pub coll: Option<String>,
270}