mongodb/
concern.rs

1//! Contains the types for read concerns and write concerns.
2
3#[cfg(test)]
4mod test;
5
6use std::time::Duration;
7
8use serde::{Deserialize, Deserializer, Serialize, Serializer};
9use serde_with::skip_serializing_none;
10use typed_builder::TypedBuilder;
11
12use crate::{
13    bson::{doc, Timestamp},
14    error::{ErrorKind, Result},
15    serde_util,
16};
17
/// Specifies the consistency and isolation properties of read operations from replica sets and
/// replica set shards.
///
/// A `ReadConcern` wraps a single [`ReadConcernLevel`] and can be created from one via
/// `From`/`Into`.
///
/// See the documentation [here](https://www.mongodb.com/docs/manual/reference/read-concern/) for more
/// information about read concerns.
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ReadConcern {
    /// The level of the read concern.
    pub level: ReadConcernLevel,
}
31
/// An internal-only read concern type that allows the omission of a "level" as well as
/// specification of "atClusterTime" and "afterClusterTime".
///
/// The type is (de)serialized under the name `readConcern` with camelCase field names, and
/// fields that are `None` are left out of the serialized output (`skip_serializing_none`).
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[serde(rename = "readConcern")]
pub(crate) struct ReadConcernInternal {
    /// The level of the read concern. `None` means no level is specified.
    pub(crate) level: Option<ReadConcernLevel>,

    /// The snapshot read timestamp (serialized as `atClusterTime`).
    pub(crate) at_cluster_time: Option<Timestamp>,

