1use serde::Serialize;
16use bson::Document;
17use std::borrow::Borrow;
18use std::sync::Weak;
19use serde::de::DeserializeOwned;
20use crate::options::UpdateOptions;
21use crate::{Error, IndexModel, Result};
22use crate::db::db_inner::DatabaseInner;
23use crate::action::{Aggregate, Find};
24use crate::results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult};
25
26macro_rules! try_multiple {
27 ($err: expr, $action: expr) => {
28 match $action {
29 Ok(ret) => ret,
30 Err(expr_err) => {
31 return Err($err.add(expr_err))
32 },
33 }
34 }
35}
36
37macro_rules! try_db_op {
38 ($txn: expr, $action: expr) => {
39 match $action {
40 Ok(ret) => {
41 $txn.commit()?;
42 ret
43 }
44
45 Err(err) => {
46 try_multiple!(err, $txn.rollback());
47 return Err(err);
48 }
49 }
50 }
51}
52
53pub trait CollectionT<T> {
54 fn name(&self) -> &str;
55 fn count_documents(&self) -> Result<u64>;
57
58 fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult>;
61
62 fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;
63
64 fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult>;
67
68 fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;
69
70 fn delete_one(&self, query: Document) -> Result<DeleteResult>;
72
73 fn delete_many(&self, query: Document) -> Result<DeleteResult>;
77 fn create_index(&self, index: IndexModel) -> Result<()>;
78
79 fn drop_index(&self, name: impl AsRef<str>) -> Result<()>;
81 fn drop(&self) -> Result<()>;
82
83 fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
85 where T: Serialize;
86
87 fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
89 where T: Serialize;
90
91 fn find(&self, filter: Document) -> Find<'_, '_, T>
94 where T: DeserializeOwned + Send + Sync;
95
96 fn find_one(&self, filter: Document) -> Result<Option<T>>
98 where T: DeserializeOwned + Send + Sync;
99
100 fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_>;
102}
103
104
105pub struct Collection<T> {
111 db: Weak<DatabaseInner>,
112 name: String,
113 _phantom: std::marker::PhantomData<T>,
114}
115
116impl<T> Collection<T>
117{
118 pub(crate) fn new(db: Weak<DatabaseInner>, name: &str) -> Collection<T> {
119 Collection {
120 db,
121 name: name.into(),
122 _phantom: std::default::Default::default(),
123 }
124 }
125}
126
127impl<T> CollectionT<T> for Collection<T> {
128
129 fn name(&self) -> &str {
130 &self.name
131 }
132
133 fn count_documents(&self) -> Result<u64> {
134 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
135 let txn = db.start_transaction()?;
136 let count = db.count_documents(&self.name, &txn)?;
137 Ok(count)
138 }
139
140 fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult> {
141 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
142 let txn = db.start_transaction()?;
143 let result = try_db_op!(txn, db.update_one(
144 &self.name,
145 query,
146 update,
147 UpdateOptions::default(),
148 &txn,
149 ));
150 Ok(result)
151 }
152
153 fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult> {
154 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
155 let txn = db.start_transaction()?;
156 let result = try_db_op!(txn, db.update_one(
157 &self.name,
158 query,
159 update,
160 options,
161 &txn,
162 ));
163 Ok(result)
164 }
165
166 fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult> {
167 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
168 let txn = db.start_transaction()?;
169 let result = try_db_op!(txn, db.update_many(
170 &self.name,
171 query,
172 update,
173 UpdateOptions::default(),
174 &txn,
175 ));
176 Ok(result)
177 }
178
179 fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult> {
180 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
181 let txn = db.start_transaction()?;
182 let result = try_db_op!(txn, db.update_many(
183 &self.name,
184 query,
185 update,
186 options,
187 &txn,
188 ));
189 Ok(result)
190 }
191
192 fn delete_one(&self, query: Document) -> Result<DeleteResult> {
193 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
194 let txn = db.start_transaction()?;
195 let result = try_db_op!(txn, db.delete_one(&self.name, query, &txn));
196 Ok(result)
197 }
198
199 fn delete_many(&self, query: Document) -> Result<DeleteResult> {
200 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
201 let txn = db.start_transaction()?;
202 let result = try_db_op!(txn, db.delete_many(&self.name, query, &txn));
203 Ok(result)
204 }
205
206 fn create_index(&self, index: IndexModel) -> Result<()> {
207 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
208 let txn = db.start_transaction()?;
209 try_db_op!(txn, db.create_index(&self.name, index, &txn));
210 Ok(())
211 }
212
213 fn drop_index(&self, name: impl AsRef<str>) -> Result<()> {
214 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
215 let txn = db.start_transaction()?;
216 try_db_op!(txn, db.drop_index(&self.name, name.as_ref(), &txn));
217 Ok(())
218 }
219
220 fn drop(&self) -> Result<()> {
221 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
222 let txn = db.start_transaction()?;
223 try_db_op!(txn, db.drop_collection(&self.name, &txn));
224 Ok(())
225 }
226
227 fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
228 where T: Serialize {
229 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
230 let txn = db.start_transaction()?;
231 let result = try_db_op!(txn, db.insert_one(
232 &self.name,
233 bson::to_document(doc.borrow())?,
234 &txn,
235 ));
236 Ok(result)
237 }
238
239 fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
240 where T: Serialize {
241 let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
242 let txn = db.start_transaction()?;
243 let result = try_db_op!(txn, db.insert_many(&self.name, docs, &txn));
244 Ok(result)
245 }
246
247 fn find(&self, filter: Document) -> Find<T>
248 where T: DeserializeOwned + Send + Sync {
249 Find::new(self.db.clone(), &self.name, None, filter)
250 }
251
252 fn find_one(&self, filter: Document) -> Result<Option<T>>
253 where T: DeserializeOwned + Send + Sync {
254 let mut cursor = self.find(filter).run()?;
255 let test = cursor.advance()?;
256 if !test {
257 return Ok(None);
258 }
259 Ok(Some(cursor.deserialize_current()?))
260 }
261
262 fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_> {
263 Aggregate::new(
264 self.db.clone(),
265 &self.name,
266 pipeline.into_iter().collect(),
267 None,
268 )
269 }
270}