use crate::schemas::historical_data::{
self, ActiveModel as HistoricalDataActiveModel, Entity, Model as HistoricalDataModel,
};
use crate::services::fireant::FireAntStockData;
use crate::types::SortOrder;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use sea_orm::sea_query::*;
use sea_orm::*;
use std::sync::Arc;
/// Storage abstraction for historical stock rows.
///
/// Implementors must be `Send + Sync`. Every method is async and reports
/// failures as a `DbErr`.
#[async_trait]
pub trait HistoricalDataRepository: Send + Sync {
/// Persists `rows`, inserting new records or updating existing ones.
///
/// The exact conflict-resolution rules are left to the implementor.
///
/// # Errors
/// Returns a `DbErr` when the write fails.
async fn upsert_historical_data(
&self,
rows: Vec<FireAntStockData>,
) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr>;
/// Fetches historical rows selected by `options` (symbol, end date, row limit
/// and result ordering).
///
/// # Errors
/// Returns a `DbErr` when the lookup fails.
async fn get_historical_data_by_symbol_and_time(
&self,
options: HistoricalDataSearchOptions,
) -> Result<Vec<HistoricalDataModel>, DbErr>;
/// Fetches the rows for `ticker_code` dated between `start_time` and
/// `end_time`, ordered by date according to `sort_order`.
///
/// # Errors
/// Returns a `DbErr` when the lookup fails.
async fn find_by_timerange(
&self,
ticker_code: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
sort_order: SortOrder,
) -> Result<Vec<HistoricalDataModel>, DbErr>;
}
/// `HistoricalDataRepository` implementation backed by a SeaORM database connection.
pub struct SeaOrmHistoricalDataRepository {
// Reference-counted handle to the connection every query in this repository runs on.
db_conn: Arc<DatabaseConnection>,
}
/// Parameters for `HistoricalDataRepository::get_historical_data_by_symbol_and_time`.
#[derive(Debug, Clone)]
pub struct HistoricalDataSearchOptions {
/// Ticker symbol whose rows are requested.
pub symbol: String,
/// End date of the search as text; the SeaORM repository parses it as a
/// `chrono::NaiveDate` (presumably `YYYY-MM-DD` — confirm with callers).
pub end_date: String,
/// Maximum number of rows to return.
pub limit: u64,
/// Order in which the selected rows are returned.
pub order: SortOrder,
}
impl SeaOrmHistoricalDataRepository {
    /// Builds a repository that runs its queries on the shared `db_conn`.
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }

    /// Bulk-inserts `active_models`, resolving `(Date, Ticker)` conflicts with an update.
    ///
    /// `DbErr::RecordNotInserted` is treated as success and reported as an
    /// `InsertResult` whose `last_insert_id` is `0`; every other error is
    /// handed back to the caller untouched.
    async fn upsert(
        &self,
        active_models: Vec<HistoricalDataActiveModel>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        // NOTE(review): the only column rewritten on conflict is `Date`, which is itself
        // part of the conflict target — confirm whether value columns should be refreshed too.
        let conflict_policy = OnConflict::columns([
            historical_data::Column::Date,
            historical_data::Column::Ticker,
        ])
        .update_columns([historical_data::Column::Date])
        .to_owned();

        let outcome = Entity::insert_many(active_models)
            .on_conflict(conflict_policy)
            .exec(&*self.db_conn)
            .await;

        match outcome {
            // Nothing was written; callers still get a successful (zero-id) result.
            Err(DbErr::RecordNotInserted) => Ok(InsertResult { last_insert_id: 0 }),
            other => other,
        }
    }
}
#[async_trait]
impl HistoricalDataRepository for SeaOrmHistoricalDataRepository {
async fn upsert_historical_data(
&self,
rows: Vec<FireAntStockData>,
) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
if rows.is_empty() {
return Ok(InsertResult { last_insert_id: 0 });
}
let active_models: Vec<HistoricalDataActiveModel> =
rows.into_iter().map(|row| row.into()).collect();
self.upsert(active_models).await
}
async fn get_historical_data_by_symbol_and_time(
&self,
options: HistoricalDataSearchOptions,
) -> Result<Vec<HistoricalDataModel>, DbErr> {
let end_date = options
.end_date
.parse::<chrono::NaiveDate>()
.map_err(|_| DbErr::Custom("Invalid end date format".to_string()))?
+ Duration::days(1);
let query = historical_data::Entity::find()
.filter(historical_data::Column::Ticker.eq(options.symbol))
.filter(historical_data::Column::Date.lte(end_date))
.order_by_desc(historical_data::Column::Date);
let raw = query.limit(options.limit).all(&*self.db_conn).await;
match options.order {
SortOrder::Ascending => Ok(raw?.into_iter().rev().collect()), SortOrder::Descending => Ok(raw?),
}
}
async fn find_by_timerange(
&self,
ticker_code: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
sort_order: SortOrder,
) -> Result<Vec<HistoricalDataModel>, DbErr> {
let mut query = Entity::find()
.filter(historical_data::Column::Ticker.contains(ticker_code.clone()))
.filter(historical_data::Column::Date.gte(start_time))
.filter(historical_data::Column::Date.lte(end_time));
let sort_column = historical_data::Column::Date;
query = match sort_order {
SortOrder::Descending => query.order_by_desc(sort_column),
SortOrder::Ascending => query.order_by_asc(sort_column),
};
let rows = query.all(&*self.db_conn).await?;
if rows.is_empty() {
return Ok(vec![]);
}
let mut result = Vec::new();
for item in rows {
let row: HistoricalDataModel = item.try_into().unwrap();
result.push(row);
}
Ok(result)
}
}