1#![allow(dead_code)]
2
3pub mod observer;
4pub mod util;
5
6use crate::futures::StreamExt;
7use crate::macros::{error, trace};
8use crate::model::observer::Observer;
9use crate::model::util::ModelTimestamps;
10use crate::Spark;
11use mongodb::bson::{doc, to_document, Document};
12use mongodb::error::Result;
13use mongodb::options::{
14 DeleteOptions, DropIndexOptions, FindOneOptions, FindOptions, InsertOneOptions,
15 ListIndexesOptions, UpdateOptions,
16};
17use mongodb::results::UpdateResult;
18use mongodb::{Collection, Cursor, Database, IndexModel};
19use serde::de::DeserializeOwned;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::ops::{Deref, DerefMut};
23use std::sync::Arc;
24use std::time::Duration;
25
26type Id = mongodb::bson::Bson;
28pub type MongodbResult<T> = Result<T>;
29
30#[derive(Debug, Serialize)]
31pub struct Model<'a, M> {
32 inner: Box<M>,
33 #[serde(skip)]
34 db: Arc<Database>,
35 #[serde(skip)]
36 collection_name: &'a str,
37 #[serde(skip)]
38 collection: Collection<M>,
39}
40
41impl<'a, T: 'a> Deref for Model<'a, T> {
42 type Target = T;
43
44 fn deref(&self) -> &Self::Target {
45 &self.inner
46 }
47}
48
49impl<'a, T: 'a> DerefMut for Model<'a, T> {
50 fn deref_mut(&mut self) -> &mut Self::Target {
51 &mut self.inner
52 }
53}
54
55impl<'a, M> Model<'a, M>
56where
57 M: Default,
58 M: Serialize,
59 M: DeserializeOwned,
60 M: Send,
61 M: Sync,
62 M: Unpin,
63 M: Debug,
64 M: ModelTimestamps,
65 M: Observer<M>,
66{
67 pub fn new(db: Option<&Arc<Database>>, collection_name: &'a str) -> Model<'a, M> {
87 if let Some(database) = db {
88 let collection = database.collection::<M>(collection_name);
89 return Model {
90 inner: Box::<M>::default(),
91 db: database.clone(),
92 collection_name,
93 collection,
94 };
95 }
96 let database = Spark::get_db();
98 let collection = database.collection::<M>(collection_name);
99 Model {
100 inner: Box::<M>::default(),
101 db: database,
102 collection_name,
103 collection,
104 }
105 }
106
107 pub async fn save(
110 &mut self,
111 options: impl Into<Option<InsertOneOptions>>,
112 ) -> MongodbResult<Id> {
113 self.inner.updated_at();
114 let mut converted = to_document(&self.inner)?;
115 if let Some(id) = converted.get("_id") {
116 let owned_id = id.to_owned();
117 let upsert = self
118 .collection
119 .update_one(
120 doc! {
121 "_id" : id
122 },
123 doc! { "$set": &converted},
124 None,
125 )
126 .await?;
127 if upsert.modified_count >= 1 {
128 Box::pin(M::updated(self)).await?;
131
132 return Ok(owned_id);
133 };
134 }
135 converted.remove("_id");
136 self.inner.created_at();
137
138 let re = self.collection.insert_one(&*self.inner, options).await?;
139
140 Box::pin(M::created(self)).await?;
143
144 Ok(re.inserted_id)
145 }
146 pub async fn find_one(
147 &mut self,
148 doc: impl Into<Document>,
149 options: impl Into<Option<FindOneOptions>>,
150 ) -> MongodbResult<Option<&mut Self>> {
151 let result = self.collection.find_one(Some(doc.into()), options).await?;
152 match result {
153 Some(inner) => {
154 self.fill(inner);
155 Ok(Some(self))
156 }
157 None => Ok(None),
158 }
159 }
160
161 pub async fn update(
209 &self,
210 query: impl Into<Document>,
211 doc: impl Into<Document>,
212 options: impl Into<Option<UpdateOptions>>,
213 ) -> MongodbResult<UpdateResult> {
214 self.collection
215 .update_one(query.into(), doc.into(), options)
216 .await
217 }
218
219 pub async fn find(
220 &self,
221 filter: impl Into<Document>,
222 options: impl Into<Option<FindOptions>>,
223 ) -> MongodbResult<Cursor<M>> {
224 self.collection.find(Some(filter.into()), options).await
225 }
226
227 pub async fn find_and_collect(
228 &self,
229 filter: impl Into<Document>,
230 options: impl Into<Option<FindOptions>>,
231 ) -> MongodbResult<Vec<MongodbResult<M>>> {
232 let converted = filter.into();
234 let doc = if converted.is_empty() {
235 None
236 } else {
237 Some(converted)
238 };
239
240 let future = self.collection.find(doc, options).await?;
241 Ok(future.collect().await)
242 }
243
244 pub fn register_attributes(&self, attributes: Vec<&str>) {
245 let mut attrs = attributes
246 .iter()
247 .map(|attr| attr.to_string())
248 .collect::<Vec<String>>();
249 let max_time_to_drop = Some(Duration::from_secs(5));
250 let (tx, _) = tokio::sync::oneshot::channel();
251 let db = self.db.clone();
252 let coll_name = self.collection_name.to_owned();
253 trace!("Spawn task to register indexes");
254 let register_attrs = async move {
255 let coll = db.collection::<M>(&coll_name);
256 let previous_indexes = coll
257 .list_indexes(Some(
258 ListIndexesOptions::builder()
259 .max_time(max_time_to_drop)
260 .build(),
261 ))
262 .await;
263
264 let mut keys_to_remove = Vec::new();
265
266 if previous_indexes.is_ok() {
267 let foreach_future = previous_indexes.unwrap().for_each(|pr| {
268 match pr {
269 Ok(index_model) => {
270 index_model.keys.iter().for_each(|key| {
271 if key.0 != "_id" {
272 if let Some(pos) = attrs.iter().position(|k| k == key.0) {
273 attrs.remove(pos);
275 } else if let Some(rw) = &index_model.options {
276 keys_to_remove.push(rw.name.clone())
278 }
279 }
280 });
281 }
282 Err(error) => {
283 error!("Can't unpack index model {error}");
284 }
285 }
286 futures::future::ready(())
287 });
288 foreach_future.await;
289 }
290 let attrs = attrs
291 .iter()
292 .map(|attr| {
293 let key = attr.to_string();
294 IndexModel::builder()
295 .keys(doc! {
296 key : 1
297 })
298 .build()
299 })
300 .collect::<Vec<IndexModel>>();
301
302 for name in keys_to_remove {
303 let key = name.as_ref().unwrap();
304 let _ = coll
305 .drop_index(
306 key,
307 Some(
308 DropIndexOptions::builder()
309 .max_time(max_time_to_drop)
310 .build(),
311 ),
312 )
313 .await;
314 }
315 if !attrs.is_empty() {
316 let result = coll.create_indexes(attrs, None).await;
317 if let Err(error) = result {
318 error!("Can't create indexes : {:?}", error);
319 }
320 }
321 };
322
323 let task = tokio::spawn(register_attrs);
324
325 let wait_for_complete = async move {
326 let _ = task.await;
327 let _ = tx.send(());
328 };
329
330 tokio::task::spawn(wait_for_complete);
331 }
332
333 pub async fn delete(
334 &mut self,
335 query: impl Into<Document>,
336 options: impl Into<Option<DeleteOptions>>,
337 ) -> MongodbResult<u64> {
338 let re = self
339 .collection
340 .delete_one(query.into(), options)
341 .await?
342 .deleted_count;
343
344 M::deleted(self).await?;
347
348 Ok(re)
349 }
350
351 pub fn fill(&mut self, inner: M) {
352 *self.inner = inner;
353 }
354}
355
356impl<'a, M> Model<'a, M>
357where
358 M: Default,
359 M: Serialize,
360{
361 pub fn take_inner(&mut self) -> M {
364 std::mem::take(&mut *self.inner)
365 }
366
367 pub fn inner_ref(&self) -> &M {
368 self.inner.as_ref()
369 }
370
371 pub fn inner_mut(&mut self) -> &mut M {
372 self.inner.as_mut()
373 }
374
375 pub fn inner_to_doc(&self) -> MongodbResult<Document> {
376 let re = to_document(&self.inner)?;
377 Ok(re)
378 }
379}
380
381impl<'a, M> From<Model<'a, M>> for Document
384where
385 M: Serialize,
386{
387 fn from(value: Model<M>) -> Self {
388 mongodb::bson::to_document(&value.inner).unwrap()
389 }
390}
391
392impl<'a, M> From<&Model<'a, M>> for Document
393where
394 M: Serialize,
395{
396 fn from(value: &Model<'a, M>) -> Self {
397 mongodb::bson::to_document(&value.inner).unwrap()
398 }
399}
400
401impl<'a, M> From<&mut Model<'a, M>> for Document
402where
403 M: Serialize,
404{
405 fn from(value: &mut Model<'a, M>) -> Self {
406 mongodb::bson::to_document(&value.inner).unwrap()
407 }
408}