1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
pub mod auth;
mod executor;
pub mod options;

use std::{
    sync::{Arc, RwLock},
    time::Duration,
};

use time::PreciseTime;

use bson::{Bson, Document};
use derivative::Derivative;

use crate::{
    concern::{ReadConcern, WriteConcern},
    db::Database,
    error::{ErrorKind, Result},
    event::command::{
        CommandEventHandler,
        CommandFailedEvent,
        CommandStartedEvent,
        CommandSucceededEvent,
    },
    operation::ListDatabases,
    options::{ClientOptions, DatabaseOptions},
    sdam::{Server, Topology, TopologyUpdateCondvar},
    selection_criteria::{ReadPreference, SelectionCriteria},
};

const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);

/// This is the main entry point for the API. A `Client` is used to connect to a MongoDB cluster.
/// By default, it will monitor the topology of the cluster, keeping track of any changes, such
/// as servers being added or removed
///
/// `Client` uses [`std::sync::Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html) internally,
/// so it can safely be shared across threads. For example:
///
/// ```rust
/// # use mongodb::{Client, error::Result};
/// #
/// # fn start_workers() -> Result<()> {
/// let client = Client::with_uri_str("mongodb://example.com")?;
///
/// for i in 0..5 {
///     let client_ref = client.clone();
///
///     std::thread::spawn(move || {
///         let collection = client_ref.database("items").collection(&format!("coll{}", i));
///
///         // Do something with the collection
///     });
/// }
/// #
/// # // Technically we should join the threads here, but for the purpose of the example, we'll just
/// # // sleep for a bit.
/// # std::thread::sleep(std::time::Duration::from_secs(3));
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}

#[derive(Derivative)]
#[derivative(Debug)]
struct ClientInner {
    topology: Arc<RwLock<Topology>>,
    options: ClientOptions,
    #[derivative(Debug = "ignore")]
    condvar: TopologyUpdateCondvar,
}

impl Client {
    /// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
    /// MongoDB connection string.
    ///
    /// See the documentation on
    /// [`ClientOptions::parse`](options/struct.ClientOptions.html#method.parse) for more details.
    pub fn with_uri_str(uri: &str) -> Result<Self> {
        let options = ClientOptions::parse(uri)?;

        Client::with_options(options)
    }

    /// Creates a new `Client` connected to the cluster specified by `options`.
    pub fn with_options(options: ClientOptions) -> Result<Self> {
        let condvar = TopologyUpdateCondvar::new();

        let inner = Arc::new(ClientInner {
            topology: Topology::new(condvar.clone(), options.clone())?,
            condvar,
            options,
        });

        Ok(Self { inner })
    }

    fn emit_command_event(&self, emit: impl FnOnce(&Arc<dyn CommandEventHandler>)) {
        if let Some(ref handler) = self.inner.options.command_event_handler {
            emit(handler);
        }
    }

    /// Gets the default selection criteria the `Client` uses for operations..
    pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
        self.inner.options.selection_criteria.as_ref()
    }

    /// Gets the default read concern the `Client` uses for operations.
    pub fn read_concern(&self) -> Option<&ReadConcern> {
        self.inner.options.read_concern.as_ref()
    }

    /// Gets the default write concern the `Client` uses for operations.
    pub fn write_concern(&self) -> Option<&WriteConcern> {
        self.inner.options.write_concern.as_ref()
    }

    /// Gets a handle to a database specified by `name` in the cluster the `Client` is connected to.
    /// The `Database` options (e.g. read preference and write concern) will default to those of the
    /// `Client`.
    ///
    /// This method does not send or receive anything across the wire to the database, so it can be
    /// used repeatedly without incurring any costs from I/O.
    pub fn database(&self, name: &str) -> Database {
        Database::new(self.clone(), name, None)
    }

    /// Gets a handle to a database specified by `name` in the cluster the `Client` is connected to.
    /// Operations done with this `Database` will use the options specified by `options` by default
    /// and will otherwise default to those of the `Client`.
    ///
    /// This method does not send or receive anything across the wire to the database, so it can be
    /// used repeatedly without incurring any costs from I/O.
    pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
        Database::new(self.clone(), name, Some(options))
    }

    /// Gets information about each database present in the cluster the Client is connected to.
    pub fn list_databases(&self, filter: impl Into<Option<Document>>) -> Result<Vec<Document>> {
        let op = ListDatabases::new(filter.into(), false);
        self.execute_operation(&op, None)
    }

    /// Gets the names of the databases present in the cluster the Client is connected to.
    pub fn list_database_names(&self, filter: impl Into<Option<Document>>) -> Result<Vec<String>> {
        let op = ListDatabases::new(filter.into(), true);
        match self.execute_operation(&op, None) {
            Ok(databases) => databases
                .into_iter()
                .map(|doc| {
                    let name = doc.get("name").and_then(Bson::as_str).ok_or_else(|| {
                        ErrorKind::ResponseError {
                            message: "Expected \"name\" field in server response, but it was not \
                                      found"
                                .to_string(),
                        }
                    })?;
                    Ok(name.to_string())
                })
                .collect(),
            Err(e) => Err(e),
        }
    }

    pub(crate) fn send_command_started_event(&self, event: CommandStartedEvent) {
        self.emit_command_event(|handler| handler.handle_command_started_event(event.clone()));
    }

    pub(crate) fn send_command_succeeded_event(&self, event: CommandSucceededEvent) {
        self.emit_command_event(|handler| handler.handle_command_succeeded_event(event.clone()));
    }

    pub(crate) fn send_command_failed_event(&self, event: CommandFailedEvent) {
        self.emit_command_event(|handler| handler.handle_command_failed_event(event.clone()));
    }

    fn topology(&self) -> Arc<RwLock<Topology>> {
        self.inner.topology.clone()
    }

    /// Select a server using the provided criteria. If none is provided, a primary read preference
    /// will be used instead.
    fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<Arc<Server>> {
        let criteria =
            criteria.unwrap_or_else(|| &SelectionCriteria::ReadPreference(ReadPreference::Primary));
        let start_time = PreciseTime::now();
        let timeout = self
            .inner
            .options
            .server_selection_timeout
            .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);

        while start_time.to(PreciseTime::now()).to_std().unwrap() < timeout {
            // Because we're calling clone on the lock guard, we're actually copying the
            // Topology itself, not just making a new reference to it. The
            // `servers` field will contain references to the same instances
            // though, since each is wrapped in an `Arc`.
            let topology = self.inner.topology.read().unwrap().clone();

            // Return error if the wire version is invalid.
            if let Some(error_msg) = topology.description.compatibility_error() {
                return Err(ErrorKind::ServerSelectionError {
                    message: error_msg.into(),
                }
                .into());
            }

            let server = topology
                .description
                .select_server(&criteria)?
                .and_then(|server| topology.servers.get(&server.address));

            if let Some(server) = server {
                return Ok(server.clone());
            }

            // Because the servers in the copied Topology are Arc aliases of the servers in the
            // original Topology, requesting a check on the copy will in turn request a check from
            // each of the original servers, so the monitoring threads will be woken the same way
            // they would if `request_topology_check` were called on the original Topology.
            topology.request_topology_check();

            self.inner
                .condvar
                .wait_timeout(timeout - start_time.to(PreciseTime::now()).to_std().unwrap());
        }

        Err(ErrorKind::ServerSelectionError {
            message: "timed out while trying to select server".into(),
        }
        .into())
    }
}