// mongodb 0.9.1
//
// The official MongoDB driver for Rust (currently in alpha).
// Documentation
use std::time::Duration;

use bson::UtcDateTime;
use chrono::{naive::NaiveDateTime, offset::Utc, DateTime};
use serde::Deserialize;

use crate::test::run_spec_test;

use crate::{
    is_master::{IsMasterCommandResponse, IsMasterReply, LastWrite},
    options::StreamAddress,
    sdam::description::{
        server::{ServerDescription, ServerType},
        topology::{test::f64_ms_as_duration, TopologyDescription, TopologyType},
    },
    selection_criteria::{ReadPreference, TagSet},
};

/// One spec test case as deserialized from a test file and consumed by `run_test`.
#[derive(Debug, Deserialize)]
struct TestFile {
    /// Heartbeat frequency in milliseconds; `run_test` converts it with `Duration::from_millis`
    /// and stores it as the topology's `heartbeat_freq`.
    #[serde(rename = "heartbeatFrequencyMS")]
    heartbeat_frequency_ms: Option<u64>,
    /// The topology (type and servers) that selection is run against.
    topology_description: TestTopologyDescription,
    /// The read preference to select with; a test without a `mode` is skipped by `run_test`.
    read_preference: TestReadPreference,
    /// Servers expected from `suitable_servers`; when absent, `run_test` makes no assertions.
    suitable_servers: Option<Vec<TestServerDescription>>,
    /// Servers expected to remain after `retain_servers_within_latency_window`.
    in_latency_window: Option<Vec<TestServerDescription>>,
    /// NOTE(review): deserialized but not consulted by `run_test` as currently written —
    /// presumably marks tests where selection should fail; confirm against the spec.
    error: Option<bool>,
}

/// The topology portion of a test file: a topology type plus the servers it contains.
#[derive(Debug, Deserialize)]
struct TestTopologyDescription {
    /// Deserialized from the `type` key directly into the driver's `TopologyType`.
    #[serde(rename = "type")]
    topology_type: TopologyType,
    /// Every server in the topology, in the test file's own representation.
    servers: Vec<TestServerDescription>,
}

/// A server as described in a test file; turned into a `ServerDescription` by
/// `convert_server_description`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TestServerDescription {
    /// Server address string; parsed with `StreamAddress::parse` during conversion.
    address: String,
    // The explicit rename overrides the container-level `camelCase` rule so that this key is
    // read in its snake_case form.
    /// Average round trip time in milliseconds; converted with `f64_ms_as_duration`.
    #[serde(rename = "avg_rtt_ms")]
    avg_rtt_ms: Option<f64>,
    /// The server's type, read from the `type` key.
    #[serde(rename = "type")]
    server_type: TestServerType,
    /// Tags copied verbatim onto the synthesized isMaster response.
    tags: Option<TagSet>,
    /// Milliseconds value converted with `utc_datetime_from_millis` into the server
    /// description's `last_update_time`.
    last_update_time: Option<i64>,
    /// Last write information, converted into the isMaster response's `last_write`.
    last_write: Option<LastWriteDate>,
    /// `run_test` drops servers whose value here is below 5; servers without a value are kept.
    max_wire_version: Option<i32>,
}

/// Wrapper for a server's `lastWrite` document as it appears in a test file.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct LastWriteDate {
    /// Read from the `lastWriteDate` key; holds the timestamp as a `$numberLong` document.
    last_write_date: NumberLong,
}

/// A `{"$numberLong": "<digits>"}` document, i.e. a 64-bit integer carried as a string.
#[derive(Debug, Deserialize)]
struct NumberLong {
    /// The raw string value; `convert_server_description` parses it as an `i64`.
    #[serde(rename = "$numberLong")]
    number: String,
}

/// Server types as named in the test files. The variant names double as the deserialization
/// keys, so they must match the strings used in the test data exactly.
#[derive(Clone, Copy, Debug, Deserialize)]
enum TestServerType {
    Standalone,
    Mongos,
    RSPrimary,
    RSSecondary,
    RSArbiter,
    RSOther,
    RSGhost,
    Unknown,
    // Has no `ServerType` counterpart: `into_server_type` returns `None` for it.
    PossiblePrimary,
}

