use async_trait::async_trait;
use sea_orm::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::{
schemas::abnormal_stock_trade::{
self, ActiveModel as AbnormalStockTradeActiveModel, Entity as AbnormalStockTradeEntity,
Model as AbnormalStockTradeModel,
},
types::AbnormalStockTrade,
};
#[derive(Deserialize, Serialize, Debug)]
pub struct AbnormalStockTradeSearchOptions {
pub action_time: String,
pub analysis_type: String,
pub score: i64,
}
#[async_trait]
pub trait AbnormalStockTradeRepository: Send + Sync {
async fn insert_abnormal_stock_trade(
&self,
abnormal_record: AbnormalStockTrade,
) -> Result<AbnormalStockTradeModel, DbErr>;
async fn find_abnormal_stock_trades(
&self,
options: AbnormalStockTradeSearchOptions,
) -> Result<Vec<AbnormalStockTradeModel>, DbErr>;
}
pub struct SeaOrmAbnormalStockTradeRepository {
db_conn: Arc<DatabaseConnection>,
}
impl SeaOrmAbnormalStockTradeRepository {
pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
Self { db_conn }
}
}
#[async_trait]
impl AbnormalStockTradeRepository for SeaOrmAbnormalStockTradeRepository {
async fn insert_abnormal_stock_trade(
&self,
abnormal_record: AbnormalStockTrade,
) -> Result<AbnormalStockTradeModel, DbErr> {
let records = AbnormalStockTradeEntity::find()
.filter(abnormal_stock_trade::Column::Ticker.eq(abnormal_record.ticker.clone()))
.filter(
abnormal_stock_trade::Column::AnalysisType
.eq(abnormal_record.analysis_type.clone()),
)
.filter(abnormal_stock_trade::Column::ActionTime.like(format!(
"%{}%",
abnormal_record.action_time.format("%Y-%m-%d")
)))
.all(&*self.db_conn)
.await?;
if let Some(existing) = records.into_iter().next() {
return Ok(existing);
}
let active_model = AbnormalStockTradeActiveModel {
created_at: Set(abnormal_record.created_at),
ticker: Set(abnormal_record.ticker),
analysis_type: Set(abnormal_record.analysis_type),
last_vol: Set(abnormal_record.last_vol),
last_price: Set(abnormal_record.last_price),
price_change: Set(abnormal_record.price_change),
total_vol: Set(abnormal_record.total_vol),
action_time: Set(abnormal_record.action_time),
notification_sent: Set(abnormal_record.notification_sent),
reason: Set(abnormal_record.reason),
score: Set(abnormal_record.score),
..Default::default()
};
let res = abnormal_stock_trade::Entity::insert(active_model)
.exec(&*self.db_conn)
.await?;
let inserted = abnormal_stock_trade::Entity::find_by_id(res.last_insert_id)
.one(&*self.db_conn)
.await?
.ok_or(DbErr::Custom("Insert failed".into()))?;
Ok(inserted)
}
async fn find_abnormal_stock_trades(
&self,
options: AbnormalStockTradeSearchOptions,
) -> Result<Vec<AbnormalStockTradeModel>, DbErr> {
let AbnormalStockTradeSearchOptions { action_time, analysis_type, score, .. } = options;
AbnormalStockTradeEntity::find()
.filter(abnormal_stock_trade::Column::ActionTime.like(format!("%{}%", action_time)))
.filter(abnormal_stock_trade::Column::NotificationSent.eq(false))
.filter(
Condition::all()
.add(
abnormal_stock_trade::Column::AnalysisType
.eq(analysis_type),
)
.add(abnormal_stock_trade::Column::Score.gte(score)),
)
.all(&*self.db_conn)
.await
}
}