1#[cfg(test)]
4mod test;
5
6use std::time::Duration;
7
8use serde::{Deserialize, Deserializer, Serialize, Serializer};
9use serde_with::skip_serializing_none;
10use typed_builder::TypedBuilder;
11
12use crate::{
13 bson::{doc, Timestamp},
14 error::{ErrorKind, Result},
15 serde_util,
16};
17
18#[skip_serializing_none]
24#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
25#[serde(rename_all = "camelCase")]
26#[non_exhaustive]
27pub struct ReadConcern {
28 pub level: ReadConcernLevel,
30}
31
32#[skip_serializing_none]
35#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
36#[serde(rename_all = "camelCase")]
37#[serde(rename = "readConcern")]
38pub(crate) struct ReadConcernInternal {
39 pub(crate) level: Option<ReadConcernLevel>,
41
42 pub(crate) at_cluster_time: Option<Timestamp>,
44
45 pub(crate) after_cluster_time: Option<Timestamp>,
48}
49
50impl ReadConcern {
51 pub fn majority() -> Self {
54 ReadConcernLevel::Majority.into()
55 }
56
57 pub fn local() -> Self {
60 ReadConcernLevel::Local.into()
61 }
62
63 pub fn linearizable() -> Self {
66 ReadConcernLevel::Linearizable.into()
67 }
68
69 pub fn available() -> Self {
72 ReadConcernLevel::Available.into()
73 }
74
75 pub fn snapshot() -> Self {
78 ReadConcernLevel::Snapshot.into()
79 }
80
81 pub fn custom(level: impl AsRef<str>) -> Self {
85 ReadConcernLevel::from_str(level.as_ref()).into()
86 }
87
88 pub(crate) fn serialize<S>(
89 read_concern: &Option<ReadConcern>,
90 serializer: S,
91 ) -> std::result::Result<S::Ok, S::Error>
92 where
93 S: Serializer,
94 {
95 #[derive(Serialize)]
96 struct ReadConcernHelper<'a> {
97 readconcernlevel: &'a str,
98 }
99
100 let state = read_concern.as_ref().map(|concern| ReadConcernHelper {
101 readconcernlevel: concern.level.as_str(),
102 });
103 state.serialize(serializer)
104 }
105}
106
107impl From<ReadConcern> for ReadConcernInternal {
108 fn from(rc: ReadConcern) -> Self {
109 ReadConcernInternal {
110 level: Some(rc.level),
111 at_cluster_time: None,
112 after_cluster_time: None,
113 }
114 }
115}
116
117impl From<ReadConcernLevel> for ReadConcern {
118 fn from(level: ReadConcernLevel) -> Self {
119 Self { level }
120 }
121}
122
/// The level of a read concern.
///
/// Each named variant has a fixed string form (see `ReadConcernLevel::as_str`); any other
/// level name is carried verbatim in [`ReadConcernLevel::Custom`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ReadConcernLevel {
    /// The `"local"` level.
    Local,

    /// The `"majority"` level.
    Majority,

    /// The `"linearizable"` level.
    Linearizable,

    /// The `"available"` level.
    Available,

    /// The `"snapshot"` level.
    Snapshot,

    /// A level without a dedicated variant, stored as its raw name.
    Custom(String),
}
149
150impl ReadConcernLevel {
151 pub(crate) fn from_str(s: &str) -> Self {
152 match s {
153 "local" => ReadConcernLevel::Local,
154 "majority" => ReadConcernLevel::Majority,
155 "linearizable" => ReadConcernLevel::Linearizable,
156 "available" => ReadConcernLevel::Available,
157 "snapshot" => ReadConcernLevel::Snapshot,
158 s => ReadConcernLevel::Custom(s.to_string()),
159 }
160 }
161
162 pub(crate) fn as_str(&self) -> &str {
164 match self {
165 ReadConcernLevel::Local => "local",
166 ReadConcernLevel::Majority => "majority",
167 ReadConcernLevel::Linearizable => "linearizable",
168 ReadConcernLevel::Available => "available",
169 ReadConcernLevel::Snapshot => "snapshot",
170 ReadConcernLevel::Custom(ref s) => s,
171 }
172 }
173}
174
175impl<'de> Deserialize<'de> for ReadConcernLevel {
176 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
177 let s = String::deserialize(deserializer)?;
178 Ok(ReadConcernLevel::from_str(&s))
179 }
180}
181
182impl Serialize for ReadConcernLevel {
183 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
184 where
185 S: Serializer,
186 {
187 self.as_str().serialize(serializer)
188 }
189}
190
191#[skip_serializing_none]
196#[derive(Clone, Debug, Default, PartialEq, TypedBuilder, Serialize, Deserialize)]
197#[builder(field_defaults(default, setter(into)))]
198#[non_exhaustive]
199pub struct WriteConcern {
200 pub w: Option<Acknowledgment>,
203
204 #[serde(rename = "wtimeout", alias = "wtimeoutMS")]
211 #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
212 #[serde(deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis")]
213 #[serde(default)]
214 pub w_timeout: Option<Duration>,
215
216 #[serde(rename = "j", alias = "journal")]
218 pub journal: Option<bool>,
219}
220
/// The acknowledgment requirement of a write concern (its `w` value).
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Acknowledgment {
    /// A numeric acknowledgment value.
    Nodes(u32),

    /// The `"majority"` acknowledgment.
    Majority,

    /// Any other acknowledgment, stored as its raw name.
    Custom(String),
}
238
239impl Serialize for Acknowledgment {
240 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
241 where
242 S: Serializer,
243 {
244 match self {
245 Acknowledgment::Majority => serializer.serialize_str("majority"),
246 Acknowledgment::Nodes(n) => serde_util::serialize_u32_as_i32(n, serializer),
247 Acknowledgment::Custom(name) => serializer.serialize_str(name),
248 }
249 }
250}
251
252impl<'de> Deserialize<'de> for Acknowledgment {
253 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
254 where
255 D: Deserializer<'de>,
256 {
257 #[derive(Deserialize)]
258 #[serde(untagged)]
259 enum IntOrString {
260 Int(u32),
261 String(String),
262 }
263 match IntOrString::deserialize(deserializer)? {
264 IntOrString::String(s) => Ok(s.into()),
265 IntOrString::Int(i) => Ok(i.into()),
266 }
267 }
268}
269
270impl From<u32> for Acknowledgment {
271 fn from(i: u32) -> Self {
272 Acknowledgment::Nodes(i)
273 }
274}
275
276impl From<&str> for Acknowledgment {
277 fn from(s: &str) -> Self {
278 if s == "majority" {
279 Acknowledgment::Majority
280 } else {
281 Acknowledgment::Custom(s.to_string())
282 }
283 }
284}
285
286impl From<String> for Acknowledgment {
287 fn from(s: String) -> Self {
288 if s == "majority" {
289 Acknowledgment::Majority
290 } else {
291 Acknowledgment::Custom(s)
292 }
293 }
294}
295
296impl WriteConcern {
297 pub fn nodes(v: u32) -> Self {
299 Acknowledgment::Nodes(v).into()
300 }
301
302 pub fn majority() -> Self {
304 Acknowledgment::Majority.into()
305 }
306
307 pub fn custom(s: impl AsRef<str>) -> Self {
309 Acknowledgment::from(s.as_ref()).into()
310 }
311
312 pub(crate) fn is_acknowledged(&self) -> bool {
313 self.w != Some(Acknowledgment::Nodes(0)) || self.journal == Some(true)
314 }
315
316 pub(crate) fn is_empty(&self) -> bool {
319 self.w.is_none() && self.w_timeout.is_none() && self.journal.is_none()
320 }
321
322 pub(crate) fn validate(&self) -> Result<()> {
325 if self.w == Some(Acknowledgment::Nodes(0)) && self.journal == Some(true) {
326 return Err(ErrorKind::InvalidArgument {
327 message: "write concern cannot have w=0 and j=true".to_string(),
328 }
329 .into());
330 }
331
332 if let Some(w_timeout) = self.w_timeout {
333 if w_timeout < Duration::from_millis(0) {
334 return Err(ErrorKind::InvalidArgument {
335 message: "write concern `w_timeout` field cannot be negative".to_string(),
336 }
337 .into());
338 }
339 }
340
341 Ok(())
342 }
343
344 pub(crate) fn serialize<S>(
345 write_concern: &Option<WriteConcern>,
346 serializer: S,
347 ) -> std::result::Result<S::Ok, S::Error>
348 where
349 S: Serializer,
350 {
351 #[derive(Serialize)]
352 struct WriteConcernHelper<'a> {
353 w: Option<&'a Acknowledgment>,
354
355 #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
356 wtimeoutms: Option<Duration>,
357
358 journal: Option<bool>,
359 }
360
361 let state = write_concern.as_ref().map(|concern| WriteConcernHelper {
362 w: concern.w.as_ref(),
363 wtimeoutms: concern.w_timeout,
364 journal: concern.journal,
365 });
366
367 state.serialize(serializer)
368 }
369}
370
371impl From<Acknowledgment> for WriteConcern {
372 fn from(w: Acknowledgment) -> Self {
373 WriteConcern {
374 w: Some(w),
375 w_timeout: None,
376 journal: None,
377 }
378 }
379}