shindo_coding_utils 0.2.2

A utils crates which will be used in various micro-services
Documentation
use crate::schemas::historical_data::{
    self, ActiveModel as HistoricalDataActiveModel, Entity, Model as HistoricalDataModel,
};
use crate::services::fireant::FireAntStockData;
use crate::types::SortOrder;
use async_trait::async_trait;
use chrono::Duration;
use sea_orm::sea_query::*;
use sea_orm::*;
use std::sync::Arc;

/// Trait defining the repository for historical stock data.
#[async_trait]
pub trait HistoricalDataRepository: Send + Sync {
    /// Upserts multiple historical data records into the database, ignoring duplicates.
    async fn upsert_historical_data(
        &self,
        rows: Vec<FireAntStockData>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr>;

    /// Retrieves historical data based on symbol and time range.
    async fn get_historical_data_by_symbol_and_time(
        &self,
        options: HistoricalDataSearchOptions,
    ) -> Result<Vec<HistoricalDataModel>, DbErr>;
}

/// SeaORM implementation of the HistoricalDataRepository.
pub struct SeaOrmHistoricalDataRepository {
    db_conn: Arc<DatabaseConnection>,
}

#[derive(Debug, Clone)]
pub struct HistoricalDataSearchOptions {
    pub symbol: String,
    pub end_date: String,
    pub limit: u64,
    pub order: SortOrder,
}

impl SeaOrmHistoricalDataRepository {
    /// Creates a new instance of the repository.
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }

    async fn upsert(
        &self,
        active_models: Vec<HistoricalDataActiveModel>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        let unique_columns = [
            historical_data::Column::Date,
            historical_data::Column::Ticker,
        ];
        let on_conflict = OnConflict::columns(unique_columns)
            .update_columns(unique_columns)
            .to_owned();

        let res = Entity::insert_many(active_models)
            .on_conflict(on_conflict.clone())
            .exec(&*self.db_conn)
            .await?;

        Ok(res)
    }
}

#[async_trait]
impl HistoricalDataRepository for SeaOrmHistoricalDataRepository {
    async fn upsert_historical_data(
        &self,
        rows: Vec<FireAntStockData>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        if rows.is_empty() {
            return Ok(InsertResult { last_insert_id: 0 });
        }

        let active_models: Vec<HistoricalDataActiveModel> =
            rows.into_iter().map(|row| row.into()).collect();

        self.upsert(active_models).await
    }

    async fn get_historical_data_by_symbol_and_time(
        &self,
        options: HistoricalDataSearchOptions,
    ) -> Result<Vec<HistoricalDataModel>, DbErr> {
        let end_date = options
            .end_date
            .parse::<chrono::NaiveDate>()
            .map_err(|_| DbErr::Custom("Invalid end date format".to_string()))?
            + Duration::days(1);

        let query = historical_data::Entity::find()
            .filter(historical_data::Column::Ticker.eq(options.symbol))
            .filter(historical_data::Column::Date.lte(end_date))
            .order_by_desc(historical_data::Column::Date); // Get latest data by default. Important: don't change this order

        let raw = query.limit(options.limit).all(&*self.db_conn).await;

        // Manually sort the results by `date` column if needed
        match options.order {
            SortOrder::Ascending => Ok(raw?.into_iter().rev().collect()), // Reverse the order for ascending
            SortOrder::Descending => Ok(raw?),
        }
    }
}