shindo_coding_utils 0.4.6

A utility crate that is used by various microservices
Documentation
use crate::schemas::historical_data::{
    self, ActiveModel as HistoricalDataActiveModel, Entity, Model as HistoricalDataModel,
};
use crate::services::fireant::FireAntStockData;
use crate::types::SortOrder;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use sea_orm::sea_query::*;
use sea_orm::*;
use std::sync::Arc;

/// Trait defining the repository for historical stock data.
///
/// The `Send + Sync` supertraits let implementations be shared across threads
/// and async tasks. Every method is asynchronous (via `async_trait`) and
/// reports failures as [`DbErr`].
#[async_trait]
pub trait HistoricalDataRepository: Send + Sync {
    /// Upserts multiple historical data records into the database, ignoring duplicates.
    ///
    /// `rows` is consumed; the insert outcome is returned as an
    /// [`InsertResult`] for the historical data active model.
    async fn upsert_historical_data(
        &self,
        rows: Vec<FireAntStockData>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr>;

    /// Retrieves historical data based on symbol and time range.
    ///
    /// The symbol, end date, row limit and result ordering are all carried by
    /// `options` (see [`HistoricalDataSearchOptions`]).
    async fn get_historical_data_by_symbol_and_time(
        &self,
        options: HistoricalDataSearchOptions,
    ) -> Result<Vec<HistoricalDataModel>, DbErr>;

    /// Retrieves the historical data records of `ticker_code` whose date lies
    /// between `start_time` and `end_time`, ordered by date according to
    /// `sort_order`.
    async fn find_by_timerange(
        &self,
        ticker_code: String,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        sort_order: SortOrder,
    ) -> Result<Vec<HistoricalDataModel>, DbErr>;
}

/// SeaORM implementation of the HistoricalDataRepository.
///
/// Holds a reference-counted handle to the database connection, so the
/// connection can be shared with other owners of the same `Arc`.
pub struct SeaOrmHistoricalDataRepository {
    /// Shared connection every query of this repository is executed on.
    db_conn: Arc<DatabaseConnection>,
}

/// Search parameters for
/// `HistoricalDataRepository::get_historical_data_by_symbol_and_time`.
#[derive(Debug, Clone)]
pub struct HistoricalDataSearchOptions {
    /// Ticker symbol; the SeaORM repository compares it for equality with the
    /// ticker column.
    pub symbol: String,
    /// Upper date bound as text. The SeaORM repository parses it with
    /// `str::parse::<chrono::NaiveDate>()` and fails with `DbErr::Custom` when
    /// it cannot be parsed.
    pub end_date: String,
    /// Maximum number of rows to return.
    pub limit: u64,
    /// Order (by date) in which the selected rows are returned.
    pub order: SortOrder,
}

impl SeaOrmHistoricalDataRepository {
    /// Builds a repository that runs its queries on the given shared connection.
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }

    /// Inserts `active_models` in a single statement.
    ///
    /// The conflict target is the `(Date, Ticker)` column pair. When a row with
    /// the same pair already exists, only those two columns are re-assigned, so
    /// the remaining columns of the existing row are left untouched.
    ///
    /// # Errors
    ///
    /// Propagates any [`DbErr`] raised while executing the insert.
    async fn upsert(
        &self,
        active_models: Vec<HistoricalDataActiveModel>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        let conflict_key = [
            historical_data::Column::Date,
            historical_data::Column::Ticker,
        ];
        let conflict_policy = OnConflict::columns(conflict_key)
            .update_columns(conflict_key)
            .to_owned();

        Entity::insert_many(active_models)
            .on_conflict(conflict_policy)
            .exec(&*self.db_conn)
            .await
    }
}

#[async_trait]
impl HistoricalDataRepository for SeaOrmHistoricalDataRepository {
    async fn upsert_historical_data(
        &self,
        rows: Vec<FireAntStockData>,
    ) -> Result<InsertResult<HistoricalDataActiveModel>, DbErr> {
        if rows.is_empty() {
            return Ok(InsertResult { last_insert_id: 0 });
        }

        let active_models: Vec<HistoricalDataActiveModel> =
            rows.into_iter().map(|row| row.into()).collect();

        self.upsert(active_models).await
    }

    async fn get_historical_data_by_symbol_and_time(
        &self,
        options: HistoricalDataSearchOptions,
    ) -> Result<Vec<HistoricalDataModel>, DbErr> {
        let end_date = options
            .end_date
            .parse::<chrono::NaiveDate>()
            .map_err(|_| DbErr::Custom("Invalid end date format".to_string()))?
            + Duration::days(1);

        let query = historical_data::Entity::find()
            .filter(historical_data::Column::Ticker.eq(options.symbol))
            .filter(historical_data::Column::Date.lte(end_date))
            .order_by_desc(historical_data::Column::Date); // Get latest data by default. Important: don't change this order

        let raw = query.limit(options.limit).all(&*self.db_conn).await;

        // Manually sort the results by `date` column if needed
        match options.order {
            SortOrder::Ascending => Ok(raw?.into_iter().rev().collect()), // Reverse the order for ascending
            SortOrder::Descending => Ok(raw?),
        }
    }

    /// Finds historical data records within a specified time range for a given ticker symbol.
    ///
    /// # Arguments
    ///
    /// * `ticker_code` - The stock ticker symbol to search for (e.g., "STB", "SAS")
    /// * `start_time` - The beginning of the time range (inclusive) in UTC
    /// * `end_time` - The end of the time range (inclusive) in UTC
    /// * `sort_order` - The order to sort results by date (`SortOrder::Ascending` or `SortOrder::Descending`)
    ///
    /// # Returns
    ///
    /// Returns a `Result` containing:
    /// * `Ok(Vec<HistoricalDataModel>)` - A vector of historical data records matching the criteria
    /// * `Err(DbErr)` - Database error if the query fails
    ///
    /// # Examples
    ///
    /// ```rust
    /// let start = Utc::now() - Duration::days(30);
    /// let end = Utc::now();
    /// let data = repository.find_by_timerange(
    ///     "STB".to_string(),
    ///     start,
    ///     end,
    ///     SortOrder::Descending
    /// ).await?;
    /// ```
    async fn find_by_timerange(
        &self,
        ticker_code: String,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        sort_order: SortOrder,
    ) -> Result<Vec<HistoricalDataModel>, DbErr> {
        let mut query = Entity::find()
            .filter(historical_data::Column::Ticker.contains(ticker_code.clone()))
            .filter(historical_data::Column::Date.gte(start_time))
            .filter(historical_data::Column::Date.lte(end_time));

        // Apply sorting based on sort_order parameter
        let sort_column = historical_data::Column::Date;
        query = match sort_order {
            SortOrder::Descending => query.order_by_desc(sort_column),
            SortOrder::Ascending => query.order_by_asc(sort_column),
        };

        let rows = query.all(&*self.db_conn).await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        let mut result = Vec::new();
        for item in rows {
            let row: HistoricalDataModel = item.try_into().unwrap();
            result.push(row);
        }

        Ok(result)
    }
}