1mod action;
2pub mod options;
3
4use std::{fmt, fmt::Debug, str::FromStr, sync::Arc};
5
6use crate::bson::rawdoc;
7use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize};
8
9use self::options::*;
10use crate::{
11 client::options::ServerAddress,
12 cmap::conn::PinnedConnectionHandle,
13 concern::{ReadConcern, WriteConcern},
14 error::{Error, Result},
15 selection_criteria::SelectionCriteria,
16 Client,
17 Database,
18};
19
20#[derive(Debug)]
74pub struct Collection<T>
75where
76 T: Send + Sync,
77{
78 inner: Arc<CollectionInner>,
79 _phantom: std::marker::PhantomData<fn() -> T>,
80}
81
82impl<T> Clone for Collection<T>
85where
86 T: Send + Sync,
87{
88 fn clone(&self) -> Self {
89 Self {
90 inner: self.inner.clone(),
91 _phantom: Default::default(),
92 }
93 }
94}
95
96#[derive(Debug, Clone)]
97struct CollectionInner {
98 client: Client,
99 db: Database,
100 name: String,
101 selection_criteria: Option<SelectionCriteria>,
102 read_concern: Option<ReadConcern>,
103 write_concern: Option<WriteConcern>,
104}
105
106impl<T> Collection<T>
107where
108 T: Send + Sync,
109{
110 pub(crate) fn new(db: Database, name: &str, options: Option<CollectionOptions>) -> Self {
111 let options = options.unwrap_or_default();
112 let selection_criteria = options
113 .selection_criteria
114 .or_else(|| db.selection_criteria().cloned());
115
116 let read_concern = options.read_concern.or_else(|| db.read_concern().cloned());
117
118 let write_concern = options
119 .write_concern
120 .or_else(|| db.write_concern().cloned());
121
122 Self {
123 inner: Arc::new(CollectionInner {
124 client: db.client().clone(),
125 db,
126 name: name.to_string(),
127 selection_criteria,
128 read_concern,
129 write_concern,
130 }),
131 _phantom: Default::default(),
132 }
133 }
134
135 pub fn clone_with_type<U: Send + Sync>(&self) -> Collection<U> {
137 Collection {
138 inner: self.inner.clone(),
139 _phantom: Default::default(),
140 }
141 }
142
143 pub(crate) fn clone_unconcerned(&self) -> Self {
144 let mut new_inner = CollectionInner::clone(&self.inner);
145 new_inner.write_concern = None;
146 new_inner.read_concern = None;
147 Self {
148 inner: Arc::new(new_inner),
149 _phantom: Default::default(),
150 }
151 }
152
153 pub fn client(&self) -> &Client {
155 &self.inner.client
156 }
157
158 pub fn name(&self) -> &str {
160 &self.inner.name
161 }
162
163 pub fn namespace(&self) -> Namespace {
170 Namespace {
171 db: self.inner.db.name().into(),
172 coll: self.name().into(),
173 }
174 }
175
176 pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
178 self.inner.selection_criteria.as_ref()
179 }
180
181 pub fn read_concern(&self) -> Option<&ReadConcern> {
183 self.inner.read_concern.as_ref()
184 }
185
186 pub fn write_concern(&self) -> Option<&WriteConcern> {
188 self.inner.write_concern.as_ref()
189 }
190
191 pub(super) async fn kill_cursor(
193 &self,
194 cursor_id: i64,
195 pinned_connection: Option<&PinnedConnectionHandle>,
196 drop_address: Option<ServerAddress>,
197 ) -> Result<()> {
198 let ns = self.namespace();
199
200 let op = crate::operation::run_command::RunCommand::new(
201 ns.db,
202 rawdoc! {
203 "killCursors": ns.coll.as_str(),
204 "cursors": [cursor_id]
205 },
206 drop_address.map(SelectionCriteria::from_address),
207 pinned_connection,
208 );
209 self.client().execute_operation(op, None).await?;
210 Ok(())
211 }
212}
213
214#[derive(Debug, Clone, PartialEq, Eq, Hash)]
216pub struct Namespace {
217 pub db: String,
219
220 pub coll: String,
222}
223
224impl Namespace {
225 pub fn new(db: impl Into<String>, coll: impl Into<String>) -> Self {
227 Self {
228 db: db.into(),
229 coll: coll.into(),
230 }
231 }
232
233 pub(crate) fn from_str(s: &str) -> Option<Self> {
234 let mut parts = s.split('.');
235
236 let db = parts.next();
237 let coll = parts.collect::<Vec<_>>().join(".");
238
239 match (db, coll) {
240 (Some(db), coll) if !coll.is_empty() => Some(Self {
241 db: db.to_string(),
242 coll,
243 }),
244 _ => None,
245 }
246 }
247}
248
249impl fmt::Display for Namespace {
250 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
251 write!(fmt, "{}.{}", self.db, self.coll)
252 }
253}
254
255impl<'de> Deserialize<'de> for Namespace {
256 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
257 where
258 D: Deserializer<'de>,
259 {
260 let s: String = Deserialize::deserialize(deserializer)?;
261 Self::from_str(&s)
262 .ok_or_else(|| D::Error::custom("Missing one or more fields in namespace"))
263 }
264}
265
266impl Serialize for Namespace {
267 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
268 where
269 S: serde::Serializer,
270 {
271 serializer.serialize_str(&(self.db.clone() + "." + &self.coll))
272 }
273}
274
275impl FromStr for Namespace {
276 type Err = Error;
277 fn from_str(s: &str) -> Result<Self> {
278 let mut parts = s.split('.');
279
280 let db = parts.next();
281 let coll = parts.collect::<Vec<_>>().join(".");
282
283 match (db, coll) {
284 (Some(db), coll) if !coll.is_empty() => Ok(Self {
285 db: db.to_string(),
286 coll,
287 }),
288 _ => Err(Self::Err::invalid_argument(
289 "Missing one or more fields in namespace",
290 )),
291 }
292 }
293}