impl TestServerType {
    /// Maps a test-file server type onto the driver's `ServerType`.
    ///
    /// Returns `None` for `PossiblePrimary`, the one variant this mapping has no `ServerType`
    /// for; every other variant maps to the `ServerType` of the same name.
    fn into_server_type(self) -> Option<ServerType> {
        let converted = match self {
            // Nothing to convert to, so bail out before wrapping.
            TestServerType::PossiblePrimary => return None,
            TestServerType::Unknown => ServerType::Unknown,
            TestServerType::Standalone => ServerType::Standalone,
            TestServerType::Mongos => ServerType::Mongos,
            TestServerType::RSPrimary => ServerType::RSPrimary,
            TestServerType::RSSecondary => ServerType::RSSecondary,
            TestServerType::RSArbiter => ServerType::RSArbiter,
            TestServerType::RSOther => ServerType::RSOther,
            TestServerType::RSGhost => ServerType::RSGhost,
        };

        Some(converted)
    }
}

/// A read preference as written in a test file; converted by `convert_read_preference`.
#[derive(Debug, Deserialize)]
pub struct TestReadPreference {
    /// Name of the read preference mode (e.g. `"Primary"`, `"Nearest"`); when absent, the
    /// conversion yields `None`.
    pub mode: Option<String>,
    /// Tag sets passed through to every mode except `"Primary"`.
    pub tag_sets: Option<Vec<TagSet>>,
    /// Maximum staleness in whole seconds; converted with `Duration::from_secs`.
    #[serde(rename = "maxStalenessSeconds")]
    pub max_staleness_seconds: Option<u64>,
}

/// Builds the driver's `ReadPreference` from its test-file representation.
///
/// Returns `None` when the test specifies no mode. For `"Primary"` the tag sets and max
/// staleness are not carried over; every other mode receives both.
///
/// # Panics
///
/// Panics if the mode is not one of `Primary`, `Secondary`, `PrimaryPreferred`,
/// `SecondaryPreferred` or `Nearest`.
fn convert_read_preference(test_read_pref: TestReadPreference) -> Option<ReadPreference> {
    // Without a mode there is nothing to convert.
    let mode = test_read_pref.mode.as_ref()?.as_str();
    let max_staleness = test_read_pref
        .max_staleness_seconds
        .map(Duration::from_secs);

    Some(match mode {
        "Primary" => ReadPreference::Primary,
        "PrimaryPreferred" => ReadPreference::PrimaryPreferred {
            tag_sets: test_read_pref.tag_sets,
            max_staleness,
        },
        "Secondary" => ReadPreference::Secondary {
            tag_sets: test_read_pref.tag_sets,
            max_staleness,
        },
        "SecondaryPreferred" => ReadPreference::SecondaryPreferred {
            tag_sets: test_read_pref.tag_sets,
            max_staleness,
        },
        "Nearest" => ReadPreference::Nearest {
            tag_sets: test_read_pref.tag_sets,
            max_staleness,
        },
        _ => panic!("invalid read preference: {:?}", test_read_pref),
    })
}

