1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//! Contains options for ChangeStreams.
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use std::time::Duration;
use typed_builder::TypedBuilder;

use crate::{
    bson::Timestamp,
    change_stream::event::ResumeToken,
    collation::Collation,
    concern::ReadConcern,
    options::AggregateOptions,
    selection_criteria::SelectionCriteria,
};

/// These are the valid options that can be passed to the `watch` method for creating a
/// [`ChangeStream`](crate::change_stream::ChangeStream).
///
/// Every field is an `Option` and defaults to `None`, both through [`Default`] and through the
/// generated builder.
// Serialization notes: `skip_serializing_none` omits unset fields, and `rename_all` emits the
// remaining field names in camelCase. Fields carrying `#[serde(skip_serializing)]` are never
// written when this struct is serialized; `aggregate_options` copies them into an
// `AggregateOptions` value instead.
#[skip_serializing_none]
#[derive(Clone, Debug, Default, Deserialize, Serialize, TypedBuilder)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ChangeStreamOptions {
    #[rustfmt::skip]
    /// Configures how the
    /// [`ChangeStreamEvent::full_document`](crate::change_stream::event::ChangeStreamEvent::full_document)
    /// field will be populated. By default, the field will be empty for updates.
    #[builder(default)]
    pub full_document: Option<FullDocumentType>,

    /// Configures how the
    /// [`ChangeStreamEvent::full_document_before_change`](
    /// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field will be
    /// populated. By default, the field will be empty for updates.
    #[builder(default)]
    pub full_document_before_change: Option<FullDocumentBeforeChangeType>,

    /// Specifies the logical starting point for the new change stream. Note that if a watched
    /// collection is dropped and recreated or newly renamed, `start_after` should be set instead.
    /// `resume_after` and `start_after` cannot be set simultaneously.
    ///
    /// For more information on resuming a change stream, see the documentation
    /// [here](https://www.mongodb.com/docs/manual/changeStreams/#change-stream-resume-after).
    #[builder(default)]
    pub resume_after: Option<ResumeToken>,

    /// The change stream will only provide changes that occurred at or after the specified
    /// timestamp. Any command run against the server will return an operation time that can be
    /// used here.
    #[builder(default)]
    pub start_at_operation_time: Option<Timestamp>,

    /// Takes a resume token and starts a new change stream returning the first notification after
    /// the token. This will allow users to watch collections that have been dropped and
    /// recreated or newly renamed collections without missing any notifications.
    ///
    /// This feature is only available on MongoDB 4.2+.
    ///
    /// See the documentation [here](https://www.mongodb.com/docs/master/changeStreams/#change-stream-start-after) for more
    /// information.
    #[builder(default)]
    pub start_after: Option<ResumeToken>,

    /// If `true`, the change stream will monitor all changes for the given cluster.
    // Crate-private and excluded from the builder (`setter(skip)`), so callers outside this
    // crate cannot set it; it can only be assigned directly by code inside the crate.
    #[builder(default, setter(skip))]
    pub(crate) all_changes_for_cluster: Option<bool>,

    /// The maximum amount of time for the server to wait on new documents to satisfy a change
    /// stream query.
    // Not serialized with this struct; forwarded by `aggregate_options`.
    #[builder(default)]
    #[serde(skip_serializing)]
    pub max_await_time: Option<Duration>,

    /// The number of documents to return per batch.
    // Not serialized with this struct; forwarded by `aggregate_options`.
    #[builder(default)]
    #[serde(skip_serializing)]
    pub batch_size: Option<u32>,

    /// Specifies a collation.
    // Not serialized with this struct; forwarded by `aggregate_options`.
    #[builder(default)]
    #[serde(skip_serializing)]
    pub collation: Option<Collation>,

    /// The read concern to use for the operation.
    ///
    /// If none is specified, the read concern defined on the object executing this operation will
    /// be used.
    // Not serialized with this struct; forwarded by `aggregate_options`.
    #[builder(default)]
    #[serde(skip_serializing)]
    pub read_concern: Option<ReadConcern>,

    /// The criteria used to select a server for this operation.
    ///
    /// If none is specified, the selection criteria defined on the object executing this operation
    /// will be used.
    // Not serialized with this struct; forwarded by `aggregate_options`.
    #[builder(default)]
    #[serde(skip_serializing)]
    pub selection_criteria: Option<SelectionCriteria>,
}

impl ChangeStreamOptions {
    /// Builds the [`AggregateOptions`] that accompany these change stream options.
    ///
    /// Only `batch_size`, `collation`, `max_await_time`, `read_concern`, and
    /// `selection_criteria` are carried over from `self`; no other aggregate option is set here.
    /// `self` is left untouched: values are copied or cloned into the result.
    pub(crate) fn aggregate_options(&self) -> AggregateOptions {
        // Borrow just the fields that are forwarded; the rest are ignored.
        let Self {
            batch_size,
            collation,
            max_await_time,
            read_concern,
            selection_criteria,
            ..
        } = self;

        let builder = AggregateOptions::builder()
            .batch_size(*batch_size)
            .collation(collation.clone())
            .max_await_time(*max_await_time)
            .read_concern(read_concern.clone())
            .selection_criteria(selection_criteria.clone());

        builder.build()
    }
}

/// Describes the modes for configuring the
/// [`ChangeStreamEvent::full_document`](
/// crate::change_stream::event::ChangeStreamEvent::full_document) field.
// `rename_all = "camelCase"` makes the serialized variant names `updateLookup`,
// `whenAvailable`, `required`, and `other`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum FullDocumentType {
    /// The field will be populated with a copy of the entire document that was updated.
    UpdateLookup,

    /// The field will be populated for replace and update change events if the post-image for this
    /// event is available.
    WhenAvailable,

    /// The same behavior as `WhenAvailable` except that an error is raised if the post-image is
    /// not available.
    Required,

    /// User-defined other types for forward compatibility.
    // NOTE(review): with the derived serde impls and no `untagged` attribute, this newtype
    // variant presumably serializes as a single-key map (`{ "other": <string> }`) rather than
    // as the bare string — confirm that this is the intended wire format.
    Other(String),
}

/// Describes the modes for configuring the
/// [`ChangeStreamEvent::full_document_before_change`](
/// crate::change_stream::event::ChangeStreamEvent::full_document_before_change) field.
// `rename_all = "camelCase"` makes the serialized variant names `whenAvailable`, `required`,
// `off`, and `other`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum FullDocumentBeforeChangeType {
    /// The field will be populated for replace, update, and delete change events if the pre-image
    /// for this event is available.
    WhenAvailable,

    /// The same behavior as `WhenAvailable` except that an error is raised if the pre-image is
    /// not available.
    Required,

    /// Do not send a value.
    Off,

    /// User-defined other types for forward compatibility.
    // NOTE(review): with the derived serde impls and no `untagged` attribute, this newtype
    // variant presumably serializes as a single-key map (`{ "other": <string> }`) rather than
    // as the bare string — confirm that this is the intended wire format.
    Other(String),
}