mongodb 0.3.2

A native MongoDB driver developed under MongoDB labs.
Documentation
//! Wire protocol operational client-server communication logic.
use bson;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use Error::{ArgumentError, ResponseError};
use Result;
use wire_protocol::header::{Header, OpCode};
use wire_protocol::flags::{OpInsertFlags, OpQueryFlags, OpReplyFlags, OpUpdateFlags};

use std::io::{Read, Write};
use std::mem;
use std::result::Result::{Ok, Err};

trait ByteLength {
    /// Calculates the number of bytes in the serialized version of the struct.
    fn byte_length(&self) -> Result<i32>;
}

impl ByteLength for bson::Document {
    /// Gets the length of a BSON document.
    ///
    /// # Return value
    ///
    /// Returns the number of bytes in the serialized BSON document, or an
    /// Error if the document couldn't be serialized.
    fn byte_length(&self) -> Result<i32> {
        let mut temp_buffer = vec![];

        try!(bson::encode_document(&mut temp_buffer, self));
        Ok(temp_buffer.len() as i32)
    }
}

/// Represents a message in the MongoDB Wire Protocol.
pub enum Message {
    OpReply {
        /// The message header.
        header: Header,
        /// A Bit vector of reply options.
        flags: OpReplyFlags,
        /// Uniquely identifies the cursor being returned.
        cursor_id: i64,
        /// The starting position for the cursor.
        starting_from: i32,
        /// The total number of documents being returned.
        number_returned: i32,
        /// The documents being returned.
        documents: Vec<bson::Document>,
    },
    OpUpdate {
        /// The message header.
        header: Header,
        // The wire protocol specifies that a 32-bit 0 field goes here
        /// The full qualified name of the collection, beginning with the
        /// database name and a dot separator.
        namespace: String,
        /// A bit vector of update options.
        flags: OpUpdateFlags,
        /// Identifies the document(s) to be updated.
        selector: bson::Document,
        /// Instruction document for how to update the document(s).
        update: bson::Document,
    },
    OpInsert {
        /// The message header.
        header: Header,
        /// A bit vector of insert options.
        flags: OpInsertFlags,
        /// The full qualified name of the collection, beginning with the
        /// database name and a dot separator.
        namespace: String,
        /// The documents to be inserted.
        documents: Vec<bson::Document>,
    },
    OpQuery {
        /// The message header.
        header: Header,
        /// A bit vector of query options.
        flags: OpQueryFlags,
        /// The full qualified name of the collection, beginning with the
        /// database name and a dot separator.
        namespace: String,
        /// The number of initial documents to skip over in the query results.
        number_to_skip: i32,
        /// The total number of documents that should be returned by the query.
        number_to_return: i32,
        /// Specifies which documents to return.
        query: bson::Document,
        /// An optional projection of which fields should be present in the
        /// documents to be returned by the query.
        return_field_selector: Option<bson::Document>,
    },
    OpGetMore {
        /// The message header.
        header: Header,
        // The wire protocol specifies that a 32-bit 0 field goes here
        /// The full qualified name of the collection, beginning with the
        /// database name and a dot separator.
        namespace: String,
        /// The total number of documents that should be returned by the query.
        number_to_return: i32,
        /// Uniquely identifies the cursor being returned.
        cursor_id: i64,
    },
}

impl Message {
    /// Constructs a new message for a reply.
    fn new_reply(header: Header,
                 flags: i32,
                 cursor_id: i64,
                 starting_from: i32,
                 number_returned: i32,
                 documents: Vec<bson::Document>)
                 -> Message {
        Message::OpReply {
            header: header,
            flags: OpReplyFlags::from_bits_truncate(flags),
            cursor_id: cursor_id,
            starting_from: starting_from,
            number_returned: number_returned,
            documents: documents,
        }
    }