/// Synthesizes an isMaster command response with the fields this test harness associates with
/// `server_type`.
///
/// Starting from `IsMasterCommandResponse::default()`, three things are filled in:
/// * `ok` is `0.0` for `Unknown` and `1.0` for every other type;
/// * `set_name` is `"foo"` for `RSPrimary`, `RSSecondary`, `RSArbiter` and `RSOther`;
/// * one type-specific marker field (none for `Standalone` and `Unknown`).
fn is_master_response_from_server_type(server_type: ServerType) -> IsMasterCommandResponse {
    let mut reply = IsMasterCommandResponse::default();

    // Command status.
    reply.ok = match server_type {
        ServerType::Unknown => Some(0.0),
        ServerType::Standalone
        | ServerType::Mongos
        | ServerType::RSPrimary
        | ServerType::RSSecondary
        | ServerType::RSArbiter
        | ServerType::RSOther
        | ServerType::RSGhost => Some(1.0),
    };

    // Replica set name, for the member types that report one here.
    match server_type {
        ServerType::RSPrimary
        | ServerType::RSSecondary
        | ServerType::RSArbiter
        | ServerType::RSOther => reply.set_name = Some("foo".into()),
        ServerType::Standalone
        | ServerType::Mongos
        | ServerType::RSGhost
        | ServerType::Unknown => {}
    }

    // The single field that marks each remaining type.
    match server_type {
        ServerType::Mongos => reply.msg = Some("isdbgrid".into()),
        ServerType::RSPrimary => reply.is_master = Some(true),
        ServerType::RSSecondary => reply.secondary = Some(true),
        ServerType::RSArbiter => reply.arbiter_only = Some(true),
        ServerType::RSOther => reply.hidden = Some(true),
        ServerType::RSGhost => reply.is_replica_set = Some(true),
        ServerType::Standalone | ServerType::Unknown => {}
    }

    reply
}

/// Converts a timestamp in milliseconds since the Unix epoch into a BSON `UtcDateTime`.
///
/// The value is split into whole seconds and a non-negative sub-second part. Plain `/` and `%`
/// truncate toward zero, so a negative `millis` that is not a multiple of 1000 would leave a
/// negative remainder; cast to `u32` that wraps to an out-of-range nanosecond count and
/// `NaiveDateTime::from_timestamp` panics. The remainder is therefore normalized into
/// `0..1000` (borrowing one second) before the conversion. Non-negative inputs are unaffected.
fn utc_datetime_from_millis(millis: i64) -> UtcDateTime {
    let mut seconds_portion = millis / 1000;
    let mut millis_portion = millis % 1000;

    // Floor instead of truncate: e.g. -1 ms is second -1 plus 999 ms, not second 0 minus 1 ms.
    if millis_portion < 0 {
        seconds_portion -= 1;
        millis_portion += 1000;
    }

    // `millis_portion` is now in `0..1000`, so the product is below 1_000_000_000 and the cast
    // to `u32` is lossless.
    let nanos_portion = (millis_portion * 1_000_000) as u32;

    let naive_datetime = NaiveDateTime::from_timestamp(seconds_portion, nanos_portion);
    let datetime = DateTime::from_utc(naive_datetime, Utc);

    UtcDateTime(datetime)
}

fn convert_server_description(
    test_server_desc: TestServerDescription,
) -> Option<ServerDescription> {
    let server_type = match test_server_desc.server_type.into_server_type() {
        Some(server_type) => server_type,
        None => return None,
    };

    let mut command_response = is_master_response_from_server_type(server_type);
    command_response.tags = test_server_desc.tags;
    command_response.last_write = test_server_desc.last_write.map(|last_write| {
        let millis: i64 = last_write.last_write_date.number.parse().unwrap();

        LastWrite {
            last_write_date: utc_datetime_from_millis(millis),
        }
    });

    let is_master = IsMasterReply {
        command_response,
        round_trip_time: test_server_desc.avg_rtt_ms.map(f64_ms_as_duration),
    };

    let mut server_desc = ServerDescription::new(
        StreamAddress::parse(&test_server_desc.address).unwrap(),
        Some(Ok(is_master)),
    );
    server_desc.last_update_time = test_server_desc
        .last_update_time
        .map(utc_datetime_from_millis);

    Some(server_desc)
}

/// Evaluates to a sorted `Vec<String>` holding `address.to_string()` for every element yielded
/// by `$servers.iter()`.
///
/// This is a macro rather than a function so that it can be applied to collections of
/// different element types, as long as each element exposes an `address` field.
macro_rules! get_sorted_addresses {
    ($servers:expr) => {{
        let mut addresses = Vec::new();
        for server in $servers.iter() {
            addresses.push(server.address.to_string());
        }
        addresses.sort_unstable();
        addresses
    }};
}

