shindo_coding_utils 0.3.5

A utils crates which will be used in various micro-services
Documentation
use async_trait::async_trait;
use sea_orm::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use crate::{
    schemas::abnormal_stock_trade::{
        self, ActiveModel as AbnormalStockTradeActiveModel, Entity as AbnormalStockTradeEntity,
        Model as AbnormalStockTradeModel,
    },
    types::AbnormalStockTrade,
};

#[derive(Deserialize, Serialize, Debug)]
pub struct AbnormalStockTradeSearchOptions {
    /// action_date: YYYY-mm-DD
    pub action_time: String,
    pub analysis_type: String,
    pub score: i64,
}

#[async_trait]
pub trait AbnormalStockTradeRepository: Send + Sync {
    async fn insert_abnormal_stock_trade(
        &self,
        abnormal_record: AbnormalStockTrade,
    ) -> Result<AbnormalStockTradeModel, DbErr>;

    async fn find_abnormal_stock_trades(
        &self,
        options: AbnormalStockTradeSearchOptions,
    ) -> Result<Vec<AbnormalStockTradeModel>, DbErr>;
}

pub struct SeaOrmAbnormalStockTradeRepository {
    db_conn: Arc<DatabaseConnection>,
}

impl SeaOrmAbnormalStockTradeRepository {
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }
}

#[async_trait]
impl AbnormalStockTradeRepository for SeaOrmAbnormalStockTradeRepository {
    async fn insert_abnormal_stock_trade(
        &self,
        abnormal_record: AbnormalStockTrade,
    ) -> Result<AbnormalStockTradeModel, DbErr> {
        let records = AbnormalStockTradeEntity::find()
            .filter(abnormal_stock_trade::Column::Ticker.eq(abnormal_record.ticker.clone()))
            .filter(
                abnormal_stock_trade::Column::AnalysisType
                    .eq(abnormal_record.analysis_type.clone()),
            )
            .filter(abnormal_stock_trade::Column::ActionTime.like(format!(
                "%{}%",
                abnormal_record.action_time.format("%Y-%m-%d")
            )))
            .all(&*self.db_conn)
            .await?;

        if let Some(existing) = records.into_iter().next() {
            return Ok(existing);
        }

        let active_model = AbnormalStockTradeActiveModel {
            created_at: Set(abnormal_record.created_at),
            ticker: Set(abnormal_record.ticker),
            analysis_type: Set(abnormal_record.analysis_type),
            last_vol: Set(abnormal_record.last_vol),
            last_price: Set(abnormal_record.last_price),
            price_change: Set(abnormal_record.price_change),
            total_vol: Set(abnormal_record.total_vol),
            action_time: Set(abnormal_record.action_time),
            notification_sent: Set(abnormal_record.notification_sent),
            reason: Set(abnormal_record.reason),
            score: Set(abnormal_record.score),
            ..Default::default()
        };

        let res = abnormal_stock_trade::Entity::insert(active_model)
            .exec(&*self.db_conn)
            .await?;

        let inserted = abnormal_stock_trade::Entity::find_by_id(res.last_insert_id)
            .one(&*self.db_conn)
            .await?
            .ok_or(DbErr::Custom("Insert failed".into()))?;

        Ok(inserted)
    }

    async fn find_abnormal_stock_trades(
        &self,
        options: AbnormalStockTradeSearchOptions,
    ) -> Result<Vec<AbnormalStockTradeModel>, DbErr> {
        let AbnormalStockTradeSearchOptions { action_time, analysis_type, score, .. } = options;

        AbnormalStockTradeEntity::find()
            .filter(abnormal_stock_trade::Column::ActionTime.like(format!("%{}%", action_time)))
            .filter(abnormal_stock_trade::Column::NotificationSent.eq(false))
            .filter(
                Condition::all()
                    .add(
                        abnormal_stock_trade::Column::AnalysisType
                            .eq(analysis_type),
                    )
                    .add(abnormal_stock_trade::Column::Score.gte(score)),
            )
            .all(&*self.db_conn)
            .await
    }
}