shindo_coding_utils 0.4.5

A utility crate used across various microservices
Documentation
use async_trait::async_trait;
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::OnConflict;
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, InsertResult, QueryFilter, QueryOrder};
use std::sync::Arc;

use crate::models::IntradayTrading;
use crate::schemas::intraday_trading::{self as intraday_trading};
use crate::types::TimeRange;

/// Query parameters accepted by [`IntradayTradingRepository::find`].
#[derive(Debug, Clone)]
pub struct FindOptions {
    /// Ticker the returned rows must equal.
    pub ticker: String,
    /// Time window for the query. The SeaORM implementation in this file uses
    /// its `start` and `end` as inclusive bounds on the trading timestamp.
    pub timerange: TimeRange,
}

/// Storage abstraction for intraday trade records.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait IntradayTradingRepository: Send + Sync {
    /// Persists a batch of trades and returns the resulting insert result.
    ///
    /// The SeaORM implementation in this file upserts the rows and returns
    /// early, without issuing a query, when `rows` is empty.
    async fn insert_list_stock_trade(
        &self,
        rows: Vec<IntradayTrading>,
    ) -> Result<InsertResult<intraday_trading::ActiveModel>, DbErr>;

    /// Finds intraday data by ticker and trading timestamp.
    ///
    /// Returns every record whose ticker equals `ticker` and whose trading
    /// timestamp matches `trading_timestamp`, which should be in the format
    /// "YYYY-MM-DD". The SeaORM implementation in this file matches the value
    /// as a substring (`LIKE '%<value>%'`) of the stored timestamp.
    async fn find_intraday_data_by_ticker(
        &self,
        ticker: String,
        trading_timestamp: String,
    ) -> Result<Vec<IntradayTrading>, DbErr>;

    /// Finds the records described by `options`.
    ///
    /// The SeaORM implementation in this file filters by exact ticker and by
    /// the inclusive `options.timerange`, ordered by ascending trading timestamp.
    async fn find(&self, options: FindOptions) -> Result<Vec<IntradayTrading>, DbErr>;
}

/// [`IntradayTradingRepository`] implementation backed by a SeaORM
/// [`DatabaseConnection`].
pub struct SeaOrmIntradayTradingRepository {
    /// Shared connection handle that every query in this repository runs on.
    db_conn: Arc<DatabaseConnection>,
}

impl SeaOrmIntradayTradingRepository {
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }

    async fn upsert(
        &self,
        data: Vec<IntradayTrading>,
    ) -> Result<InsertResult<intraday_trading::ActiveModel>, DbErr> {
        let unique_columns = [
            intraday_trading::Column::Change,
            intraday_trading::Column::LastPrice,
            intraday_trading::Column::LastVol,
            intraday_trading::Column::Ticker,
            intraday_trading::Column::TotalVol,
            intraday_trading::Column::TradingTimestamp,
        ];
        let on_conflict = OnConflict::columns(unique_columns)
            .update_columns(unique_columns)
            .to_owned();

        let active_models: Vec<intraday_trading::ActiveModel> = data
            .into_iter()
            .map(|item| intraday_trading::ActiveModel {
                id: Set(0), // ID will be auto-generated by the database
                change: Set(item.change),
                last_price: Set(item.last_price),
                last_vol: Set(item.last_vol),
                side: Set(item.side),
                ticker: Set(item.ticker),
                total_vol: Set(item.total_vol),
                trading_timestamp: Set(item.trading_timestamp),
            })
            .collect();

        let res = intraday_trading::Entity::insert_many(active_models)
            .on_conflict(on_conflict.clone())
            .exec(&*self.db_conn)
            .await?;

        Ok(res)
    }
}

#[async_trait]
impl IntradayTradingRepository for SeaOrmIntradayTradingRepository {
    async fn insert_list_stock_trade(
        &self,
        rows: Vec<IntradayTrading>,
    ) -> Result<InsertResult<intraday_trading::ActiveModel>, DbErr> {
        if rows.is_empty() {
            return Ok(InsertResult {
                last_insert_id: Default::default(),
            });
        }

        let result = self.upsert(rows).await?;
        let last_id = result.last_insert_id;
        Ok(InsertResult {
            last_insert_id: last_id as i32,
        })
    }

    /// Find intraday data by ticker and trading timestamp.
    /// This method returns all records that match the ticker and trading timestamp.
    /// The `trading_timestamp` should be in the format "YYYY-MM-DD".
    async fn find_intraday_data_by_ticker(
        &self,
        ticker: String,
        trading_timestamp: String,
    ) -> Result<Vec<IntradayTrading>, DbErr> {
        let db = &*self.db_conn;
        let records = intraday_trading::Entity::find()
            .filter(
                intraday_trading::Column::Ticker.eq(ticker).and(
                    intraday_trading::Column::TradingTimestamp
                        .like(format!("%{}%", trading_timestamp)),
                ),
            )
            .all(db)
            .await;

        match records {
            Ok(models) => {
                let results = models
                    .into_iter()
                    .map(|model| IntradayTrading {
                        change: model.change,
                        last_price: model.last_price,
                        last_vol: model.last_vol,
                        side: model.side,
                        ticker: model.ticker,
                        total_vol: model.total_vol,
                        trading_timestamp: model.trading_timestamp,
                    })
                    .collect();
                Ok(results)
            }
            Err(e) => Err(e),
        }
    }

    async fn find(&self, options: FindOptions) -> Result<Vec<IntradayTrading>, DbErr> {
        let db = &*self.db_conn;
        let query = intraday_trading::Entity::find()
            .filter(intraday_trading::Column::Ticker.eq(options.ticker))
            .filter(intraday_trading::Column::TradingTimestamp.gte(options.timerange.start))
            .filter(intraday_trading::Column::TradingTimestamp.lte(options.timerange.end))
            .order_by_asc(intraday_trading::Column::TradingTimestamp);

        let records = query.all(db).await;

        match records {
            Ok(models) => {
                let results = models
                    .into_iter()
                    .map(|model| IntradayTrading {
                        change: model.change,
                        last_price: model.last_price,
                        last_vol: model.last_vol,
                        side: model.side,
                        ticker: model.ticker,
                        total_vol: model.total_vol,
                        trading_timestamp: model.trading_timestamp,
                    })
                    .collect();
                Ok(results)
            }
            Err(e) => Err(e),
        }
    }
}