1use async_trait::async_trait;
2use bson::to_document;
3use bson::Document;
4use debot_utils::HasId;
5use futures::stream::TryStreamExt;
6use mongodb::bson::doc;
7use mongodb::options::*;
8use mongodb::Database;
9use mongodb::{Collection, IndexModel};
10use serde::de::DeserializeOwned;
11use serde::Serialize;
12use std::error;
13use std::io::{Error, ErrorKind};
14
15use crate::PositionLog;
16
17use super::AppState;
18use super::PnlLog;
19use super::PriceLog;
20
/// Selects how a search orders or filters its results.
///
/// The variants are interpreted by `HelperCollection::search`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
    /// Sort the results by the sort key in ascending order.
    Ascending,
    /// Sort the results by the sort key in descending order.
    Descending,
    /// Filter on an explicitly supplied `id`; no sort order or limit is applied.
    ById,
}
26
27#[async_trait]
28pub trait Entity {
29 async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
30 async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
31 async fn delete(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
32 async fn delete_all(&self, db: &Database) -> Result<(), Box<dyn error::Error>>;
33
34 async fn search(
35 &self,
36 db: &Database,
37 mode: SearchMode,
38 limit: Option<u32>,
39 id: Option<u32>,
40 sort_key: Option<&str>,
41 ) -> Result<Vec<Self>, Box<dyn error::Error>>
42 where
43 Self: std::marker::Sized;
44
45 fn get_collection_name(&self) -> &str;
46
47 fn get_collection(&self, db: &Database) -> Collection<Self>
48 where
49 Self: std::marker::Sized,
50 {
51 db.collection::<Self>(self.get_collection_name())
52 }
53
54 async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>>
55 where
56 Self: std::marker::Sized,
57 Self: std::marker::Send;
58}
59
60pub async fn insert_item<T: Entity>(db: &Database, item: &T) -> Result<(), Box<dyn error::Error>> {
61 item.insert(db).await
62}
63
64pub async fn update_item<T: Entity>(db: &Database, item: &T) -> Result<(), Box<dyn error::Error>> {
65 item.update(db).await
66}
67
68#[allow(dead_code)]
69pub async fn delete_item<T: Entity>(db: &Database, item: &T) -> Result<(), Box<dyn error::Error>> {
70 item.delete(db).await
71}
72
73#[allow(dead_code)]
74pub async fn delete_item_all<T: Entity>(
75 db: &Database,
76 item: &T,
77) -> Result<(), Box<dyn error::Error>> {
78 item.delete_all(db).await
79}
80
81pub async fn search_items<T: Entity>(
82 db: &Database,
83 item: &T,
84 mode: SearchMode,
85 limit: Option<u32>,
86 id: Option<u32>,
87 sort_key: Option<&str>,
88) -> Result<Vec<T>, Box<dyn error::Error>> {
89 item.search(db, mode, limit, id, sort_key).await
90}
91
92pub async fn search_item<T: Entity>(
93 db: &Database,
94 item: &T,
95 id: Option<u32>,
96 sort_key: Option<&str>,
97) -> Result<T, Box<dyn error::Error>> {
98 let mut items = item
99 .search(db, SearchMode::ById, None, id, sort_key)
100 .await?;
101 if items.len() == 1 {
102 Ok(items.pop().unwrap())
103 } else {
104 Err(Box::new(Error::new(
105 ErrorKind::Other,
106 "Multiple items are found".to_string(),
107 )))
108 }
109}
110
111async fn ensure_collection_exists(collection: &Collection<Document>) {
112 let doc = doc! { "_id": bson::oid::ObjectId::new() };
113 let _ = collection.insert_one(doc, None).await.ok();
114}
115
116async fn get_existing_indexes<T>(
117 collection: &Collection<T>,
118) -> Result<Vec<String>, Box<dyn error::Error>> {
119 let mut indexes = collection.list_indexes(None).await?;
120 let mut index_names = Vec::new();
121
122 while let Some(index) = indexes.try_next().await? {
123 let document: Document = to_document(&index)?;
124 if let Some(name) = document.get_str("name").ok() {
125 index_names.push(name.to_string());
126 }
127 }
128
129 log::debug!("Existing indexes: {:?}", index_names);
130 Ok(index_names)
131}
132pub async fn create_unique_index(db: &Database) -> Result<(), Box<dyn error::Error>> {
133 async fn create_index<T: Entity>(
134 db: &Database,
135 entity: &T,
136 ) -> Result<(), Box<dyn error::Error>> {
137 let collection: Collection<Document> =
138 db.collection::<Document>(entity.get_collection_name());
139 ensure_collection_exists(&collection).await;
140 let existing_indexes = get_existing_indexes(&collection).await?;
141
142 let indexes = vec![
143 (
144 "id_1",
145 IndexModel::builder()
146 .keys(doc! {"id": 1})
147 .options(IndexOptions::builder().unique(true).build())
148 .build(),
149 ),
150 (
151 "open_timestamp_1",
152 IndexModel::builder()
153 .keys(doc! {"open_timestamp": 1})
154 .build(),
155 ),
156 (
157 "open_timestamp_-1",
158 IndexModel::builder()
159 .keys(doc! {"open_timestamp": -1})
160 .build(),
161 ),
162 (
163 "price_point.timestamp_1",
164 IndexModel::builder()
165 .keys(doc! {"price_point.timestamp": 1})
166 .build(),
167 ),
168 (
169 "price_point.timestamp_-1",
170 IndexModel::builder()
171 .keys(doc! {"price_point.timestamp": -1})
172 .build(),
173 ),
174 ];
175
176 for (index_name, index_model) in indexes {
177 if !existing_indexes.contains(&index_name.to_string()) {
178 log::info!("Creating index `{}`...", index_name);
179 collection.create_index(index_model, None).await?;
180 log::info!("Index `{}` has been created successfully!", index_name);
181 } else {
182 log::debug!("Index `{}` already exists, skipping.", index_name);
183 }
184 }
185
186 Ok(())
187 }
188
189 create_index(db, &PositionLog::default()).await?;
190 create_index(db, &AppState::default()).await?;
191 create_index(db, &PriceLog::default()).await?;
192 create_index(db, &PnlLog::default()).await?;
193
194 Ok(())
195}
196
197#[async_trait]
198impl Entity for PositionLog {
199 async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
200 let collection = self.get_collection(db);
201
202 let id_index = IndexModel::builder()
203 .keys(doc! {"id": 1})
204 .options(IndexOptions::builder().unique(true).build())
205 .build();
206
207 let open_timestamp_index = IndexModel::builder()
208 .keys(doc! {"open_timestamp": 1})
209 .build();
210
211 let open_timestamp_index_2 = IndexModel::builder()
212 .keys(doc! {"open_timestamp": -1})
213 .build();
214
215 collection.create_index(id_index, None).await?;
216 collection.create_index(open_timestamp_index, None).await?;
217 collection
218 .create_index(open_timestamp_index_2, None)
219 .await?;
220
221 Ok(())
222 }
223
224 async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
225 let collection = self.get_collection(db);
226 collection.insert_one(self, None).await?;
227 Ok(())
228 }
229
230 async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
231 let query = doc! { "id": self.id() };
232 let update = bson::to_bson(self).unwrap();
233 let update = doc! { "$set" : update };
234 let collection = self.get_collection(db);
235 collection.update(query, update, true).await
236 }
237
238 async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
239 panic!("Not implemented")
240 }
241
242 async fn delete_all(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
243 let collection = self.get_collection(db);
244 collection.delete_all().await
245 }
246
247 async fn search(
248 &self,
249 db: &Database,
250 mode: SearchMode,
251 limit: Option<u32>,
252 id: Option<u32>,
253 sort_key: Option<&str>,
254 ) -> Result<Vec<Self>, Box<dyn error::Error>> {
255 let mut query = doc! { "id": { "$gt": 0 }};
256 if self.id() != None {
257 query = doc! { "id": self.id().unwrap() };
258 }
259 let collection = self.get_collection(db);
260 let sort_key = sort_key.unwrap_or("id");
261 collection.search(query, mode, limit, id, &sort_key).await
262 }
263
264 fn get_collection_name(&self) -> &str {
265 "position"
266 }
267}
268
269#[async_trait]
270impl Entity for PnlLog {
271 async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
272 let collection = self.get_collection(db);
273
274 let id_index = IndexModel::builder()
275 .keys(doc! {"id": 1})
276 .options(IndexOptions::builder().unique(true).build())
277 .build();
278
279 collection.create_index(id_index, None).await?;
280
281 Ok(())
282 }
283
284 async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
285 let collection = self.get_collection(db);
286 collection.insert_one(self, None).await?;
287 Ok(())
288 }
289
290 async fn update(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
291 panic!("Not implemented")
292 }
293
294 async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
295 panic!("Not implemented")
296 }
297
298 async fn delete_all(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
299 panic!("Not implemented")
300 }
301
302 async fn search(
303 &self,
304 db: &Database,
305 mode: SearchMode,
306 limit: Option<u32>,
307 id: Option<u32>,
308 sort_key: Option<&str>,
309 ) -> Result<Vec<Self>, Box<dyn error::Error>> {
310 let mut query = doc! { "id": { "$gt": 0 }};
311 if self.id != None {
312 query = doc! { "id": self.id.unwrap() };
313 }
314 let collection = self.get_collection(db);
315 let sort_key = sort_key.unwrap_or("id");
316 collection.search(query, mode, limit, id, &sort_key).await
317 }
318
319 fn get_collection_name(&self) -> &str {
320 "balance"
321 }
322}
323
324#[async_trait]
325impl Entity for AppState {
326 async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
327 let collection = self.get_collection(db);
328
329 let id_index = IndexModel::builder()
330 .keys(doc! {"id": 1})
331 .options(IndexOptions::builder().unique(true).build())
332 .build();
333
334 collection.create_index(id_index, None).await?;
335
336 Ok(())
337 }
338
339 async fn insert(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
340 panic!("Not implemented")
341 }
342
343 async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
344 let query = doc! { "id": 1 };
345 let update = bson::to_bson(self).unwrap();
346 let update = doc! { "$set" : update };
347 let collection = self.get_collection(db);
348 collection.update(query, update, true).await
349 }
350
351 async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
352 panic!("Not implemented")
353 }
354
355 async fn delete_all(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
356 let collection = self.get_collection(db);
357 collection.delete_all().await
358 }
359
360 async fn search(
361 &self,
362 db: &Database,
363 mode: SearchMode,
364 limit: Option<u32>,
365 id: Option<u32>,
366 sort_key: Option<&str>,
367 ) -> Result<Vec<Self>, Box<dyn error::Error>> {
368 let query = doc! { "id": 1 };
369 let collection = self.get_collection(db);
370 let sort_key = sort_key.unwrap_or("id");
371 collection.search(query, mode, limit, id, &sort_key).await
372 }
373
374 fn get_collection_name(&self) -> &str {
375 "app-state"
376 }
377}
378
379#[async_trait]
380impl Entity for PriceLog {
381 async fn create_indexes(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
382 let collection = self.get_collection(db);
383
384 let id_index = IndexModel::builder()
385 .keys(doc! {"id": 1})
386 .options(IndexOptions::builder().unique(true).build())
387 .build();
388
389 let price_point_timestamp_index = IndexModel::builder()
390 .keys(doc! {"price_point.timestamp": 1})
391 .build();
392
393 let price_point_timestamp_index_2 = IndexModel::builder()
394 .keys(doc! {"price_point.timestamp": -1})
395 .build();
396
397 collection.create_index(id_index, None).await?;
398 collection
399 .create_index(price_point_timestamp_index, None)
400 .await?;
401 collection
402 .create_index(price_point_timestamp_index_2, None)
403 .await?;
404
405 Ok(())
406 }
407
408 async fn insert(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
409 let collection = self.get_collection(db);
410 collection.insert_one(self, None).await?;
411 Ok(())
412 }
413
414 async fn update(&self, db: &Database) -> Result<(), Box<dyn error::Error>> {
415 let query = doc! { "id": self.id };
416 let update = bson::to_bson(self).unwrap();
417 let update = doc! { "$set" : update };
418 let collection = self.get_collection(db);
419 collection.update(query, update, true).await
420 }
421
422 async fn delete(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
423 panic!("Not implemented")
424 }
425
426 async fn delete_all(&self, _db: &Database) -> Result<(), Box<dyn error::Error>> {
427 panic!("Not implemented")
428 }
429
430 async fn search(
431 &self,
432 db: &Database,
433 mode: SearchMode,
434 limit: Option<u32>,
435 id: Option<u32>,
436 sort_key: Option<&str>,
437 ) -> Result<Vec<Self>, Box<dyn error::Error>> {
438 let mut query = doc! { "id": { "$gt": 0 }};
439 if self.id != None {
440 query = doc! { "id": self.id.unwrap() };
441 }
442 let collection = self.get_collection(db);
443 let sort_key = sort_key.unwrap_or("id");
444 collection.search(query, mode, limit, id, &sort_key).await
445 }
446
447 fn get_collection_name(&self) -> &str {
448 "price"
449 }
450}
451
452#[async_trait]
453pub trait HelperCollection<T> {
454 async fn update(
455 &self,
456 query: Document,
457 update: Document,
458 upsert: bool,
459 ) -> Result<(), Box<dyn error::Error>>;
460 async fn delete(&self, query: Document) -> Result<(), Box<dyn error::Error>>;
461 async fn delete_all(&self) -> Result<(), Box<dyn error::Error>>;
462 async fn search(
463 &self,
464 query: Document,
465 mode: SearchMode,
466 limit: Option<u32>,
467 id: Option<u32>,
468 sort_key: &str,
469 ) -> Result<Vec<T>, Box<dyn error::Error>>;
470}
471
472#[async_trait]
473impl<T> HelperCollection<T> for Collection<T>
474where
475 T: DeserializeOwned + Unpin + Send + Sync + Serialize + std::fmt::Debug,
476{
477 async fn update(
478 &self,
479 query: Document,
480 update: Document,
481 upsert: bool,
482 ) -> Result<(), Box<dyn error::Error>> {
483 let options = FindOneAndUpdateOptions::builder()
484 .upsert(upsert)
485 .return_document(ReturnDocument::After)
486 .build();
487 let _ = self.find_one_and_update(query, update, options).await?;
488 Ok(())
489 }
490
491 async fn delete(&self, query: Document) -> Result<(), Box<dyn error::Error>> {
492 let result = self.delete_one(query, None).await?;
493 if result.deleted_count == 1 {
494 return Ok(());
495 } else {
496 panic!("Not implemented")
497 }
498 }
499
500 async fn delete_all(&self) -> Result<(), Box<dyn error::Error>> {
501 let options = DropCollectionOptions::builder().build();
502 self.drop(options).await?;
503 Ok(())
504 }
505
506 async fn search(
507 &self,
508 mut query: Document,
509 mode: SearchMode,
510 limit: Option<u32>,
511 id: Option<u32>,
512 sort_key: &str,
513 ) -> Result<Vec<T>, Box<dyn error::Error>> {
514 let mut items: Vec<T> = vec![];
515
516 match sort_key {
517 "id" | "open_timestamp" | "price_point.timestamp" => {}
518 _ => {
519 return Err(Box::new(Error::new(
520 ErrorKind::InvalidInput,
521 "Invalid sort key",
522 )))
523 }
524 };
525
526 let find_options = match mode {
527 SearchMode::Ascending => {
528 let builder = FindOptions::builder()
529 .allow_disk_use(Some(true))
530 .sort(doc! { sort_key: 1 });
531
532 if let Some(limit_value) = limit {
533 builder.limit(limit_value as i64).build()
534 } else {
535 builder.build()
536 }
537 }
538 SearchMode::Descending => {
539 let builder = FindOptions::builder()
540 .allow_disk_use(Some(true))
541 .sort(doc! { sort_key: -1 });
542
543 if let Some(limit_value) = limit {
544 builder.limit(limit_value as i64).build()
545 } else {
546 builder.build()
547 }
548 }
549 SearchMode::ById => {
550 if let Some(id_value) = id {
551 query.insert("id", id_value);
552 } else {
553 return Err(Box::new(Error::new(
554 ErrorKind::InvalidInput,
555 "ID not provided".to_string(),
556 )));
557 }
558 FindOptions::builder().allow_disk_use(Some(true)).build()
559 }
560 };
561
562 let mut cursor = self.find(query, find_options).await?;
563 while let Some(item) = cursor.try_next().await? {
564 items.push(item);
565 }
566
567 if items.is_empty() {
568 Err(Box::new(Error::new(
569 ErrorKind::Other,
570 "Item not found".to_string(),
571 )))
572 } else {
573 Ok(items)
574 }
575 }
576}