#[cfg(test)]
mod test;
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::skip_serializing_none;
use typed_builder::TypedBuilder;
use crate::{
bson::{doc, serde_helpers, Timestamp},
bson_util,
error::{ErrorKind, Result},
};
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ReadConcern {
pub level: ReadConcernLevel,
}
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[serde(rename = "readConcern")]
pub(crate) struct ReadConcernInternal {
pub(crate) level: Option<ReadConcernLevel>,
pub(crate) at_cluster_time: Option<Timestamp>,
pub(crate) after_cluster_time: Option<Timestamp>,
}
impl ReadConcern {
pub fn majority() -> Self {
ReadConcernLevel::Majority.into()
}
pub fn local() -> Self {
ReadConcernLevel::Local.into()
}
pub fn linearizable() -> Self {
ReadConcernLevel::Linearizable.into()
}
pub fn available() -> Self {
ReadConcernLevel::Available.into()
}
pub fn snapshot() -> Self {
ReadConcernLevel::Snapshot.into()
}
pub fn custom(level: String) -> Self {
ReadConcernLevel::from_str(level.as_str()).into()
}
#[cfg(test)]
pub(crate) fn serialize_for_client_options<S>(
read_concern: &Option<ReadConcern>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
#[derive(Serialize)]
struct ReadConcernHelper<'a> {
readconcernlevel: &'a str,
}
let state = read_concern.as_ref().map(|concern| ReadConcernHelper {
readconcernlevel: concern.level.as_str(),
});
state.serialize(serializer)
}
}
impl From<ReadConcern> for ReadConcernInternal {
fn from(rc: ReadConcern) -> Self {
ReadConcernInternal {
level: Some(rc.level),
at_cluster_time: None,
after_cluster_time: None,
}
}
}
impl From<ReadConcernLevel> for ReadConcern {
fn from(level: ReadConcernLevel) -> Self {
Self { level }
}
}
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum ReadConcernLevel {
Local,
Majority,
Linearizable,
Available,
Snapshot,
Custom(String),
}
impl ReadConcernLevel {
pub(crate) fn from_str(s: &str) -> Self {
match s {
"local" => ReadConcernLevel::Local,
"majority" => ReadConcernLevel::Majority,
"linearizable" => ReadConcernLevel::Linearizable,
"available" => ReadConcernLevel::Available,
"snapshot" => ReadConcernLevel::Snapshot,
s => ReadConcernLevel::Custom(s.to_string()),
}
}
pub(crate) fn as_str(&self) -> &str {
match self {
ReadConcernLevel::Local => "local",
ReadConcernLevel::Majority => "majority",
ReadConcernLevel::Linearizable => "linearizable",
ReadConcernLevel::Available => "available",
ReadConcernLevel::Snapshot => "snapshot",
ReadConcernLevel::Custom(ref s) => s,
}
}
}
impl<'de> Deserialize<'de> for ReadConcernLevel {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
Ok(ReadConcernLevel::from_str(&s))
}
}
impl Serialize for ReadConcernLevel {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.as_str().serialize(serializer)
}
}
#[skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, TypedBuilder, Serialize, Deserialize)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct WriteConcern {
pub w: Option<Acknowledgment>,
#[serde(rename = "wtimeout", alias = "wtimeoutMS")]
#[serde(serialize_with = "bson_util::serialize_duration_option_as_int_millis")]
#[serde(deserialize_with = "bson_util::deserialize_duration_option_from_u64_millis")]
#[serde(default)]
pub w_timeout: Option<Duration>,
#[serde(rename = "j", alias = "journal")]
pub journal: Option<bool>,
}
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum Acknowledgment {
Nodes(u32),
Majority,
Custom(String),
}
impl Serialize for Acknowledgment {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
Acknowledgment::Majority => serializer.serialize_str("majority"),
Acknowledgment::Nodes(n) => serde_helpers::serialize_u32_as_i32(n, serializer),
Acknowledgment::Custom(name) => serializer.serialize_str(name),
}
}
}
impl<'de> Deserialize<'de> for Acknowledgment {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum IntOrString {
Int(u32),
String(String),
}
match IntOrString::deserialize(deserializer)? {
IntOrString::String(s) => Ok(s.into()),
IntOrString::Int(i) => Ok(i.into()),
}
}
}
impl From<u32> for Acknowledgment {
fn from(i: u32) -> Self {
Acknowledgment::Nodes(i)
}
}
impl From<String> for Acknowledgment {
fn from(s: String) -> Self {
if s == "majority" {
Acknowledgment::Majority
} else {
Acknowledgment::Custom(s)
}
}
}
impl WriteConcern {
pub(crate) fn is_acknowledged(&self) -> bool {
self.w != Some(Acknowledgment::Nodes(0)) || self.journal == Some(true)
}
pub(crate) fn is_empty(&self) -> bool {
self.w == None && self.w_timeout == None && self.journal == None
}
pub(crate) fn validate(&self) -> Result<()> {
if self.w == Some(Acknowledgment::Nodes(0)) && self.journal == Some(true) {
return Err(ErrorKind::InvalidArgument {
message: "write concern cannot have w=0 and j=true".to_string(),
}
.into());
}
if let Some(w_timeout) = self.w_timeout {
if w_timeout < Duration::from_millis(0) {
return Err(ErrorKind::InvalidArgument {
message: "write concern `w_timeout` field cannot be negative".to_string(),
}
.into());
}
}
Ok(())
}
#[cfg(test)]
pub(crate) fn serialize_for_client_options<S>(
write_concern: &Option<WriteConcern>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
#[derive(Serialize)]
struct WriteConcernHelper<'a> {
w: Option<&'a Acknowledgment>,
#[serde(serialize_with = "bson_util::serialize_duration_option_as_int_millis")]
wtimeoutms: Option<Duration>,
journal: Option<bool>,
}
let state = write_concern.as_ref().map(|concern| WriteConcernHelper {
w: concern.w.as_ref(),
wtimeoutms: concern.w_timeout,
journal: concern.journal,
});
state.serialize(serializer)
}
}