    /// Constructs a new message for an update.
    pub fn new_update(request_id: i32,
                      namespace: String,
                      flags: OpUpdateFlags,
                      selector: bson::Document,
                      update: bson::Document)
                      -> Result<Message> {
        let header_length = mem::size_of::<Header>() as i32;

        // Add an extra byte after the string for null-termination.
        let string_length = namespace.len() as i32 + 1;

        // There are two i32 fields -- `flags` is represented in the struct as
        // a bit vector, and the wire protocol-specified ZERO field.
        let i32_length = mem::size_of::<i32>() as i32 * 2;

        let selector_length = try!(selector.byte_length());
        let update_length = try!(update.byte_length());

        let total_length = header_length + string_length + i32_length + selector_length +
                           update_length;

        let header = Header::new_update(total_length, request_id);

        Ok(Message::OpUpdate {
            header: header,
            namespace: namespace,
            flags: flags,
            selector: selector,
            update: update,
        })
    }

    /// Constructs a new message request for an insertion.
    pub fn new_insert(request_id: i32,
                      flags: OpInsertFlags,
                      namespace: String,
                      documents: Vec<bson::Document>)
                      -> Result<Message> {
        let header_length = mem::size_of::<Header>() as i32;
        let flags_length = mem::size_of::<i32>() as i32;

        // Add an extra byte after the string for null-termination.
        let string_length = namespace.len() as i32 + 1;

        let mut total_length = header_length + flags_length + string_length;

        for doc in &documents {
            total_length += try!(doc.byte_length());
        }

        let header = Header::new_insert(total_length, request_id);

        Ok(Message::OpInsert {
            header: header,
            flags: flags,
            namespace: namespace,
            documents: documents,
        })
    }

    /// Constructs a new message request for a query.
    pub fn new_query(request_id: i32,
                     flags: OpQueryFlags,
                     namespace: String,
                     number_to_skip: i32,
                     number_to_return: i32,
                     query: bson::Document,
                     return_field_selector: Option<bson::Document>)
                     -> Result<Message> {

        let header_length = mem::size_of::<Header>() as i32;

        // There are three i32 fields in the an OpQuery (since OpQueryFlags is
        // represented as an 32-bit vector in the wire protocol).
        let i32_length = 3 * mem::size_of::<i32>() as i32;

        // Add an extra byte after the string for null-termination.
        let string_length = namespace.len() as i32 + 1;

        let bson_length = try!(query.byte_length());

        // Add the length of the optional BSON document only if it exists.
        let option_length = match return_field_selector {
            Some(ref bson) => try!(bson.byte_length()),
            None => 0,
        };

        let total_length = header_length + i32_length + string_length + bson_length + option_length;

        let header = Header::new_query(total_length, request_id);

        Ok(Message::OpQuery {
            header: header,
            flags: flags,
            namespace: namespace,
            number_to_skip: number_to_skip,
            number_to_return: number_to_return,
            query: query,
            return_field_selector: return_field_selector,
        })
    }

    /// Constructs a new "get more" request message.
    pub fn new_get_more(request_id: i32,
                        namespace: String,
                        number_to_return: i32,
                        cursor_id: i64)
                        -> Message {
        let header_length = mem::size_of::<Header>() as i32;

        // There are two i32 fields because of the reserved "ZERO".
        let i32_length = 2 * mem::size_of::<i32>() as i32;

        // Add an extra byte after the string for null-termination.
        let string_length = namespace.len() as i32 + 1;

        let i64_length = mem::size_of::<i64>() as i32;
        let total_length = header_length + i32_length + string_length + i64_length;

        let header = Header::new_get_more(total_length, request_id);

        Message::OpGetMore {
            header: header,
            namespace: namespace,
            number_to_return: number_to_return,
            cursor_id: cursor_id,
        }
    }

    /// Writes a serialized BSON document to a given buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to write to.
    /// `bson` - The document to serialize and write.
    ///
    /// # Return value
    ///
    /// Returns nothing on success, or an Error on failure.
    fn write_bson_document<W: Write>(buffer: &mut W, bson: &bson::Document) -> Result<()> {
        let mut temp_buffer = vec![];

        try!(bson::encode_document(&mut temp_buffer, bson));
        try!(buffer.write_all(&temp_buffer));
        Ok(())
    }

