mongodb/
coll.rs

1mod action;
2pub mod options;
3
4use std::{fmt, fmt::Debug, str::FromStr, sync::Arc};
5
6use crate::bson::rawdoc;
7use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize};
8
9use self::options::*;
10use crate::{
11    client::options::ServerAddress,
12    cmap::conn::PinnedConnectionHandle,
13    concern::{ReadConcern, WriteConcern},
14    error::{Error, Result},
15    selection_criteria::SelectionCriteria,
16    Client,
17    Database,
18};
19
20/// `Collection` is the client-side abstraction of a MongoDB Collection. It can be used to
21/// perform collection-level operations such as CRUD operations. A `Collection` can be obtained
22/// through a [`Database`](struct.Database.html) by calling either
23/// [`Database::collection`](struct.Database.html#method.collection) or
24/// [`Database::collection_with_options`](struct.Database.html#method.collection_with_options).
25///
26/// A [`Collection`] can be parameterized with any type that implements the
27/// `Serialize` and `Deserialize` traits from the [`serde`](https://serde.rs/) crate. This includes but
28/// is not limited to just `Document`. The various methods that accept or return instances of the
29/// documents in the collection will accept/return instances of the generic parameter (e.g.
30/// [`Collection::insert_one`] accepts it as an argument, [`Collection::find_one`] returns an
31/// `Option` of it). It is recommended to define types that model your data which you can
32/// parameterize your [`Collection`]s with instead of `Document`, since doing so eliminates a lot of
33/// boilerplate deserialization code and is often more performant.
34///
35/// `Collection` uses [`std::sync::Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html) internally,
36/// so it can safely be shared across threads or async tasks.
37///
38/// # Example
39/// ```rust
40/// # use mongodb::{
41/// #     bson::doc,
42/// #     error::Result,
43/// # };
44/// #
45/// # async fn start_workers() -> Result<()> {
46/// # use mongodb::Client;
47/// #
48/// # let client = Client::with_uri_str("mongodb://example.com").await?;
49/// use serde::{Deserialize, Serialize};
50///
51/// /// Define a type that models our data.
52/// #[derive(Clone, Debug, Deserialize, Serialize)]
53/// struct Item {
54///     id: u32,
55/// }
56///
57/// // Parameterize our collection with the model.
58/// let coll = client.database("items").collection::<Item>("in_stock");
59///
60/// for i in 0..5 {
61///     let coll_ref = coll.clone();
62///
63///     // Spawn several tasks that operate on the same collection concurrently.
64///     tokio::task::spawn(async move {
65///         // Perform operations with `coll_ref` that work with directly our model.
66///         coll_ref.insert_one(Item { id: i }).await;
67///     });
68/// }
69/// #
70/// # Ok(())
71/// # }
72/// ```
73#[derive(Debug)]
74pub struct Collection<T>
75where
76    T: Send + Sync,
77{
78    inner: Arc<CollectionInner>,
79    _phantom: std::marker::PhantomData<fn() -> T>,
80}
81
82// Because derive is too conservative, derive only implements Clone if T is Clone.
83// Collection<T> does not actually store any value of type T (so T does not need to be clone).
84impl<T> Clone for Collection<T>
85where
86    T: Send + Sync,
87{
88    fn clone(&self) -> Self {
89        Self {
90            inner: self.inner.clone(),
91            _phantom: Default::default(),
92        }
93    }
94}
95
96#[derive(Debug, Clone)]
97struct CollectionInner {
98    client: Client,
99    db: Database,
100    name: String,
101    selection_criteria: Option<SelectionCriteria>,
102    read_concern: Option<ReadConcern>,
103    write_concern: Option<WriteConcern>,
104}
105
106impl<T> Collection<T>
107where
108    T: Send + Sync,
109{
110    pub(crate) fn new(db: Database, name: &str, options: Option<CollectionOptions>) -> Self {
111        let options = options.unwrap_or_default();
112        let selection_criteria = options
113            .selection_criteria
114            .or_else(|| db.selection_criteria().cloned());
115
116        let read_concern = options.read_concern.or_else(|| db.read_concern().cloned());
117
118        let write_concern = options
119            .write_concern
120            .or_else(|| db.write_concern().cloned());
121
122        Self {
123            inner: Arc::new(CollectionInner {
124                client: db.client().clone(),
125                db,
126                name: name.to_string(),
127                selection_criteria,
128                read_concern,
129                write_concern,
130            }),
131            _phantom: Default::default(),
132        }
133    }
134
135    /// Gets a clone of the `Collection` with a different type `U`.
136    pub fn clone_with_type<U: Send + Sync>(&self) -> Collection<U> {
137        Collection {
138            inner: self.inner.clone(),
139            _phantom: Default::default(),
140        }
141    }
142
143    pub(crate) fn clone_unconcerned(&self) -> Self {
144        let mut new_inner = CollectionInner::clone(&self.inner);
145        new_inner.write_concern = None;
146        new_inner.read_concern = None;
147        Self {
148            inner: Arc::new(new_inner),
149            _phantom: Default::default(),
150        }
151    }
152
153    /// Get the `Client` that this collection descended from.
154    pub fn client(&self) -> &Client {
155        &self.inner.client
156    }
157
158    /// Gets the name of the `Collection`.
159    pub fn name(&self) -> &str {
160        &self.inner.name
161    }
162
163    /// Gets the namespace of the `Collection`.
164    ///
165    /// The namespace of a MongoDB collection is the concatenation of the name of the database
166    /// containing it, the '.' character, and the name of the collection itself. For example, if a
167    /// collection named "bar" is created in a database named "foo", the namespace of the collection
168    /// is "foo.bar".
169    pub fn namespace(&self) -> Namespace {
170        Namespace {
171            db: self.inner.db.name().into(),
172            coll: self.name().into(),
173        }
174    }
175
176    /// Gets the selection criteria of the `Collection`.
177    pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
178        self.inner.selection_criteria.as_ref()
179    }
180
181    /// Gets the read concern of the `Collection`.
182    pub fn read_concern(&self) -> Option<&ReadConcern> {
183        self.inner.read_concern.as_ref()
184    }
185
186    /// Gets the write concern of the `Collection`.
187    pub fn write_concern(&self) -> Option<&WriteConcern> {
188        self.inner.write_concern.as_ref()
189    }
190
191    /// Kill the server side cursor that id corresponds to.
192    pub(super) async fn kill_cursor(
193        &self,
194        cursor_id: i64,
195        pinned_connection: Option<&PinnedConnectionHandle>,
196        drop_address: Option<ServerAddress>,
197    ) -> Result<()> {
198        let ns = self.namespace();
199
200        let op = crate::operation::run_command::RunCommand::new(
201            ns.db,
202            rawdoc! {
203                "killCursors": ns.coll.as_str(),
204                "cursors": [cursor_id]
205            },
206            drop_address.map(SelectionCriteria::from_address),
207            pinned_connection,
208        );
209        self.client().execute_operation(op, None).await?;
210        Ok(())
211    }
212}
213
214/// A struct modeling the canonical name for a collection in MongoDB.
215#[derive(Debug, Clone, PartialEq, Eq, Hash)]
216pub struct Namespace {
217    /// The name of the database associated with this namespace.
218    pub db: String,
219
220    /// The name of the collection this namespace corresponds to.
221    pub coll: String,
222}
223
224impl Namespace {
225    /// Construct a `Namespace` with the given database and collection.
226    pub fn new(db: impl Into<String>, coll: impl Into<String>) -> Self {
227        Self {
228            db: db.into(),
229            coll: coll.into(),
230        }
231    }
232
233    pub(crate) fn from_str(s: &str) -> Option<Self> {
234        let mut parts = s.split('.');
235
236        let db = parts.next();
237        let coll = parts.collect::<Vec<_>>().join(".");
238
239        match (db, coll) {
240            (Some(db), coll) if !coll.is_empty() => Some(Self {
241                db: db.to_string(),
242                coll,
243            }),
244            _ => None,
245        }
246    }
247}
248
249impl fmt::Display for Namespace {
250    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
251        write!(fmt, "{}.{}", self.db, self.coll)
252    }
253}
254
255impl<'de> Deserialize<'de> for Namespace {
256    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
257    where
258        D: Deserializer<'de>,
259    {
260        let s: String = Deserialize::deserialize(deserializer)?;
261        Self::from_str(&s)
262            .ok_or_else(|| D::Error::custom("Missing one or more fields in namespace"))
263    }
264}
265
266impl Serialize for Namespace {
267    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
268    where
269        S: serde::Serializer,
270    {
271        serializer.serialize_str(&(self.db.clone() + "." + &self.coll))
272    }
273}
274
275impl FromStr for Namespace {
276    type Err = Error;
277    fn from_str(s: &str) -> Result<Self> {
278        let mut parts = s.split('.');
279
280        let db = parts.next();
281        let coll = parts.collect::<Vec<_>>().join(".");
282
283        match (db, coll) {
284            (Some(db), coll) if !coll.is_empty() => Ok(Self {
285                db: db.to_string(),
286                coll,
287            }),
288            _ => Err(Self::Err::invalid_argument(
289                "Missing one or more fields in namespace",
290            )),
291        }
292    }
293}