polodb_core/coll/
collection.rs

1// Copyright 2024 Vincent Chan
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//	http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::Serialize;
16use bson::Document;
17use std::borrow::Borrow;
18use std::sync::Weak;
19use serde::de::DeserializeOwned;
20use crate::options::UpdateOptions;
21use crate::{Error, IndexModel, Result};
22use crate::db::db_inner::DatabaseInner;
23use crate::action::{Aggregate, Find};
24use crate::results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult};
25
26macro_rules! try_multiple {
27    ($err: expr, $action: expr) => {
28        match $action {
29            Ok(ret) => ret,
30            Err(expr_err) => {
31                return Err($err.add(expr_err))
32            },
33        }
34    }
35}
36
37macro_rules! try_db_op {
38    ($txn: expr, $action: expr) => {
39        match $action {
40            Ok(ret) => {
41                $txn.commit()?;
42                ret
43            }
44
45            Err(err) => {
46                try_multiple!(err, $txn.rollback());
47                return Err(err);
48            }
49        }
50    }
51}
52
53pub trait CollectionT<T> {
54    fn name(&self) -> &str;
55    /// Return the size of all data in the collection.
56    fn count_documents(&self) -> Result<u64>;
57
58    /// Updates up to one document matching `query` in the collection.
59    /// [documentation](https://www.polodb.org/docs/curd/update) for more information on specifying updates.
60    fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult>;
61
62    fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;
63
64    /// Updates all documents matching `query` in the collection.
65    /// [documentation](https://www.polodb.org/docs/curd/update) for more information on specifying updates.
66    fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult>;
67
68    fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;
69
70    /// Deletes up to one document found matching `query`.
71    fn delete_one(&self, query: Document) -> Result<DeleteResult>;
72
73    /// When query is `None`, all the data in the collection will be deleted.
74    ///
75    /// The size of data deleted returns.
76    fn delete_many(&self, query: Document) -> Result<DeleteResult>;
77    fn create_index(&self, index: IndexModel) -> Result<()>;
78
79    /// Drops the index specified by `name` from this collection.
80    fn drop_index(&self, name: impl AsRef<str>) -> Result<()>;
81    fn drop(&self) -> Result<()>;
82
83    /// Inserts `doc` into the collection.
84    fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
85    where T: Serialize;
86
87    /// Inserts the data in `docs` into the collection.
88    fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
89    where T: Serialize;
90
91    /// When query document is passed to the function. The result satisfies
92    /// the query document.
93    fn find(&self, filter: Document) -> Find<'_, '_, T>
94    where T: DeserializeOwned + Send + Sync;
95
96    /// Finds a single document in the collection matching `filter`.
97    fn find_one(&self, filter: Document) -> Result<Option<T>>
98    where T: DeserializeOwned + Send + Sync;
99
100    /// Runs an aggregation operation.
101    fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_>;
102}
103
104
105/// A wrapper of collection in struct.
106///
107/// All CURD methods can be done through this structure.
108///
109/// It can be used to perform collection-level operations such as CRUD operations.
110pub struct Collection<T> {
111    db: Weak<DatabaseInner>,
112    name: String,
113    _phantom: std::marker::PhantomData<T>,
114}
115
116impl<T>  Collection<T>
117{
118    pub(crate) fn new(db: Weak<DatabaseInner>, name: &str) -> Collection<T> {
119        Collection {
120            db,
121            name: name.into(),
122            _phantom: std::default::Default::default(),
123        }
124    }
125}
126
127impl<T> CollectionT<T> for Collection<T> {
128
129    fn name(&self) -> &str {
130        &self.name
131    }
132
133    fn count_documents(&self) -> Result<u64> {
134        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
135        let txn = db.start_transaction()?;
136        let count = db.count_documents(&self.name, &txn)?;
137        Ok(count)
138    }
139
140    fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult> {
141        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
142        let txn = db.start_transaction()?;
143        let result = try_db_op!(txn, db.update_one(
144            &self.name,
145            query,
146            update,
147            UpdateOptions::default(),
148            &txn,
149        ));
150        Ok(result)
151    }
152
153    fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult> {
154        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
155        let txn = db.start_transaction()?;
156        let result = try_db_op!(txn, db.update_one(
157            &self.name,
158            query,
159            update,
160            options,
161            &txn,
162        ));
163        Ok(result)
164    }
165
166    fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult> {
167        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
168        let txn = db.start_transaction()?;
169        let result = try_db_op!(txn, db.update_many(
170            &self.name,
171            query,
172            update,
173            UpdateOptions::default(),
174            &txn,
175        ));
176        Ok(result)
177    }
178
179    fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult> {
180        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
181        let txn = db.start_transaction()?;
182        let result = try_db_op!(txn, db.update_many(
183            &self.name,
184            query,
185            update,
186            options,
187            &txn,
188        ));
189        Ok(result)
190    }
191
192    fn delete_one(&self, query: Document) -> Result<DeleteResult> {
193        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
194        let txn = db.start_transaction()?;
195        let result = try_db_op!(txn, db.delete_one(&self.name, query, &txn));
196        Ok(result)
197    }
198
199    fn delete_many(&self, query: Document) -> Result<DeleteResult> {
200        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
201        let txn = db.start_transaction()?;
202        let result = try_db_op!(txn, db.delete_many(&self.name, query, &txn));
203        Ok(result)
204    }
205
206    fn create_index(&self, index: IndexModel) -> Result<()> {
207        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
208        let txn = db.start_transaction()?;
209        try_db_op!(txn, db.create_index(&self.name, index, &txn));
210        Ok(())
211    }
212
213    fn drop_index(&self, name: impl AsRef<str>) -> Result<()> {
214        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
215        let txn = db.start_transaction()?;
216        try_db_op!(txn, db.drop_index(&self.name, name.as_ref(), &txn));
217        Ok(())
218    }
219
220    fn drop(&self) -> Result<()> {
221        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
222        let txn = db.start_transaction()?;
223        try_db_op!(txn, db.drop_collection(&self.name, &txn));
224        Ok(())
225    }
226
227    fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
228    where T: Serialize {
229        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
230        let txn = db.start_transaction()?;
231        let result = try_db_op!(txn, db.insert_one(
232            &self.name,
233            bson::to_document(doc.borrow())?,
234            &txn,
235        ));
236        Ok(result)
237    }
238
239    fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
240    where T: Serialize {
241        let db = self.db.upgrade().ok_or(Error::DbIsClosed)?;
242        let txn = db.start_transaction()?;
243        let result = try_db_op!(txn, db.insert_many(&self.name, docs, &txn));
244        Ok(result)
245    }
246
247    fn find(&self, filter: Document) -> Find<T>
248    where T: DeserializeOwned + Send + Sync {
249        Find::new(self.db.clone(), &self.name, None, filter)
250    }
251
252    fn find_one(&self, filter: Document) -> Result<Option<T>>
253    where T: DeserializeOwned + Send + Sync {
254        let mut cursor = self.find(filter).run()?;
255        let test = cursor.advance()?;
256        if !test {
257            return Ok(None);
258        }
259        Ok(Some(cursor.deserialize_current()?))
260    }
261
262    fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_> {
263        Aggregate::new(
264            self.db.clone(),
265            &self.name,
266            pipeline.into_iter().collect(),
267            None,
268        )
269    }
270}