use crate::schemas::historical_data::{
self, ActiveModel as HistoricalDataActiveModel, Entity, Model as HistoricalDataModel,
};
use crate::services::fireant::FireAntStockData;
use crate::types::SortOrder;
use async_trait::async_trait;
use chrono::Duration;
use sea_orm::sea_query::*;
use sea_orm::*;
use std::sync::Arc;
/// Storage abstraction for historical stock data.
///
/// Implementors must be `Send + Sync` so a repository can be shared across
/// async tasks.
#[async_trait]
pub trait HistoricalDataRepository: Send + Sync {
    /// Upserts a batch of FireAnt stock rows.
    ///
    /// # Errors
    ///
    /// Returns a [`DbErr`] when the operation fails.
    async fn upsert_historical_data(
        &self,
        rows: Vec<FireAntStockData>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr>;

    /// Fetches the historical data records selected by `options`.
    ///
    /// # Errors
    ///
    /// Returns a [`DbErr`] when the lookup fails.
    async fn get_historical_data_by_symbol_and_time(
        &self,
        options: HistoricalDataSearchOptions,
    ) -> Result<Vec<HistoricalDataModel>, DbErr>;
}
/// SeaORM-backed repository for historical data.
pub struct SeaOrmHistoricalDataRepository {
    /// Shared handle to the database connection used for every query.
    db_conn: Arc<DatabaseConnection>,
}
/// Search parameters for looking up historical data records.
#[derive(Debug, Clone)]
pub struct HistoricalDataSearchOptions {
    /// Symbol to look up; matched for equality against the ticker column.
    pub symbol: String,
    /// End date as text; it is parsed as a `chrono::NaiveDate` when the query is built.
    pub end_date: String,
    /// Maximum number of records to fetch.
    pub limit: u64,
    /// Order in which the fetched records are returned.
    pub order: SortOrder,
}
impl SeaOrmHistoricalDataRepository {
    /// Creates a repository that runs its queries on `db_conn`.
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }

    /// Inserts `active_models` with a single multi-row insert.
    ///
    /// A conflict on the `(Date, Ticker)` pair turns the insert into an update
    /// instead of failing.
    ///
    /// # Errors
    ///
    /// Returns the [`DbErr`] reported while executing the statement.
    async fn upsert(
        &self,
        active_models: Vec<HistoricalDataActiveModel>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        // Columns that identify a record for conflict detection.
        let unique_columns = [
            historical_data::Column::Date,
            historical_data::Column::Ticker,
        ];

        // NOTE(review): the update clause rewrites only the conflict columns
        // themselves, so the remaining columns of an existing record are not
        // refreshed by this upsert. Confirm this is intended.
        let on_conflict = OnConflict::columns(unique_columns)
            .update_columns(unique_columns)
            .to_owned();

        // `on_conflict` is already an owned value, so it is moved in directly.
        Entity::insert_many(active_models)
            .on_conflict(on_conflict)
            .exec(&*self.db_conn)
            .await
    }
}
#[async_trait]
impl HistoricalDataRepository for SeaOrmHistoricalDataRepository {
    /// Converts `rows` into active models and upserts them.
    ///
    /// An empty batch never reaches the database: it returns an `InsertResult`
    /// whose `last_insert_id` is the placeholder `0`.
    ///
    /// # Errors
    ///
    /// Returns the [`DbErr`] produced by the underlying insert.
    async fn upsert_historical_data(
        &self,
        rows: Vec<FireAntStockData>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        if rows.is_empty() {
            return Ok(InsertResult { last_insert_id: 0 });
        }

        let active_models: Vec<HistoricalDataActiveModel> =
            rows.into_iter().map(Into::into).collect();
        self.upsert(active_models).await
    }

    /// Returns up to `options.limit` records for `options.symbol`, taking the
    /// most recent ones at or before the day after `options.end_date`.
    ///
    /// Records are fetched newest-first, so the limit always keeps the latest
    /// ones. They are then returned in the order requested by `options.order`.
    ///
    /// # Errors
    ///
    /// Returns `DbErr::Custom` when `options.end_date` cannot be parsed as a
    /// date, or when adding one day to it would exceed the supported date
    /// range. Any database error from the query is propagated.
    async fn get_historical_data_by_symbol_and_time(
        &self,
        options: HistoricalDataSearchOptions,
    ) -> Result<Vec<HistoricalDataModel>, DbErr> {
        // Checked addition: a parseable but extreme date (e.g. the maximum
        // representable one) becomes an error instead of a panic.
        let end_date = options
            .end_date
            .parse::<chrono::NaiveDate>()
            .map_err(|_| DbErr::Custom("Invalid end date format".to_string()))?
            .checked_add_signed(Duration::days(1))
            .ok_or_else(|| DbErr::Custom("End date is out of range".to_string()))?;

        // NOTE(review): the upper bound is inclusive (`<=`) on the day *after*
        // the requested end date. If the date column holds plain dates or
        // midnight timestamps, this also matches the following day's record;
        // confirm whether `lt` was intended.
        let mut records = historical_data::Entity::find()
            .filter(historical_data::Column::Ticker.eq(options.symbol))
            .filter(historical_data::Column::Date.lte(end_date))
            .order_by_desc(historical_data::Column::Date)
            .limit(options.limit)
            .all(&*self.db_conn)
            .await?;

        // The query yields newest-first; flip in place for ascending output.
        match options.order {
            SortOrder::Ascending => records.reverse(),
            SortOrder::Descending => {}
        }
        Ok(records)
    }
}