1use std::{borrow::Borrow, collections::HashMap, marker::PhantomData, ops::Deref, path::PathBuf, str::FromStr, sync::Arc};
2
3use bson::Document;
4use polodb_core::{options::UpdateOptions, results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult}, CollectionT, IndexModel};
5use serde::{de::DeserializeOwned, Serialize};
6use tauri::{AppHandle, Manager, Runtime};
7use tokio::{fs::{File, OpenOptions}, sync::Mutex};
8
9use super::{state::{ContextDB, ContextFileHandle, ContextState, FileHandleMode, PluginState}, types::{PathInformation, PathMetadata}};
10
11pub struct Context<R: Runtime> {
12 handle: Arc<AppHandle<R>>,
13 name: String,
14 path: String,
15}
16
17impl<R: Runtime> Clone for Context<R> {
18 fn clone(&self) -> Self {
19 Self {
20 handle: self.handle.clone(),
21 name: self.name.clone(),
22 path: self.path.clone(),
23 }
24 }
25}
26
27impl<R: Runtime> Context<R> {
28 pub(crate) fn create(handle: AppHandle<R>, name: String, path: String) -> Self {
29 Self {
30 handle: Arc::new(handle),
31 name,
32 path,
33 }
34 }
35
36 pub fn name(&self) -> String {
37 self.name.clone()
38 }
39
40 pub fn path(&self) -> String {
41 self.path.clone()
42 }
43
44 pub(crate) fn handle(&self) -> AppHandle<R> {
45 self.handle.clone().deref().clone()
46 }
47
48 pub fn base_path(&self) -> PathBuf {
49 PathBuf::from_str(&self.path()).unwrap()
50 }
51
52 pub fn get_path(&self, path: impl AsRef<str>) -> crate::Result<PathBuf> {
53 let resolved = PathBuf::from_str(path.as_ref()).unwrap();
54 if resolved.is_absolute() {
55 return Err(crate::Error::no_absolute_path(path.as_ref()));
56 }
57 let joined = self
58 .base_path()
59 .canonicalize().or(Err(crate::Error::invalid_path(path.as_ref())))?
60 .join(&resolved);
61
62 if !joined.starts_with(self.base_path_canonicalized()?) {
63 return Err(crate::Error::path_escapes_context(path.as_ref()));
64 }
65
66 Ok(joined)
67 }
68
69 pub fn base_path_canonicalized(&self) -> crate::Result<PathBuf> {
70 PathBuf::from_str(&self.path()).unwrap().canonicalize().or_else(|_| Err(crate::Error::invalid_path(self.path())))
71 }
72
73 pub async fn create_directory(&self, path: impl AsRef<str>, parents: bool) -> crate::Result<()> {
74 let resolved = self.get_path(path)?;
75 let create_result = if parents {tokio::fs::create_dir_all(&resolved).await} else {tokio::fs::create_dir(&resolved).await};
76 if let Err(error) = create_result {
77 return Err(crate::Error::filesystem("CREATE_DIRECTORY", error.to_string()));
78 }
79
80 Ok(())
81 }
82
83 pub async fn remove_directory(&self, path: impl AsRef<str>) -> crate::Result<()> {
84 let resolved = self.get_path(path)?;
85 if !resolved.is_dir() {
86 return Err(crate::Error::filesystem("REMOVE_DIRECTORY", "Specified path is not a directory or does not exist."));
87 }
88 tokio::fs::remove_dir_all(resolved).await.or_else(|error| Err(crate::Error::filesystem("REMOVE_DIRECTORY", error.to_string())))?;
89 Ok(())
90 }
91
92 pub async fn remove_file(&self, path: impl AsRef<str>) -> crate::Result<()> {
93 let resolved = self.get_path(path)?;
94 if !resolved.is_file() {
95 return Err(crate::Error::filesystem("REMOVE_FILE", "Specified path is not a file or does not exist."));
96 }
97 tokio::fs::remove_file(resolved).await.or_else(|error| Err(crate::Error::filesystem("REMOVE_FILE", error.to_string())))?;
98 Ok(())
99 }
100
101 pub async fn file_metadata(&self, path: impl AsRef<str>) -> crate::Result<PathMetadata> {
102 let resolved = self.get_path(path)?;
103 match tokio::fs::metadata(resolved).await {
104 Ok(meta) => Ok(PathMetadata::from(meta)),
105 Err(e) => Err(crate::Error::filesystem("FILE_METADATA", e.to_string()))
106 }
107 }
108
109 pub async fn list_directory(&self, path: impl AsRef<str>) -> crate::Result<Vec<PathInformation>> {
110 let resolved = self.get_path(path)?;
111 if !resolved.is_dir() {
112 return Err(crate::Error::filesystem("LIST_DIRECTORY", "Specified path is not a directory or does not exist."));
113 }
114
115 match tokio::fs::read_dir(resolved).await {
116 Ok(mut results) => {
117 let mut infos: Vec<PathInformation> = Vec::new();
118 while let Ok(Some(info)) = results.next_entry().await {
119 infos.push(PathInformation::from(info));
120 }
121
122 Ok(infos)
123 },
124 Err(e) => Err(crate::Error::filesystem("LIST_DIRECTORY", e.to_string()))
125 }
126 }
127
128 pub(crate) async fn state(&self) -> ContextState {
129 self.handle()
130 .state::<PluginState>()
131 .lock()
132 .await
133 .get(&self.name())
134 .expect("Context not initialized.")
135 .clone()
136 }
137
138 pub(crate) async fn databases(&self) -> Arc<Mutex<HashMap<String, ContextDB>>> {
139 self.state().await.databases.clone()
140 }
141
142 pub(crate) async fn files(&self) -> Arc<Mutex<HashMap<bson::Uuid, ContextFileHandle>>> {
143 self.state().await.files.clone()
144 }
145
146 pub async fn open_database(
147 &self,
148 name: impl AsRef<str>,
149 path: impl AsRef<str>,
150 ) -> crate::Result<Database<R>> {
151 let _dbs = self.databases().await;
152 let mut dbs = _dbs.lock().await;
153 let resolved_path = self.get_path(path.as_ref())?;
154 if let Some(db) = dbs.get(&name.as_ref().to_string()) {
155 if db.path == path.as_ref().to_string() {
156 Ok(Database::<R>::create(
157 self.clone(),
158 name.as_ref().to_string(),
159 path.as_ref().to_string(),
160 ))
161 } else {
162 Err(crate::Error::open_database(
163 name.as_ref(),
164 self.name(),
165 path.as_ref(),
166 "Database is already open at another path.",
167 ))
168 }
169 } else {
170 if resolved_path.exists() {
171 if resolved_path.is_file() {
172 let database =
173 polodb_core::Database::open_path(resolved_path).or_else(|e| {
174 Err(crate::Error::open_database(
175 name.as_ref(),
176 self.name(),
177 path.as_ref(),
178 e.to_string(),
179 ))
180 })?;
181 let _ = dbs.insert(
182 name.as_ref().to_string(),
183 ContextDB {
184 name: name.as_ref().to_string(),
185 path: path.as_ref().to_string(),
186 database: Arc::new(Mutex::new(database)),
187 transactions: Arc::new(Mutex::new(HashMap::new())),
188 },
189 );
190 Ok(Database::<R>::create(
191 self.clone(),
192 name.as_ref().to_string(),
193 path.as_ref().to_string(),
194 ))
195 } else {
196 Err(crate::Error::open_database(
197 name.as_ref(),
198 self.name(),
199 path.as_ref(),
200 "Specified path is not a file.",
201 ))
202 }
203 } else {
204 let database = polodb_core::Database::open_path(resolved_path).or_else(|e| {
205 Err(crate::Error::open_database(
206 name.as_ref(),
207 self.name(),
208 path.as_ref(),
209 e.to_string(),
210 ))
211 })?;
212 let _ = dbs.insert(
213 name.as_ref().to_string(),
214 ContextDB {
215 name: name.as_ref().to_string(),
216 path: path.as_ref().to_string(),
217 database: Arc::new(Mutex::new(database)),
218 transactions: Arc::new(Mutex::new(HashMap::new())),
219 },
220 );
221 Ok(Database::<R>::create(
222 self.clone(),
223 name.as_ref().to_string(),
224 path.as_ref().to_string(),
225 ))
226 }
227 }
228 }
229
230 pub async fn database(&self, name: impl AsRef<str>) -> crate::Result<Database<R>> {
231 if let Some(db) = self
232 .databases()
233 .await
234 .lock()
235 .await
236 .get(&name.as_ref().to_string())
237 {
238 Ok(Database::<R>::create(
239 self.clone(),
240 name.as_ref().to_string(),
241 db.path.clone(),
242 ))
243 } else {
244 Err(crate::Error::unknown_database(name.as_ref()))
245 }
246 }
247
248 pub(crate) async fn close_database(&self, name: impl AsRef<str>) -> crate::Result<()> {
249 if let Some(_) = self
250 .databases()
251 .await
252 .lock()
253 .await
254 .remove(&name.as_ref().to_string())
255 {
256 Ok(())
257 } else {
258 Err(crate::Error::unknown_database(name.as_ref()))
259 }
260 }
261
262 pub async fn open_file_handle(
263 &self,
264 path: impl AsRef<str>,
265 mode: FileHandleMode,
266 ) -> crate::Result<FileHandle<R>> {
267 let resolved = self.get_path(path.as_ref())?;
268 if mode.create() && !resolved.exists() && resolved.clone().parent().is_some() {
269 tokio::fs::create_dir_all(resolved.clone().parent().unwrap())
270 .await
271 .or_else(|e| {
272 Err(crate::Error::open_file_handle(
273 path.as_ref(),
274 self.name(),
275 e.to_string(),
276 ))
277 })?;
278 }
279
280 let options: OpenOptions = mode.clone().into();
281 let file = options.open(resolved.clone()).await.or_else(|e| {
282 Err(crate::Error::open_file_handle(
283 path.as_ref(),
284 self.name(),
285 e.to_string(),
286 ))
287 })?;
288 let handle = ContextFileHandle {
289 id: bson::Uuid::new(),
290 path: path.as_ref().to_string(),
291 handle: async_dup::Arc::new(async_dup::Mutex::new(file)),
292 mode: mode.clone(),
293 };
294 let id = handle.id.clone();
295
296 let _files = self.files().await;
297 let mut files = _files.lock().await;
298 let _ = files.insert(id.clone(), handle);
299 Ok(FileHandle::<R>::create(
300 self.clone(),
301 id.clone(),
302 path.as_ref().to_string(),
303 ))
304 }
305
306 pub async fn file_handle(&self, id: bson::Uuid) -> crate::Result<FileHandle<R>> {
307 if let Some(handle) = self.files().await.lock().await.get(&id) {
308 Ok(FileHandle::<R>::create(
309 self.clone(),
310 id.clone(),
311 handle.path.clone(),
312 ))
313 } else {
314 Err(crate::Error::unknown_file_handle(id.to_string()))
315 }
316 }
317
318 pub(crate) async fn close_file_handle(&self, id: bson::Uuid) -> crate::Result<()> {
319 if let Some(_) = self.files().await.lock().await.remove(&id) {
320 Ok(())
321 } else {
322 Err(crate::Error::unknown_file_handle(id.to_string()))
323 }
324 }
325
326 pub(crate) async fn file_ids(&self) -> Vec<bson::Uuid> {
327 let files = self.files().await;
328 let handles = files.lock().await;
329 let mut result: Vec<bson::Uuid> = Vec::new();
330 for id in handles.keys() {
331 result.push(id.clone());
332 }
333
334 result
335 }
336
337 pub(crate) async fn db_ids(&self) -> Vec<String> {
338 let dbs = self.databases().await;
339 let bases = dbs.lock().await;
340 let mut result: Vec<String> = Vec::new();
341 for id in bases.keys() {
342 result.push(id.clone());
343 }
344
345 result
346 }
347
348 pub async fn close(self) -> crate::Result<()> {
349 for handle_id in self.file_ids().await {
350 self.close_file_handle(handle_id).await?;
351 }
352
353 for db_id in self.db_ids().await {
354 self.close_database(db_id).await?;
355 }
356
357 Ok(())
358 }
359}
360
361pub struct Database<R: Runtime> {
362 context: Context<R>,
363 name: String,
364 path: String,
365}
366
367impl<R: Runtime> Clone for Database<R> {
368 fn clone(&self) -> Self {
369 Self {
370 context: self.context.clone(),
371 name: self.name.clone(),
372 path: self.path.clone()
373 }
374 }
375}
376
377impl<R: Runtime> Database<R> {
378 pub(crate) fn create(context: Context<R>, name: String, path: String) -> Self {
379 Self {
380 context,
381 name,
382 path,
383 }
384 }
385
386 pub fn name(&self) -> String {
387 self.name.clone()
388 }
389
390 pub fn path(&self) -> String {
391 self.path.clone()
392 }
393
394 pub fn absolute_path(&self) -> crate::Result<PathBuf> {
395 self.context.get_path(self.path())
396 }
397
398 pub(crate) async fn db_context(&self) -> crate::Result<ContextDB> {
399 if let Some(db) = self.context.databases().await.lock().await.get(&self.name) {
400 Ok(db.clone())
401 } else {
402 Err(crate::Error::unknown_database(self.name()))
403 }
404 }
405
406 pub(crate) async fn db(&self) -> crate::Result<Arc<Mutex<polodb_core::Database>>> {
407 Ok(self.db_context().await?.database.clone())
408 }
409
410 pub async fn close(self) -> crate::Result<()> {
411 self.context.close_database(self.name()).await
412 }
413
414 pub async fn collections(&self) -> crate::Result<Vec<String>> {
415 let db = self.db().await?;
416 let database = db.lock().await;
417 Ok(database.list_collection_names().or_else(|e| Err(crate::Error::from(e)))?)
418 }
419
420 pub async fn collection<T: Serialize + DeserializeOwned + Send + Sync>(&self, name: impl AsRef<str>) -> Collection<T, R> {
421 Collection::<T, R>::create(self.clone(), name.as_ref().to_string(), None)
422 }
423
424 pub async fn start_transaction(&self) -> crate::Result<Transaction<R>> {
425 let context = self.db_context().await?;
426 let db = context.database.lock().await;
427 let mut transactions = context.transactions.lock().await;
428 let new_id = bson::Uuid::new();
429 transactions.insert(new_id.clone(), Arc::new(Mutex::new(db.start_transaction().or_else(|e| Err(crate::Error::from(e)))?)));
430 Ok(Transaction::<R>::create(self.clone(), new_id))
431 }
432
433 pub async fn get_transaction(&self, id: bson::Uuid) -> crate::Result<Transaction<R>> {
434 let context = self.db_context().await?;
435 let transactions = context.transactions.lock().await;
436 if let Some(_) = transactions.get(&id) {
437 Ok(Transaction::<R>::create(self.clone(), id.clone()))
438 } else {
439 Err(crate::Error::unknown_transaction(id.to_string()))
440 }
441 }
442
443 pub async fn commit_transaction(&self, id: bson::Uuid) -> crate::Result<()> {
444 if let Some(mutex) = self.db_context().await?.transactions.lock().await.remove(&id) {
445 let transaction = mutex.lock().await;
446 transaction.commit().or_else(|e| Err(crate::Error::from(e)))
447 } else {
448 Err(crate::Error::unknown_transaction(id.to_string()))
449 }
450 }
451
452 pub async fn rollback_transaction(&self, id: bson::Uuid) -> crate::Result<()> {
453 if let Some(mutex) = self.db_context().await?.transactions.lock().await.remove(&id) {
454 let transaction = mutex.lock().await;
455 transaction.rollback().or_else(|e| Err(crate::Error::from(e)))
456 } else {
457 Err(crate::Error::unknown_transaction(id.to_string()))
458 }
459 }
460}
461
462pub struct Transaction<R: Runtime> {
463 database: Database<R>,
464 id: bson::Uuid
465}
466
467impl<R: Runtime> Clone for Transaction<R> {
468 fn clone(&self) -> Self {
469 Self {
470 database: self.database.clone(),
471 id: self.id.clone()
472 }
473 }
474}
475
476impl<R: Runtime> Transaction<R> {
477 pub(crate) fn create(database: Database<R>, id: bson::Uuid) -> Self {
478 Self {
479 database, id
480 }
481 }
482
483 pub fn id(&self) -> bson::Uuid {
484 self.id.clone()
485 }
486
487 pub fn collection<T: Serialize + DeserializeOwned + Send + Sync>(&self, name: impl AsRef<str>) -> Collection<T, R> {
488 Collection::create(self.database.clone(), name.as_ref().to_string(), Some(self.id.clone()))
489 }
490
491 pub async fn commit(self) -> crate::Result<()> {
492 self.database.commit_transaction(self.id()).await
493 }
494
495 pub async fn rollback(self) -> crate::Result<()> {
496 self.database.rollback_transaction(self.id()).await
497 }
498}
499
500pub struct FileHandle<R: Runtime> {
501 context: Context<R>,
502 id: bson::Uuid,
503 path: String,
504}
505
506impl<R: Runtime> Clone for FileHandle<R> {
507 fn clone(&self) -> Self {
508 Self {
509 context: self.context.clone(),
510 id: self.id.clone(),
511 path: self.path.clone()
512 }
513 }
514}
515
516impl<R: Runtime> FileHandle<R> {
517 pub(crate) fn create(context: Context<R>, id: bson::Uuid, path: String) -> Self {
518 Self { context, id, path }
519 }
520
521 pub fn id(&self) -> bson::Uuid {
522 self.id.clone()
523 }
524
525 pub fn path(&self) -> String {
526 self.path.clone()
527 }
528
529 pub fn absolute_path(&self) -> crate::Result<PathBuf> {
530 self.context.get_path(self.path())
531 }
532
533 pub async fn close(self) -> crate::Result<()> {
534 self.context.close_file_handle(self.id()).await
535 }
536
537 async fn metadata(&self) -> ContextFileHandle {
538 self.context.files().await.lock().await.get(&self.id()).expect("File handle has been closed.").clone()
539 }
540
541 pub async fn mode(&self) -> FileHandleMode {
542 self.metadata().await.mode
543 }
544
545 pub async fn handle(&self) -> async_dup::Arc<async_dup::Mutex<File>> {
546 self.metadata().await.handle.clone()
547 }
548}
549
550pub(crate) enum CollectionType {
551 Standalone(polodb_core::Collection<Document>),
552 Transaction(polodb_core::TransactionalCollection<Document>)
553}
554
555impl CollectionT<Document> for CollectionType {
556 fn name(&self) -> &str {
557 match self {
558 Self::Standalone(c) => c.name(),
559 Self::Transaction(c) => c.name()
560 }
561 }
562
563 fn count_documents(&self) -> polodb_core::Result<u64> {
564 match self {
565 Self::Standalone(c) => c.count_documents(),
566 Self::Transaction(c) => c.count_documents()
567 }
568 }
569
570 fn update_one(&self, query: Document, update: Document) -> polodb_core::Result<polodb_core::results::UpdateResult> {
571 match self {
572 Self::Standalone(c) => c.update_one(query, update),
573 Self::Transaction(c) => c.update_one(query, update)
574 }
575 }
576
577 fn update_one_with_options(&self, query: Document, update: Document, options: polodb_core::options::UpdateOptions) -> polodb_core::Result<polodb_core::results::UpdateResult> {
578 match self {
579 Self::Standalone(c) => c.update_one_with_options(query, update, options),
580 Self::Transaction(c) => c.update_one_with_options(query, update, options)
581 }
582 }
583
584 fn update_many(&self, query: Document, update: Document) -> polodb_core::Result<polodb_core::results::UpdateResult> {
585 match self {
586 Self::Standalone(c) => c.update_many(query, update),
587 Self::Transaction(c) => c.update_many(query, update)
588 }
589 }
590
591 fn update_many_with_options(&self, query: Document, update: Document, options: polodb_core::options::UpdateOptions) -> polodb_core::Result<polodb_core::results::UpdateResult> {
592 match self {
593 Self::Standalone(c) => c.update_many_with_options(query, update, options),
594 Self::Transaction(c) => c.update_many_with_options(query, update, options)
595 }
596 }
597
598 fn delete_one(&self, query: Document) -> polodb_core::Result<polodb_core::results::DeleteResult> {
599 match self {
600 Self::Standalone(c) => c.delete_one(query),
601 Self::Transaction(c) => c.delete_one(query)
602 }
603 }
604
605 fn delete_many(&self, query: Document) -> polodb_core::Result<polodb_core::results::DeleteResult> {
606 match self {
607 Self::Standalone(c) => c.delete_many(query),
608 Self::Transaction(c) => c.delete_many(query)
609 }
610 }
611
612 fn create_index(&self, index: polodb_core::IndexModel) -> polodb_core::Result<()> {
613 match self {
614 Self::Standalone(c) => c.create_index(index),
615 Self::Transaction(c) => c.create_index(index)
616 }
617 }
618
619 fn drop_index(&self, name: impl AsRef<str>) -> polodb_core::Result<()> {
620 match self {
621 Self::Standalone(c) => c.drop_index(name),
622 Self::Transaction(c) => c.drop_index(name)
623 }
624 }
625
626 fn drop(&self) -> polodb_core::Result<()> {
627 match self {
628 Self::Standalone(c) => c.drop(),
629 Self::Transaction(c) => c.drop()
630 }
631 }
632
633 fn insert_one(&self, doc: impl std::borrow::Borrow<Document>) -> polodb_core::Result<polodb_core::results::InsertOneResult>
634 where Document: Serialize {
635 match self {
636 Self::Standalone(c) => c.insert_one(doc),
637 Self::Transaction(c) => c.insert_one(doc)
638 }
639 }
640
641 fn insert_many(&self, docs: impl IntoIterator<Item = impl std::borrow::Borrow<Document>>) -> polodb_core::Result<polodb_core::results::InsertManyResult>
642 where Document: Serialize {
643 match self {
644 Self::Standalone(c) => c.insert_many(docs),
645 Self::Transaction(c) => c.insert_many(docs)
646 }
647 }
648
649 fn find(&self, filter: Document) -> polodb_core::action::Find<'_, '_, Document>
650 where Document: DeserializeOwned + Send + Sync {
651 match self {
652 Self::Standalone(c) => c.find(filter),
653 Self::Transaction(c) => c.find(filter)
654 }
655 }
656
657 fn find_one(&self, filter: Document) -> polodb_core::Result<Option<Document>>
658 where Document: DeserializeOwned + Send + Sync {
659 match self {
660 Self::Standalone(c) => c.find_one(filter),
661 Self::Transaction(c) => c.find_one(filter)
662 }
663 }
664
665 fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> polodb_core::action::Aggregate<'_, '_> {
666 match self {
667 Self::Standalone(c) => c.aggregate(pipeline),
668 Self::Transaction(c) => c.aggregate(pipeline)
669 }
670 }
671}
672
673pub struct Collection<T: Serialize + DeserializeOwned + Send + Sync, R: Runtime> {
674 database: Database<R>,
675 name: String,
676 transaction_id: Option<bson::Uuid>,
677 _doctype: PhantomData<T>
678}
679
680impl<T: Serialize + DeserializeOwned + Send + Sync, R: Runtime> Clone for Collection<T, R> {
681 fn clone(&self) -> Self {
682 Self {
683 database: self.database.clone(),
684 name: self.name.clone(),
685 transaction_id: self.transaction_id.clone(),
686 _doctype: PhantomData
687 }
688 }
689}
690
691impl<T: Serialize + DeserializeOwned + Send + Sync, R: Runtime> Collection<T, R> {
692 pub(crate) fn create(db: Database<R>, name: String, transaction_id: Option<bson::Uuid>) -> Self {
693 Self {
694 database: db.clone(),
695 name,
696 transaction_id,
697 _doctype: PhantomData
698 }
699 }
700
701 pub fn name(&self) -> String {
702 self.name.clone()
703 }
704
705 pub(crate) async fn collection(&self) -> crate::Result<CollectionType> {
706 let db = self.database.db().await?;
707 if let Some(id) = self.transaction_id {
708 let dbcon = self.database.db_context().await?;
709 let transactions = dbcon.transactions.lock().await;
710 if let Some(transaction) = transactions.get(&id) {
711 Ok(CollectionType::Transaction(transaction.lock().await.collection::<Document>(&self.name())))
712 } else {
713 Err(crate::Error::unknown_transaction(id.to_string()))
714 }
715 } else {
716 Ok(CollectionType::Standalone(db.lock().await.collection::<Document>(&self.name())))
717 }
718 }
719
720 pub async fn count_documents(&self) -> crate::Result<u64> {
721 self.collection().await?.count_documents().or_else(|e| Err(crate::Error::from(e)))
722 }
723
724 pub async fn update_one(&self, query: Document, update: Document) -> crate::Result<UpdateResult> {
725 self.collection().await?.update_one(query, update).or_else(|e| Err(crate::Error::from(e)))
726 }
727
728 pub async fn update_one_with_options(
729 &self,
730 query: Document,
731 update: Document,
732 options: UpdateOptions,
733 ) -> crate::Result<UpdateResult> {
734 self.collection().await?.update_one_with_options(query, update, options).or_else(|e| Err(crate::Error::from(e)))
735 }
736
737 pub async fn update_many(&self, query: Document, update: Document) -> crate::Result<UpdateResult> {
738 self.collection().await?.update_many(query, update).or_else(|e| Err(crate::Error::from(e)))
739 }
740
741 pub async fn update_many_with_options(
742 &self,
743 query: Document,
744 update: Document,
745 options: UpdateOptions,
746 ) -> crate::Result<UpdateResult> {
747 self.collection().await?.update_many_with_options(query, update, options).or_else(|e| Err(crate::Error::from(e)))
748 }
749
750 pub async fn delete_one(&self, query: Document) -> crate::Result<DeleteResult> {
751 self.collection().await?.delete_one(query).or_else(|e| Err(crate::Error::from(e)))
752 }
753
754 pub async fn delete_many(&self, query: Document) -> crate::Result<DeleteResult> {
755 self.collection().await?.delete_many(query).or_else(|e| Err(crate::Error::from(e)))
756 }
757
758 pub async fn create_index(&self, index: IndexModel) -> crate::Result<()> {
759 self.collection().await?.create_index(index).or_else(|e| Err(crate::Error::from(e)))
760 }
761
762 pub async fn drop_index(&self, name: impl AsRef<str>) -> crate::Result<()> {
763 self.collection().await?.drop_index(name).or_else(|e| Err(crate::Error::from(e)))
764 }
765
766 pub async fn drop(&self) -> crate::Result<()> {
767 self.collection().await?.drop().or_else(|e| Err(crate::Error::from(e)))
768 }
769
770 pub async fn insert_one(&self, doc: impl Borrow<T>) -> crate::Result<InsertOneResult> {
771 self.collection().await?.insert_one(bson::to_document(doc.borrow()).or_else(|e| Err(crate::Error::from(e)))?).or_else(|e| Err(crate::Error::from(e)))
772 }
773
774 pub async fn insert_many(
775 &self,
776 docs: impl IntoIterator<Item = impl Borrow<T>>,
777 ) -> crate::Result<InsertManyResult> {
778 let mut serialized: Vec<Document> = Vec::new();
779 for doc in docs {
780 serialized.push(bson::to_document(doc.borrow()).or_else(|e| Err(crate::Error::from(e)))?);
781 }
782
783 self.collection().await?.insert_many(serialized).or_else(|e| Err(crate::Error::from(e)))
784 }
785
786 pub async fn find(&self, filter: Document, skip: Option<u64>, limit: Option<u64>, sort: Option<Document>) -> crate::Result<Vec<T>> {
787 let mut results: Vec<T> = Vec::new();
788 let collection = self.collection().await?;
789 let mut find = collection.find(filter);
790 if let Some(_skip) = skip {
791 find = find.skip(_skip);
792 }
793
794 if let Some(_limit) = limit {
795 find = find.limit(_limit);
796 }
797
798 if let Some(_sort) = sort {
799 find = find.sort(_sort);
800 }
801
802 let docs: Vec<Result<Document, polodb_core::Error>> = find.run().or_else(|e| Err(crate::Error::from(e)))?.collect();
803 for dresult in docs {
804 results.push(match dresult {
805 Ok(doc) => bson::from_document::<T>(doc).or_else(|e| Err(crate::Error::from(e))),
806 Err(e) => Err(crate::Error::from(e))
807 }?);
808 }
809
810 Ok(results)
811 }
812
813 pub async fn find_one(&self, filter: Document) -> crate::Result<Option<T>> {
814 let raw = self.collection().await?.find_one(filter).or_else(|e| Err(crate::Error::from(e)))?;
815 if let Some(doc) = raw {
816 Ok(Some(bson::from_document::<T>(doc).or_else(|e| Err(crate::Error::from(e)))?))
817 } else {
818 Ok(None)
819 }
820 }
821}