fn run_test(test_file: TestFile) {
    let read_pref = match convert_read_preference(test_file.read_preference) {
        Some(read_pref) => read_pref,
        None => return,
    };

    let servers: Option<Vec<ServerDescription>> = test_file
        .topology_description
        .servers
        .into_iter()
        // The driver doesn't support server versions low enough not to support max staleness, so we
        // just manually filter them out here.
        .filter(|server| server.max_wire_version.map(|version| version >= 5).unwrap_or(true))
        .map(convert_server_description)
        .collect();

    let servers = match servers {
        Some(servers) => servers,
        None => return,
    };

    let topology = TopologyDescription {
        single_seed: servers.len() == 1,
        topology_type: test_file.topology_description.topology_type,
        set_name: None,
        max_set_version: None,
        max_election_id: None,
        compatibility_error: None,
        logical_session_timeout_minutes: None,
        local_threshold: None,
        heartbeat_freq: test_file.heartbeat_frequency_ms.map(Duration::from_millis),
        servers: servers
            .into_iter()
            .map(|server| (server.address.clone(), server))
            .collect(),
    };

    if let Some(ref expected_suitable_servers) = test_file.suitable_servers {
        let mut actual_servers: Vec<_> = topology.suitable_servers(&read_pref).unwrap();

        assert_eq!(
            get_sorted_addresses!(expected_suitable_servers),
            get_sorted_addresses!(&actual_servers),
        );

        if let Some(ref expected_in_latency_window) = test_file.in_latency_window {
            topology.retain_servers_within_latency_window(&mut actual_servers);

            assert_eq!(
                get_sorted_addresses!(expected_in_latency_window),
                get_sorted_addresses!(&actual_servers)
            );
        }
    }
}

/// Hands the path components `server-selection/server_selection/ReplicaSetNoPrimary/read` and
/// `run_test` to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn server_selection_replica_set_no_primary() {
    run_spec_test(
        &[
            "server-selection",
            "server_selection",
            "ReplicaSetNoPrimary",
            "read",
        ],
        run_test,
    );
}

/// Hands the path components `server-selection/server_selection/ReplicaSetWithPrimary/read`
/// and `run_test` to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn server_selection_replica_set_with_primary() {
    run_spec_test(
        &[
            "server-selection",
            "server_selection",
            "ReplicaSetWithPrimary",
            "read",
        ],
        run_test,
    );
}

/// Hands the path components `server-selection/server_selection/Sharded/read` and `run_test`
/// to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn server_selection_sharded() {
    run_spec_test(
        &["server-selection", "server_selection", "Sharded", "read"],
        run_test,
    );
}

/// Hands the path components `server-selection/server_selection/Single/read` and `run_test`
/// to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn server_selection_single() {
    run_spec_test(
        &["server-selection", "server_selection", "Single", "read"],
        run_test,
    );
}

/// Hands the path components `server-selection/server_selection/Unknown/read` and `run_test`
/// to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn server_selection_unknown() {
    run_spec_test(
        &["server-selection", "server_selection", "Unknown", "read"],
        run_test,
    );
}

/// Hands the path components `max-staleness/ReplicaSetNoPrimary` and `run_test` to
/// `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn max_staleness_replica_set_no_primary() {
    run_spec_test(&["max-staleness", "ReplicaSetNoPrimary"], run_test);
}

/// Hands the path components `max-staleness/ReplicaSetWithPrimary` and `run_test` to
/// `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn max_staleness_replica_set_with_primary() {
    run_spec_test(&["max-staleness", "ReplicaSetWithPrimary"], run_test);
}

/// Hands the path components `max-staleness/Sharded` and `run_test` to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn max_staleness_sharded() {
    run_spec_test(&["max-staleness", "Sharded"], run_test);
}

/// Hands the path components `max-staleness/Single` and `run_test` to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn max_staleness_single() {
    run_spec_test(&["max-staleness", "Single"], run_test);
}

/// Hands the path components `max-staleness/Unknown` and `run_test` to `run_spec_test`.
// NOTE(review): presumably `run_spec_test` feeds every spec file under that path to `run_test`
// — confirm against its definition.
#[test]
fn max_staleness_unknown() {
    run_spec_test(&["max-staleness", "Unknown"], run_test);
}