    /// Writes a serialized update message to a given buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to write to.
    /// `header` - The header for the given message.
    /// `namespace` - The full qualified name of the collection, beginning with
    ///               the database name and a dot.
    /// `flags` - Bit vector of query option.
    /// `selector` - Identifies the document(s) to be updated.
    /// `update` - Instructs how to update the document(s).
    ///
    /// # Return value
    ///
    /// Returns nothing on success, or an Error on failure.
    pub fn write_update<W: Write>(buffer: &mut W,
                                  header: &Header,
                                  namespace: &str,
                                  flags: &OpUpdateFlags,
                                  selector: &bson::Document,
                                  update: &bson::Document)
                                  -> Result<()> {

        try!(header.write(buffer));

        // Write ZERO field
        try!(buffer.write_i32::<LittleEndian>(0));

        for byte in namespace.bytes() {
            try!(buffer.write_u8(byte));
        }

        // Writes the null terminator for the collection name string.
        try!(buffer.write_u8(0));

        try!(buffer.write_i32::<LittleEndian>(flags.bits()));

        try!(Message::write_bson_document(buffer, selector));
        try!(Message::write_bson_document(buffer, update));

        let _ = buffer.flush();
        Ok(())
    }

    /// Writes a serialized update message to a given buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to write to.
    /// `header` - The header for the given message.
    /// `flags` - Bit vector of query options.
    /// `namespace` - The full qualified name of the collection, beginning with
    ///               the database name and a dot.
    /// `documents` - The documents to insert.
    ///
    /// # Return value
    ///
    /// Returns nothing on success, or an Error on failure.
    fn write_insert<W: Write>(buffer: &mut W,
                              header: &Header,
                              flags: &OpInsertFlags,
                              namespace: &str,
                              documents: &[bson::Document])
                              -> Result<()> {

        try!(header.write(buffer));
        try!(buffer.write_i32::<LittleEndian>(flags.bits()));

        for byte in namespace.bytes() {
            try!(buffer.write_u8(byte));
        }

        // Writes the null terminator for the collection name string.
        try!(buffer.write_u8(0));

        for doc in documents {
            try!(Message::write_bson_document(buffer, doc));
        }

        let _ = buffer.flush();
        Ok(())
    }

    /// Writes a serialized query message to a given buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to write to.
    /// `header` - The header for the given message.
    /// `flags` - Bit vector of query option.
    /// `namespace` - The full qualified name of the collection, beginning with
    ///               the database name and a dot.
    /// `number_to_skip` - The number of initial documents to skip over in the
    ///                    query results.
    /// `number_to_return - The total number of documents that should be
    ///                     returned by the query.
    /// `query` - Specifies which documents to return.
    /// `return_field_selector - An optional projection of which fields should
    ///                          be present in the documents to be returned by
    ///                          the query.
    ///
    /// # Return value
    ///
    /// Returns nothing on success, or an Error on failure.
    fn write_query<W: Write>(buffer: &mut W,
                             header: &Header,
                             flags: &OpQueryFlags,
                             namespace: &str,
                             number_to_skip: i32,
                             number_to_return: i32,
                             query: &bson::Document,
                             return_field_selector: &Option<bson::Document>)
                             -> Result<()> {

        try!(header.write(buffer));
        try!(buffer.write_i32::<LittleEndian>(flags.bits()));

        for byte in namespace.bytes() {
            try!(buffer.write_u8(byte));
        }

        // Writes the null terminator for the collection name string.
        try!(buffer.write_u8(0));

        try!(buffer.write_i32::<LittleEndian>(number_to_skip));
        try!(buffer.write_i32::<LittleEndian>(number_to_return));
        try!(Message::write_bson_document(buffer, query));

        if let Some(ref doc) = *return_field_selector {
            try!(Message::write_bson_document(buffer, doc));
        }

        let _ = buffer.flush();
        Ok(())
    }

