use async_trait::async_trait;
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::OnConflict;
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, InsertResult, QueryFilter, QueryOrder};
use std::sync::Arc;
use crate::models::IntradayTrading;
use crate::schemas::intraday_trading::{self as intraday_trading};
use crate::types::TimeRange;
#[derive(Debug, Clone)]
pub struct FindOptions {
pub ticker: String,
pub timerange: TimeRange,
}
#[async_trait]
pub trait IntradayTradingRepository: Send + Sync {
async fn insert_list_stock_trade(
&self,
rows: Vec<IntradayTrading>,
) -> Result<InsertResult<intraday_trading::ActiveModel>, DbErr>;
async fn find_intraday_data_by_ticker(
&self,
ticker: String,
trading_timestamp: String,
) -> Result<Vec<IntradayTrading>, DbErr>;
async fn find(&self, options: FindOptions) -> Result<Vec<IntradayTrading>, DbErr>;
}
pub struct SeaOrmIntradayTradingRepository {
db_conn: Arc<DatabaseConnection>,
}
impl SeaOrmIntradayTradingRepository {
pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
Self { db_conn }
}
async fn upsert(
&self,
data: Vec<IntradayTrading>,
) -> Result<InsertResult<intraday_trading::ActiveModel>, DbErr> {
let unique_columns = [
intraday_trading::Column::Change,
intraday_trading::Column::LastPrice,
intraday_trading::Column::LastVol,
intraday_trading::Column::Ticker,
intraday_trading::Column::TotalVol,
intraday_trading::Column::TradingTimestamp,
];
let on_conflict = OnConflict::columns(unique_columns)
.update_columns(unique_columns)
.to_owned();
let active_models: Vec<intraday_trading::ActiveModel> = data
.into_iter()
.map(|item| intraday_trading::ActiveModel {
id: Set(0), change: Set(item.change),
last_price: Set(item.last_price),
last_vol: Set(item.last_vol),
side: Set(item.side),
ticker: Set(item.ticker),
total_vol: Set(item.total_vol),
trading_timestamp: Set(item.trading_timestamp),
})
.collect();
let res = intraday_trading::Entity::insert_many(active_models)
.on_conflict(on_conflict.clone())
.exec(&*self.db_conn)
.await?;
Ok(res)
}
}
#[async_trait]
impl IntradayTradingRepository for SeaOrmIntradayTradingRepository {
async fn insert_list_stock_trade(
&self,
rows: Vec<IntradayTrading>,
) -> Result<InsertResult<intraday_trading::ActiveModel>, DbErr> {
if rows.is_empty() {
return Ok(InsertResult {
last_insert_id: Default::default(),
});
}
let result = self.upsert(rows).await?;
let last_id = result.last_insert_id;
Ok(InsertResult {
last_insert_id: last_id as i32,
})
}
async fn find_intraday_data_by_ticker(
&self,
ticker: String,
trading_timestamp: String,
) -> Result<Vec<IntradayTrading>, DbErr> {
let db = &*self.db_conn;
let records = intraday_trading::Entity::find()
.filter(
intraday_trading::Column::Ticker.eq(ticker).and(
intraday_trading::Column::TradingTimestamp
.like(format!("%{}%", trading_timestamp)),
),
)
.all(db)
.await;
match records {
Ok(models) => {
let results = models
.into_iter()
.map(|model| IntradayTrading {
change: model.change,
last_price: model.last_price,
last_vol: model.last_vol,
side: model.side,
ticker: model.ticker,
total_vol: model.total_vol,
trading_timestamp: model.trading_timestamp,
})
.collect();
Ok(results)
}
Err(e) => Err(e),
}
}
async fn find(&self, options: FindOptions) -> Result<Vec<IntradayTrading>, DbErr> {
let db = &*self.db_conn;
let query = intraday_trading::Entity::find()
.filter(intraday_trading::Column::Ticker.eq(options.ticker))
.filter(intraday_trading::Column::TradingTimestamp.gte(options.timerange.start))
.filter(intraday_trading::Column::TradingTimestamp.lte(options.timerange.end))
.order_by_asc(intraday_trading::Column::TradingTimestamp);
let records = query.all(db).await;
match records {
Ok(models) => {
let results = models
.into_iter()
.map(|model| IntradayTrading {
change: model.change,
last_price: model.last_price,
last_vol: model.last_vol,
side: model.side,
ticker: model.ticker,
total_vol: model.total_vol,
trading_timestamp: model.trading_timestamp,
})
.collect();
Ok(results)
}
Err(e) => Err(e),
}
}
}