debot_db/
item.rs

1use async_trait::async_trait;
2use bson::to_document;
3use bson::Document;
4use debot_utils::HasId;
5use futures::stream::TryStreamExt;
6use mongodb::bson::doc;
7use mongodb::options::*;
8use mongodb::Database;
9use mongodb::{Collection, IndexModel};
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12use std::error;
13use std::io::{Error, ErrorKind};
14
15use crate::PositionLog;
16
17use super::AppState;
18use super::PnlLog;
19use super::PriceLog;
20
/// Selects how a search over a collection is performed.
///
/// The enum is a plain tag without payload, so it derives the cheap standard
/// traits to let callers log, copy and compare it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
    /// Sort the results by the sort key in ascending order.
    Ascending,
    /// Sort the results by the sort key in descending order.
    Descending,
    /// Restrict the results to the document whose `id` matches the given id.
    ById,
}
26
27#[async_trait]
28pub trait Entity {
29    async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
30    async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
31    async fn delete(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
32    async fn delete_all(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
33
34    async fn search(
35        &self,
36        db: &Database,
37        mode: SearchMode,
38        limit: Option<u32>,
39        id: Option<u32>,
40        sort_key: Option<&str>,
41    ) -> Result<Vec<Self>, Box<dyn error::Error>>
42    where
43        Self: std::marker::Sized;
44
45    fn get_collection_name(&self) -> &str;
46
47    fn get_collection(&self, db: &Database) -> Collection<Self>
48    where
49        Self: std::marker::Sized,
50    {
51        db.collection::<Self>(self.get_collection_name())
52    }
53
54    async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>>
55    where
56        Self: std::marker::Sized,
57        Self: std::marker::Send;
58}
59
60pub async fn insert_item<T: Entity>(db: &Database, item: &T) -> Result<(), Box<dyn error::Error>> {
61    item.insert(db).await
62}
63
64pub async fn update_item<T: Entity>(db: &Database, item: &T) -> Result<(), Box<dyn error::Error>> {
65    item.update(db).await
66}
67
68#[allow(dead_code)]
69pub async fn delete_item<T: Entity>(db: &Database, item: &T) -> Result<(), Box<dyn error::Error>> {
70    item.delete(db).await
71}
72
73#[allow(dead_code)]
74pub async fn delete_item_all<T: Entity>(
75    db: &Database,
76    item: &T,
77) -> Result<(), Box<dyn error::Error>> {
78    item.delete_all(db).await
79}
80
81pub async fn search_items<T: Entity>(
82    db: &Database,
83    item: &T,
84    mode: SearchMode,
85    limit: Option<u32>,
86    id: Option<u32>,
87    sort_key: Option<&str>,
88) -> Result<Vec<T>, Box<dyn error::Error>> {
89    item.search(db, mode, limit, id, sort_key).await
90}
91
92pub async fn search_item<T: Entity>(
93    db: &Database,
94    item: &T,
95    id: Option<u32>,
96    sort_key: Option<&str>,
97) -> Result<T, Box<dyn error::Error>> {
98    let mut items = item
99        .search(db, SearchMode::ById, None, id, sort_key)
100        .await?;
101    if items.len() == 1 {
102        Ok(items.pop().unwrap())
103    } else {
104        Err(Box::new(Error::new(
105            ErrorKind::Other,
106            "Multiple items are found".to_string(),
107        )))
108    }
109}
110
111async fn ensure_collection_exists(collection: &Collection<Document>) {
112    let doc = doc! { "_id": bson::oid::ObjectId::new() };
113    let _ = collection.insert_one(doc, None).await.ok();
114}
115
116async fn get_existing_indexes<T>(
117    collection: &Collection<T>,
118) -> Result<Vec<String>, Box<dyn error::Error>> {
119    let mut indexes = collection.list_indexes(None).await?;
120    let mut index_names = Vec::new();
121
122    while let Some(index) = indexes.try_next().await? {
123        let document: Document = to_document(&index)?;
124        if let Some(name) = document.get_str("name").ok() {
125            index_names.push(name.to_string());
126        }
127    }
128
129    log::debug!("Existing indexes: {:?}", index_names);
130    Ok(index_names)
131}
132pub async fn create_unique_index(db: &Database) -> Result<(), Box<dyn error::Error>> {
133    async fn create_index<T: Entity>(
134        db: &Database,
135        entity: &T,
136    ) -> Result<(), Box<dyn error::Error>> {
137        let collection: Collection<Document> =
138            db.collection::<Document>(entity.get_collection_name());
139        ensure_collection_exists(&collection).await;
140        let existing_indexes = get_existing_indexes(&collection).await?;
141
142        let indexes = vec![
143            (
144                "id_1",
145                IndexModel::builder()
146                    .keys(doc! {"id": 1})
147                    .options(IndexOptions::builder().unique(true).build())
148                    .build(),
149            ),
150            (
151                "open_timestamp_1",
152                IndexModel::builder()
153                    .keys(doc! {"open_timestamp": 1})
154                    .build(),
155            ),
156            (
157                "open_timestamp_-1",
158                IndexModel::builder()
159                    .keys(doc! {"open_timestamp": -1})
160                    .build(),
161            ),
162            (
163                "price_point.timestamp_1",
164                IndexModel::builder()
165                    .keys(doc! {"price_point.timestamp": 1})
166                    .build(),
167            ),
168            (
169                "price_point.timestamp_-1",
170                IndexModel::builder()
171                    .keys(doc! {"price_point.timestamp": -1})
172                    .build(),
173            ),
174        ];
175
176        for (index_name, index_model) in indexes {
177            if !existing_indexes.contains(&index_name.to_string()) {
178                log::info!("Creating index `{}`...", index_name);
179                collection.create_index(index_model, None).await?;
180                log::info!("Index `{}` has been created successfully!", index_name);
181            } else {
182                log::debug!("Index `{}` already exists, skipping.", index_name);
183            }
184        }
185
186        Ok(())
187    }
188
189    create_index(db, &PositionLog::default()).await?;
190    create_index(db, &AppState::default()).await?;
191    create_index(db, &PriceLog::default()).await?;
192    create_index(db, &PnlLog::default()).await?;
193
194    Ok(())
195}
196
197#[async_trait]
198impl Entity for PositionLog {
199    async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
200        let collection = self.get_collection(db);
201
202        let id_index = IndexModel::builder()
203            .keys(doc! {"id": 1})
204            .options(IndexOptions::builder().unique(true).build())
205            .build();
206
207        let open_timestamp_index = IndexModel::builder()
208            .keys(doc! {"open_timestamp": 1})
209            .build();
210
211        let open_timestamp_index_2 = IndexModel::builder()
212            .keys(doc! {"open_timestamp": -1})
213            .build();
214
215        collection.create_index(id_index, None).await?;
216        collection.create_index(open_timestamp_index, None).await?;
217        collection
218            .create_index(open_timestamp_index_2, None)
219            .await?;
220
221        Ok(())
222    }
223
224    async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
225        let collection = self.get_collection(db);
226        collection.insert_one(self, None).await?;
227        Ok(())
228    }
229
230    async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
231        let query = doc! { "id": self.id() };
232        let update = bson::to_bson(self).unwrap();
233        let update = doc! { "$set" : update };
234        let collection = self.get_collection(db);
235        collection.update(query, update, true).await
236    }
237
238    async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
239        panic!("Not implemented")
240    }
241
242    async fn delete_all(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
243        let collection = self.get_collection(db);
244        collection.delete_all().await
245    }
246
247    async fn search(
248        &self,
249        db: &Database,
250        mode: SearchMode,
251        limit: Option<u32>,
252        id: Option<u32>,
253        sort_key: Option<&str>,
254    ) -> Result<Vec<Self>, Box<dyn error::Error>> {
255        let mut query = doc! { "id": { "$gt": 0 }};
256        if self.id() != None {
257            query = doc! { "id": self.id().unwrap() };
258        }
259        let collection = self.get_collection(db);
260        let sort_key = sort_key.unwrap_or("id");
261        collection.search(query, mode, limit, id, &sort_key).await
262    }
263
264    fn get_collection_name(&self) -> &str {
265        "position"
266    }
267}
268
269#[async_trait]
270impl Entity for PnlLog {
271    async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
272        let collection = self.get_collection(db);
273
274        let id_index = IndexModel::builder()
275            .keys(doc! {"id": 1})
276            .options(IndexOptions::builder().unique(true).build())
277            .build();
278
279        collection.create_index(id_index, None).await?;
280
281        Ok(())
282    }
283
284    async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
285        let collection = self.get_collection(db);
286        collection.insert_one(self, None).await?;
287        Ok(())
288    }
289
290    async fn update(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
291        panic!("Not implemented")
292    }
293
294    async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
295        panic!("Not implemented")
296    }
297
298    async fn delete_all(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
299        panic!("Not implemented")
300    }
301
302    async fn search(
303        &self,
304        db: &Database,
305        mode: SearchMode,
306        limit: Option<u32>,
307        id: Option<u32>,
308        sort_key: Option<&str>,
309    ) -> Result<Vec<Self>, Box<dyn error::Error>> {
310        let mut query = doc! { "id": { "$gt": 0 }};
311        if self.id != None {
312            query = doc! { "id": self.id.unwrap() };
313        }
314        let collection = self.get_collection(db);
315        let sort_key = sort_key.unwrap_or("id");
316        collection.search(query, mode, limit, id, &sort_key).await
317    }
318
319    fn get_collection_name(&self) -> &str {
320        "balance"
321    }
322}
323
324#[async_trait]
325impl Entity for AppState {
326    async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
327        let collection = self.get_collection(db);
328
329        let id_index = IndexModel::builder()
330            .keys(doc! {"id": 1})
331            .options(IndexOptions::builder().unique(true).build())
332            .build();
333
334        collection.create_index(id_index, None).await?;
335
336        Ok(())
337    }
338
339    async fn insert(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
340        panic!("Not implemented")
341    }
342
343    async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
344        let query = doc! { "id": 1 };
345        let update = bson::to_bson(self).unwrap();
346        let update = doc! { "$set" : update };
347        let collection = self.get_collection(db);
348        collection.update(query, update, true).await
349    }
350
351    async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
352        panic!("Not implemented")
353    }
354
355    async fn delete_all(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
356        let collection = self.get_collection(db);
357        collection.delete_all().await
358    }
359
360    async fn search(
361        &self,
362        db: &Database,
363        mode: SearchMode,
364        limit: Option<u32>,
365        id: Option<u32>,
366        sort_key: Option<&str>,
367    ) -> Result<Vec<Self>, Box<dyn error::Error>> {
368        let query = doc! { "id": 1 };
369        let collection = self.get_collection(db);
370        let sort_key = sort_key.unwrap_or("id");
371        collection.search(query, mode, limit, id, &sort_key).await
372    }
373
374    fn get_collection_name(&self) -> &str {
375        "app-state"
376    }
377}
378
379#[async_trait]
380impl Entity for PriceLog {
381    async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
382        let collection = self.get_collection(db);
383
384        let id_index = IndexModel::builder()
385            .keys(doc! {"id": 1})
386            .options(IndexOptions::builder().unique(true).build())
387            .build();
388
389        let price_point_timestamp_index = IndexModel::builder()
390            .keys(doc! {"price_point.timestamp": 1})
391            .build();
392
393        let price_point_timestamp_index_2 = IndexModel::builder()
394            .keys(doc! {"price_point.timestamp": -1})
395            .build();
396
397        collection.create_index(id_index, None).await?;
398        collection
399            .create_index(price_point_timestamp_index, None)
400            .await?;
401        collection
402            .create_index(price_point_timestamp_index_2, None)
403            .await?;
404
405        Ok(())
406    }
407
408    async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
409        let collection = self.get_collection(db);
410        collection.insert_one(self, None).await?;
411        Ok(())
412    }
413
414    async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
415        let query = doc! { "id": self.id };
416        let update = bson::to_bson(self).unwrap();
417        let update = doc! { "$set" : update };
418        let collection = self.get_collection(db);
419        collection.update(query, update, true).await
420    }
421
422    async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
423        panic!("Not implemented")
424    }
425
426    async fn delete_all(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
427        panic!("Not implemented")
428    }
429
430    async fn search(
431        &self,
432        db: &Database,
433        mode: SearchMode,
434        limit: Option<u32>,
435        id: Option<u32>,
436        sort_key: Option<&str>,
437    ) -> Result<Vec<Self>, Box<dyn error::Error>> {
438        let mut query = doc! { "id": { "$gt": 0 }};
439        if self.id != None {
440            query = doc! { "id": self.id.unwrap() };
441        }
442        let collection = self.get_collection(db);
443        let sort_key = sort_key.unwrap_or("id");
444        collection.search(query, mode, limit, id, &sort_key).await
445    }
446
447    fn get_collection_name(&self) -> &str {
448        "price"
449    }
450}
451
452#[async_trait]
453pub trait HelperCollection<T> {
454    async fn update(
455        &self,
456        query: Document,
457        update: Document,
458        upsert: bool,
459    ) -> Result<(), Box<dyn error::Error>>;
460    async fn delete(&self, query: Document) -> Result<(), Box<dyn error::Error>>;
461    async fn delete_all(&self) -> Result<(), Box<dyn error::Error>>;
462    async fn search(
463        &self,
464        query: Document,
465        mode: SearchMode,
466        limit: Option<u32>,
467        id: Option<u32>,
468        sort_key: &str,
469    ) -> Result<Vec<T>, Box<dyn error::Error>>;
470}
471
472#[async_trait]
473impl<T> HelperCollection<T> for Collection<T>
474where
475    T: DeserializeOwned + Unpin + Send + Sync + Serialize + std::fmt::Debug,
476{
477    async fn update(
478        &self,
479        query: Document,
480        update: Document,
481        upsert: bool,
482    ) -> Result<(), Box<dyn error::Error>> {
483        let options = FindOneAndUpdateOptions::builder()
484            .upsert(upsert)
485            .return_document(ReturnDocument::After)
486            .build();
487        let _ = self.find_one_and_update(query, update, options).await?;
488        Ok(())
489    }
490
491    async fn delete(&self, query: Document) -> Result<(), Box<dyn error::Error>> {
492        let result = self.delete_one(query, None).await?;
493        if result.deleted_count == 1 {
494            return Ok(());
495        } else {
496            panic!("Not implemented")
497        }
498    }
499
500    async fn delete_all(&self) -> Result<(), Box<dyn error::Error>> {
501        let options = DropCollectionOptions::builder().build();
502        self.drop(options).await?;
503        Ok(())
504    }
505
506    async fn search(
507        &self,
508        mut query: Document,
509        mode: SearchMode,
510        limit: Option<u32>,
511        id: Option<u32>,
512        sort_key: &str,
513    ) -> Result<Vec<T>, Box<dyn error::Error>> {
514        let mut items: Vec<T> = vec![];
515
516        match sort_key {
517            "id" | "open_timestamp" | "price_point.timestamp" => {}
518            _ => {
519                return Err(Box::new(Error::new(
520                    ErrorKind::InvalidInput,
521                    "Invalid sort key",
522                )))
523            }
524        };
525
526        let find_options = match mode {
527            SearchMode::Ascending => {
528                let builder = FindOptions::builder()
529                    .allow_disk_use(Some(true))
530                    .sort(doc! { sort_key: 1 });
531
532                if let Some(limit_value) = limit {
533                    builder.limit(limit_value as i64).build()
534                } else {
535                    builder.build()
536                }
537            }
538            SearchMode::Descending => {
539                let builder = FindOptions::builder()
540                    .allow_disk_use(Some(true))
541                    .sort(doc! { sort_key: -1 });
542
543                if let Some(limit_value) = limit {
544                    builder.limit(limit_value as i64).build()
545                } else {
546                    builder.build()
547                }
548            }
549            SearchMode::ById => {
550                if let Some(id_value) = id {
551                    query.insert("id", id_value);
552                } else {
553                    return Err(Box::new(Error::new(
554                        ErrorKind::InvalidInput,
555                        "ID not provided".to_string(),
556                    )));
557                }
558                FindOptions::builder().allow_disk_use(Some(true)).build()
559            }
560        };
561
562        let mut cursor = self.find(query, find_options).await?;
563        while let Some(item) = cursor.try_next().await? {
564            items.push(item);
565        }
566
567        if items.is_empty() {
568            Err(Box::new(Error::new(
569                ErrorKind::Other,
570                "Item not found".to_string(),
571            )))
572        } else {
573            Ok(items)
574        }
575    }
576}