mongors 0.2.2

Rust driver for MongoDB
use std::time::Instant;

use client::MongoClient;
use pool::PooledStream;
use bson::{Document, Bson};
use message::{OpMsg, Section};
use apm::{CommandStarted, CommandResult, EventRunner};
use error::{Result, Error, CodedError};

macro_rules! try {
    (
        $client:expr,
        $duration:expr,
        $command_name:expr,
        $request_id:expr,
        $connstring:expr,
        $result:expr
    ) => {
        match $result {
            Ok(result) => result,
            Err(err) => {
                if let Err(_) = $client.run_completion_hooks(&CommandResult::Failure {
                    duration: $duration,
                    command_name: $command_name.clone(),
                    failure: &err,
                    request_id: $request_id,
                    connection_string: $connstring.clone()
                }) {
                    return Err(Error::EventListenerError(Some(Box::new(err))));
                }

                return Err(err)
            }
        }
    }
}

pub fn base_command(
    client: &MongoClient,
    stream: &mut PooledStream,
    command: Document
) -> Result<Document> {
    let mut msg_builder = OpMsg::builder();
    let request_msg = msg_builder
        .request_id(client.get_request_id())
        .push_section(Section::from_document(&command)?)
        .build();

    let socket = stream.get_socket();
    let connstring = format!("{}", socket.get_ref().peer_addr()?);
    let command_name = {
        if let Some((ref command_name, _)) = command.front() {
            command_name.to_string()
        } else {
            return Err(Error::ArgumentError("Then command must include command name".to_string()))
        }
    };

    let db_name = if let Some(&Bson::String(ref db_name)) = command.get("$db") {
        db_name.to_owned()
    } else {
        return Err(Error::ArgumentError("The command must include '$db'".to_string()))
    };

    let init_time = Instant::now();

    if let Err(_) = client.run_start_hooks(&CommandStarted {
        command: command.clone(),
        database_name: db_name,
        command_name: command_name.clone(),
        request_id: i64::from(request_msg.header.request_id),
        connection_string: connstring.clone()
    }) {
        return Err(Error::EventListenerError(None))
    }

    try!(
        client,
        u64::from((Instant::now() - init_time).subsec_nanos()),
        command_name,
        i64::from(request_msg.header.request_id),
        connstring,
        request_msg.write(socket)
    );

    let response_msg = try!(
        client,
        u64::from((Instant::now() - init_time).subsec_nanos()),
        command_name,
        i64::from(request_msg.header.request_id),
        connstring,
        OpMsg::read(socket)
    );

    let doc = try!(
        client,
        u64::from((Instant::now() - init_time).subsec_nanos()),
        command_name,
        i64::from(request_msg.header.request_id),
        connstring,
        response_msg.get_document()
    );

    if let Some(&Bson::Int32(code)) = doc.get("code") {
        if let Some(&Bson::String(ref msg)) = doc.get("errmsg") {
            return Err(Error::CodedError(CodedError{code: code, errmsg: msg.to_string()}))
        }
    }

    if let Err(_) = client.run_completion_hooks(&CommandResult::Success {
        duration: u64::from((Instant::now() - init_time).subsec_nanos()),
        reply: &doc,
        command_name,
        request_id: i64::from(request_msg.header.request_id),
        connection_string: connstring
    }) {
        return Err(Error::EventListenerError(None))
    }

    Ok(doc)
}

pub fn base_command_wihtout_hook(
    client: &MongoClient,
    stream: &mut PooledStream,
    command: Document
) -> Result<Document> {
    let mut msg_builder = OpMsg::builder();
    let request_msg = msg_builder
        .request_id(client.get_request_id())
        .push_section(Section::from_document(&command)?)
        .build();

    let socket = stream.get_socket();

    request_msg.write(socket)?;

    let response_msg = OpMsg::read(socket)?;

    let doc = response_msg.get_document()?;

    if let Some(&Bson::Int32(code)) = doc.get("code") {
        if let Some(&Bson::String(ref msg)) = doc.get("errmsg") {
            return Err(Error::CodedError(CodedError{code: code, errmsg: msg.to_string()}))
        }
    }

    Ok(doc)
}

pub fn is_write_command(name: &str) -> bool {

    let names: Vec<_> = vec![
        "aggregate",
        "count",
        "distinct",
        "find",
        "user",
        "isMaster",
        "usersInfo",
        "listCollections",
        "listDatabases",
        "listIndexes"
    ];

    if names.contains(&name) {
        return false
    } else {
        return true
    }
}