    /// Writes a serialized "get more" request to a given buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to write to.
    /// `header` - The header for the given message.
    /// `namespace` - The full qualified name of the collection, beginning with
    ///               the database name and a dot.
    /// `number_to_return - The total number of documents that should be
    ///                     returned by the query.
    /// `cursor_id` - Specifies which cursor to get more documents from.
    ///
    /// # Return value
    ///
    /// Returns nothing on success, or an Error on failure.
    pub fn write_get_more<W: Write>(buffer: &mut W,
                                    header: &Header,
                                    namespace: &str,
                                    number_to_return: i32,
                                    cursor_id: i64)
                                    -> Result<()> {

        try!(header.write(buffer));

        // Write ZERO field
        try!(buffer.write_i32::<LittleEndian>(0));

        for byte in namespace.bytes() {
            try!(buffer.write_u8(byte));
        }

        // Writes the null terminator for the collection name string.
        try!(buffer.write_u8(0));

        try!(buffer.write_i32::<LittleEndian>(number_to_return));
        try!(buffer.write_i64::<LittleEndian>(cursor_id));

        let _ = buffer.flush();
        Ok(())
    }

    /// Attemps to write the serialized message to a buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to write to.
    ///
    /// # Return value
    ///
    /// Returns nothing on success, or an error string on failure.
    pub fn write<W: Write>(&self, buffer: &mut W) -> Result<()> {
        match *self {
            /// Only the server should send replies
            Message::OpReply { .. } => {
                Err(ArgumentError(String::from("OP_REPLY should not be sent to the client.")))
            }
            Message::OpUpdate { ref header,
                                ref namespace,
                                ref flags,
                                ref selector,
                                ref update } => {
                Message::write_update(buffer, header, namespace, flags, selector, update)
            }
            Message::OpInsert { ref header, ref flags, ref namespace, ref documents } => {
                Message::write_insert(buffer, header, flags, namespace, documents)
            }
            Message::OpQuery { ref header,
                               ref flags,
                               ref namespace,
                               number_to_skip,
                               number_to_return,
                               ref query,
                               ref return_field_selector } => {
                Message::write_query(buffer,
                                     header,
                                     flags,
                                     namespace,
                                     number_to_skip,
                                     number_to_return,
                                     query,
                                     return_field_selector)
            }
            Message::OpGetMore { ref header, ref namespace, number_to_return, cursor_id } => {
                Message::write_get_more(buffer, header, namespace, number_to_return, cursor_id)
            }
        }
    }

    /// Reads a serialized reply message from a buffer
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to read from.
    ///
    /// # Return value
    ///
    /// Returns the reply message on success, or an Error on failure.
    fn read_reply<R: Read>(buffer: &mut R, header: Header) -> Result<Message> {
        let mut length = header.message_length - mem::size_of::<Header>() as i32;

        // Read flags
        let flags = try!(buffer.read_i32::<LittleEndian>());
        length -= mem::size_of::<i32>() as i32;

        // Read cursor_id
        let cid = try!(buffer.read_i64::<LittleEndian>());
        length -= mem::size_of::<i64>() as i32;

        // Read starting_from
        let sf = try!(buffer.read_i32::<LittleEndian>());
        length -= mem::size_of::<i32>() as i32;

        // Read number_returned
        let nr = try!(buffer.read_i32::<LittleEndian>());
        length -= mem::size_of::<i32>() as i32;

        let mut v = vec![];

        while length > 0 {
            let bson = try!(bson::decode_document(buffer));
            length -= try!(bson.byte_length());
            v.push(bson);
        }

        Ok(Message::new_reply(header, flags, cid, sf, nr, v))
    }

    /// Attempts to read a serialized reply Message from a buffer.
    ///
    /// # Arguments
    ///
    /// `buffer` - The buffer to read from.
    ///
    /// # Return value
    ///
    /// Returns the reply message on success, or an Error on failure.
    pub fn read<T>(buffer: &mut T) -> Result<Message>
        where T: Read + Write
    {
        let header = try!(Header::read(buffer));
        match header.op_code {
            OpCode::Reply => Message::read_reply(buffer, header),
            opcode => {
                Err(ResponseError(format!("Expected to read OpCode::Reply but instead found \
                                           opcode {}",
                                          opcode)))
            }
        }
    }
}