use std::{collections::HashMap, sync::Arc, time::Duration};
use derive_where::derive_where;
use serde::{de::Error as SerdeError, Deserialize, Deserializer, Serialize};
use typed_builder::TypedBuilder;
use crate::{
bson::doc,
error::{ErrorKind, Result},
options::ServerAddress,
sdam::public::ServerInfo,
serde_util,
};
#[derive(Clone, derive_more::Display)]
#[derive_where(Debug)]
#[non_exhaustive]
pub enum SelectionCriteria {
#[display(fmt = "ReadPreference {}", _0)]
ReadPreference(ReadPreference),
#[display(fmt = "Custom predicate")]
Predicate(#[derive_where(skip)] Predicate),
}
impl PartialEq for SelectionCriteria {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::ReadPreference(r1), Self::ReadPreference(r2)) => r1 == r2,
_ => false,
}
}
}
impl From<ReadPreference> for SelectionCriteria {
fn from(read_pref: ReadPreference) -> Self {
Self::ReadPreference(read_pref)
}
}
impl SelectionCriteria {
pub(crate) fn as_read_pref(&self) -> Option<&ReadPreference> {
match self {
Self::ReadPreference(ref read_pref) => Some(read_pref),
Self::Predicate(..) => None,
}
}
pub(crate) fn from_address(address: ServerAddress) -> Self {
SelectionCriteria::Predicate(Arc::new(move |server| server.address() == &address))
}
#[cfg(test)]
pub(crate) fn serialize_for_client_options<S>(
selection_criteria: &Option<SelectionCriteria>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match selection_criteria {
Some(SelectionCriteria::ReadPreference(read_preference)) => {
read_preference.serialize(serializer)
}
_ => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for SelectionCriteria {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Ok(SelectionCriteria::ReadPreference(
ReadPreference::deserialize(deserializer)?,
))
}
}
pub type Predicate = Arc<dyn Send + Sync + Fn(&ServerInfo) -> bool>;
#[allow(missing_docs)]
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum ReadPreference {
Primary,
Secondary {
options: Option<ReadPreferenceOptions>,
},
PrimaryPreferred {
options: Option<ReadPreferenceOptions>,
},
SecondaryPreferred {
options: Option<ReadPreferenceOptions>,
},
Nearest {
options: Option<ReadPreferenceOptions>,
},
}
impl std::fmt::Display for ReadPreference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut mode = self.mode().to_string();
mode[0..1].make_ascii_uppercase();
write!(f, "{{ Mode: {}", mode)?;
if let Some(options) = self.options() {
if let Some(ref tag_sets) = options.tag_sets {
write!(f, ", Tag Sets: {:?}", tag_sets)?;
}
if let Some(ref max_staleness) = options.max_staleness {
write!(f, ", Max Staleness: {:?}", max_staleness)?;
}
if let Some(ref hedge) = options.hedge {
write!(f, ", Hedge: {}", hedge.enabled)?;
}
}
write!(f, " }}")
}
}
impl<'de> Deserialize<'de> for ReadPreference {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct ReadPreferenceHelper {
mode: String,
#[serde(flatten)]
options: ReadPreferenceOptions,
}
let helper = ReadPreferenceHelper::deserialize(deserializer)?;
match helper.mode.to_ascii_lowercase().as_str() {
"primary" => {
if !helper.options.is_default() {
return Err(D::Error::custom(format!(
"cannot specify options for primary read preference, got {:?}",
helper.options
)));
}
Ok(ReadPreference::Primary)
}
"secondary" => Ok(ReadPreference::Secondary {
options: Some(helper.options),
}),
"primarypreferred" => Ok(ReadPreference::PrimaryPreferred {
options: Some(helper.options),
}),
"secondarypreferred" => Ok(ReadPreference::SecondaryPreferred {
options: Some(helper.options),
}),
"nearest" => Ok(ReadPreference::Nearest {
options: Some(helper.options),
}),
other => Err(D::Error::custom(format!(
"Unknown read preference mode: {}",
other
))),
}
}
}
impl Serialize for ReadPreference {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[serde_with::skip_serializing_none]
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct ReadPreferenceHelper<'a> {
mode: &'static str,
#[serde(flatten)]
options: Option<&'a ReadPreferenceOptions>,
}
let helper = ReadPreferenceHelper {
mode: self.mode(),
options: self.options(),
};
helper.serialize(serializer)
}
}
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ReadPreferenceOptions {
#[serde(alias = "tag_sets")]
pub tag_sets: Option<Vec<TagSet>>,
#[serde(
rename = "maxStalenessSeconds",
default,
with = "serde_util::duration_option_as_int_seconds"
)]
pub max_staleness: Option<Duration>,
pub hedge: Option<HedgedReadOptions>,
}
impl ReadPreferenceOptions {
pub(crate) fn is_default(&self) -> bool {
self.hedge.is_none()
&& self.max_staleness.is_none()
&& self
.tag_sets
.as_ref()
.map(|ts| ts.is_empty() || ts[..] == [HashMap::default()])
.unwrap_or(true)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct HedgedReadOptions {
pub enabled: bool,
}
impl ReadPreference {
pub(crate) fn mode(&self) -> &'static str {
match self {
Self::Primary => "primary",
Self::Secondary { .. } => "secondary",
Self::PrimaryPreferred { .. } => "primaryPreferred",
Self::SecondaryPreferred { .. } => "secondaryPreferred",
Self::Nearest { .. } => "nearest",
}
}
pub(crate) fn options(&self) -> Option<&ReadPreferenceOptions> {
match self {
Self::Primary => None,
Self::Secondary { options }
| Self::PrimaryPreferred { options }
| Self::SecondaryPreferred { options }
| Self::Nearest { options } => options.as_ref(),
}
}
pub(crate) fn max_staleness(&self) -> Option<Duration> {
self.options().and_then(|options| options.max_staleness)
}
pub(crate) fn tag_sets(&self) -> Option<&Vec<TagSet>> {
self.options().and_then(|options| options.tag_sets.as_ref())
}
pub(crate) fn with_tags(mut self, tag_sets: Vec<TagSet>) -> Result<Self> {
let options = match self {
Self::Primary => {
return Err(ErrorKind::InvalidArgument {
message: "read preference tags can only be specified when a non-primary mode \
is specified"
.to_string(),
}
.into());
}
Self::Secondary { ref mut options } => options,
Self::PrimaryPreferred { ref mut options } => options,
Self::SecondaryPreferred { ref mut options } => options,
Self::Nearest { ref mut options } => options,
};
options.get_or_insert_with(Default::default).tag_sets = Some(tag_sets);
Ok(self)
}
pub(crate) fn with_max_staleness(mut self, max_staleness: Duration) -> Result<Self> {
let options = match self {
ReadPreference::Primary => {
return Err(ErrorKind::InvalidArgument {
message: "max staleness can only be specified when a non-primary mode is \
specified"
.to_string(),
}
.into());
}
ReadPreference::Secondary { ref mut options } => options,
ReadPreference::PrimaryPreferred { ref mut options } => options,
ReadPreference::SecondaryPreferred { ref mut options } => options,
ReadPreference::Nearest { ref mut options } => options,
};
options.get_or_insert_with(Default::default).max_staleness = Some(max_staleness);
Ok(self)
}
}
pub type TagSet = HashMap<String, String>;
#[cfg(test)]
mod test {
use super::{HedgedReadOptions, ReadPreference, ReadPreferenceOptions};
use crate::bson::doc;
#[test]
fn hedged_read_included_in_document() {
let options = Some(
ReadPreferenceOptions::builder()
.hedge(HedgedReadOptions { enabled: true })
.build(),
);
let read_pref = ReadPreference::Secondary { options };
let doc = bson::to_document(&read_pref).unwrap();
assert_eq!(
doc,
doc! { "mode": "secondary", "hedge": { "enabled": true } }
);
}
}