    /// The time of most recent operation using this session (serialized as
    /// `afterClusterTime`).
    /// Used for providing causal consistency.
    pub(crate) after_cluster_time: Option<Timestamp>,
}
49
50impl ReadConcern {
51    /// Creates a read concern with level "majority".
52    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-majority/).
53    pub fn majority() -> Self {
54        ReadConcernLevel::Majority.into()
55    }
56
57    /// Creates a read concern with level "local".
58    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-local/).
59    pub fn local() -> Self {
60        ReadConcernLevel::Local.into()
61    }
62
63    /// Creates a read concern with level "linearizable".
64    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-linearizable/).
65    pub fn linearizable() -> Self {
66        ReadConcernLevel::Linearizable.into()
67    }
68
69    /// Creates a read concern with level "available".
70    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-available/).
71    pub fn available() -> Self {
72        ReadConcernLevel::Available.into()
73    }
74
75    /// Creates a read concern with level "snapshot".
76    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-snapshot/).
77    pub fn snapshot() -> Self {
78        ReadConcernLevel::Snapshot.into()
79    }
80
81    /// Creates a read concern with a custom read concern level. This is present to provide forwards
82    /// compatibility with any future read concerns which may be added to new versions of
83    /// MongoDB.
84    pub fn custom(level: impl AsRef<str>) -> Self {
85        ReadConcernLevel::from_str(level.as_ref()).into()
86    }
87
88    pub(crate) fn serialize<S>(
89        read_concern: &Option<ReadConcern>,
90        serializer: S,
91    ) -> std::result::Result<S::Ok, S::Error>
92    where
93        S: Serializer,
94    {
95        #[derive(Serialize)]
96        struct ReadConcernHelper<'a> {
97            readconcernlevel: &'a str,
98        }
99
100        let state = read_concern.as_ref().map(|concern| ReadConcernHelper {
101            readconcernlevel: concern.level.as_str(),
102        });
103        state.serialize(serializer)
104    }
105}
106
107impl From<ReadConcern> for ReadConcernInternal {
108    fn from(rc: ReadConcern) -> Self {
109        ReadConcernInternal {
110            level: Some(rc.level),
111            at_cluster_time: None,
112            after_cluster_time: None,
113        }
114    }
115}
116
117impl From<ReadConcernLevel> for ReadConcern {
118    fn from(level: ReadConcernLevel) -> Self {
119        Self { level }
120    }
121}
122
/// Specifies the level of consistency and isolation properties of a given `ReadConcern`.
///
/// See the documentation [here](https://www.mongodb.com/docs/manual/reference/read-concern/) for more
/// information about read concerns.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum ReadConcernLevel {
    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-local/).
    Local,

    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-majority/).
    Majority,

    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-linearizable/).
    Linearizable,

    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-available/).
    Available,

    /// See the specific documentation for this read concern level [here](https://www.mongodb.com/docs/manual/reference/read-concern-snapshot/).
    Snapshot,

    /// Specify a custom read concern level. This is present to provide forwards compatibility with
    /// any future read concerns which may be added to new versions of MongoDB.
    Custom(String),
}
149
150impl ReadConcernLevel {
151    pub(crate) fn from_str(s: &str) -> Self {
152        match s {
153            "local" => ReadConcernLevel::Local,
154            "majority" => ReadConcernLevel::Majority,
155            "linearizable" => ReadConcernLevel::Linearizable,
156            "available" => ReadConcernLevel::Available,
157            "snapshot" => ReadConcernLevel::Snapshot,
158            s => ReadConcernLevel::Custom(s.to_string()),
159        }
160    }
161
162    /// Gets the string representation of the `ReadConcernLevel`.
163    pub(crate) fn as_str(&self) -> &str {
164        match self {
165            ReadConcernLevel::Local => "local",
166            ReadConcernLevel::Majority => "majority",
167            ReadConcernLevel::Linearizable => "linearizable",
168            ReadConcernLevel::Available => "available",
169            ReadConcernLevel::Snapshot => "snapshot",
170            ReadConcernLevel::Custom(ref s) => s,
171        }
172    }
173}
174
175impl<'de> Deserialize<'de> for ReadConcernLevel {
176    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
177        let s = String::deserialize(deserializer)?;
178        Ok(ReadConcernLevel::from_str(&s))
179    }
180}
181
182impl Serialize for ReadConcernLevel {
183    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
184    where
185        S: Serializer,
186    {
187        self.as_str().serialize(serializer)
188    }
189}
190
/// Specifies the level of acknowledgement requested from the server for write operations.
///
/// Every field is optional, and fields that are `None` are omitted when the value is
/// serialized (`skip_serializing_none`). The derived builder leaves each field unset by
/// default and its setters accept any value convertible into the field's type.
///
/// See the documentation [here](https://www.mongodb.com/docs/manual/reference/write-concern/) for more
/// information about write concerns.
#[skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, TypedBuilder, Serialize, Deserialize)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct WriteConcern {
    /// Requests acknowledgement that the operation has propagated to a specific number or variety
    /// of servers.
    pub w: Option<Acknowledgment>,

    /// Specifies a time limit for the write concern. If an operation has not propagated to the
    /// requested level within the time limit, an error will return.
    ///
    /// Note that an error being returned due to a write concern error does not imply that the
    /// write would not have finished propagating if allowed more time to finish, and the
    /// server will not roll back the writes that occurred before the timeout was reached.
    ///
    /// Serialized under the key `wtimeout`; the alias `wtimeoutMS` is also accepted when
    /// deserializing. The `serde_util` helpers named below convert to and from integer
    /// milliseconds.
    #[serde(rename = "wtimeout", alias = "wtimeoutMS")]
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    #[serde(deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis")]
    #[serde(default)]
    pub w_timeout: Option<Duration>,

    /// Requests acknowledgement that the operation has propagated to the on-disk journal.
    ///
    /// Serialized under the key `j`; the alias `journal` is also accepted when deserializing.
    #[serde(rename = "j", alias = "journal")]
    pub journal: Option<bool>,
}
220
/// The type of the `w` field in a [`WriteConcern`](struct.WriteConcern.html).
///
/// Values can be created from a `u32` (giving [`Acknowledgment::Nodes`]) or from a `&str` /
/// `String` (giving [`Acknowledgment::Majority`] for the exact string `"majority"` and
/// [`Acknowledgment::Custom`] for anything else).
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum Acknowledgment {
    /// Requires acknowledgement that the write has reached the specified number of nodes.
    ///
    /// Note: specifying 0 here indicates that the write concern is unacknowledged, which is
    /// currently unsupported and will result in an error during operation execution.
    Nodes(u32),

    /// Requires acknowledgement that the write has reached the majority of nodes.
    Majority,

    /// Requires acknowledgement according to the given custom write concern. See [here](https://www.mongodb.com/docs/manual/tutorial/configure-replica-set-tag-sets/#tag-sets-and-custom-write-concern-behavior)
    /// for more information.
    Custom(String),
}
238
239impl Serialize for Acknowledgment {
240    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
241    where
242        S: Serializer,
243    {
244        match self {
245            Acknowledgment::Majority => serializer.serialize_str("majority"),
246            Acknowledgment::Nodes(n) => serde_util::serialize_u32_as_i32(n, serializer),
247            Acknowledgment::Custom(name) => serializer.serialize_str(name),
248        }
249    }
250}
251
252impl<'de> Deserialize<'de> for Acknowledgment {
253    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
254    where
255        D: Deserializer<'de>,
256    {
257        #[derive(Deserialize)]
258        #[serde(untagged)]
259        enum IntOrString {
260            Int(u32),
261            String(String),
262        }
263        match IntOrString::deserialize(deserializer)? {
264            IntOrString::String(s) => Ok(s.into()),
265            IntOrString::Int(i) => Ok(i.into()),
266        }
267    }
268}
269
270impl From<u32> for Acknowledgment {
271    fn from(i: u32) -> Self {
272        Acknowledgment::Nodes(i)
273    }
274}
275
276impl From<&str> for Acknowledgment {
277    fn from(s: &str) -> Self {
278        if s == "majority" {
279            Acknowledgment::Majority
280        } else {
281            Acknowledgment::Custom(s.to_string())
282        }
283    }
284}
285
286impl From<String> for Acknowledgment {
287    fn from(s: String) -> Self {
288        if s == "majority" {
289            Acknowledgment::Majority
290        } else {
291            Acknowledgment::Custom(s)
292        }
293    }
294}
295
296impl WriteConcern {
297    /// A 'WriteConcern' requesting [`Acknowledgment::Nodes`].
298    pub fn nodes(v: u32) -> Self {
299        Acknowledgment::Nodes(v).into()
300    }
301
302    /// A `WriteConcern` requesting [`Acknowledgment::Majority`].
303    pub fn majority() -> Self {
304        Acknowledgment::Majority.into()
305    }
306
307    /// A `WriteConcern` with a custom acknowledgment.
308    pub fn custom(s: impl AsRef<str>) -> Self {
309        Acknowledgment::from(s.as_ref()).into()
310    }
311
312    pub(crate) fn is_acknowledged(&self) -> bool {
313        self.w != Some(Acknowledgment::Nodes(0)) || self.journal == Some(true)
314    }
315
316    /// Whether the write concern was created with no values specified. If true, the write concern
317    /// should be considered the server's default.
318    pub(crate) fn is_empty(&self) -> bool {
319        self.w.is_none() && self.w_timeout.is_none() && self.journal.is_none()
320    }
321
322    /// Validates that the write concern. A write concern is invalid if both the `w` field is 0
323    /// and the `j` field is `true`.
324    pub(crate) fn validate(&self) -> Result<()> {
325        if self.w == Some(Acknowledgment::Nodes(0)) && self.journal == Some(true) {
326            return Err(ErrorKind::InvalidArgument {
327                message: "write concern cannot have w=0 and j=true".to_string(),
328            }
329            .into());
330        }
331
332        if let Some(w_timeout) = self.w_timeout {
333            if w_timeout < Duration::from_millis(0) {
334                return Err(ErrorKind::InvalidArgument {
335                    message: "write concern `w_timeout` field cannot be negative".to_string(),
336                }
337                .into());
338            }
339        }
340
341        Ok(())
342    }
343
344    pub(crate) fn serialize<S>(
345        write_concern: &Option<WriteConcern>,
346        serializer: S,
347    ) -> std::result::Result<S::Ok, S::Error>
348    where
349        S: Serializer,
350    {
351        #[derive(Serialize)]
352        struct WriteConcernHelper<'a> {
353            w: Option<&'a Acknowledgment>,
354
355            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
356            wtimeoutms: Option<Duration>,
357
358            journal: Option<bool>,
359        }
360
361        let state = write_concern.as_ref().map(|concern| WriteConcernHelper {
362            w: concern.w.as_ref(),
363            wtimeoutms: concern.w_timeout,
364            journal: concern.journal,
365        });
366
367        state.serialize(serializer)
368    }
369}
370
371impl From<Acknowledgment> for WriteConcern {
372    fn from(w: Acknowledgment) -> Self {
373        WriteConcern {
374            w: Some(w),
375            w_timeout: None,
376            journal: None,
377        }
378    }
379}