shindo_coding_utils 0.3.9

A utility crate that will be used across various microservices
Documentation
use async_stream::stream;
use async_trait::async_trait;
use futures::Stream;
use sea_orm::{
    ColumnTrait, DatabaseConnection, DbErr, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
};
use std::pin::Pin;
use std::sync::Arc;

use crate::models::StockMarket;
use crate::schemas::stock_market_schema::{
    self as stock_market_schema, Entity as StockMarketEntity, Model as StockMarketModel,
};

/// Query interface for [`StockMarket`] records.
///
/// Implementors must be `Send + Sync`. The `async` methods are made possible
/// by `#[async_trait]`.
#[async_trait]
pub trait StockMarketRepository: Send + Sync {
    /// Returns a pinned, boxed, `Send` stream of [`StockMarket`] items.
    ///
    /// Every item is a `Result`, so a database failure is delivered through
    /// the stream as an `Err` item rather than from this call itself.
    ///
    /// In the SeaORM implementation in this file, `cursor` is an inclusive
    /// lower bound on the ticker (`>=`) and `limit` is handed to the paginator
    /// as the page size; it is not a cap on the total number of items.
    fn get_tickers_by_cursor_stream(
        &self,
        cursor: Option<String>,
        limit: u64,
    ) -> Pin<Box<dyn Stream<Item = Result<StockMarket, DbErr>> + Send>>;

    /// Returns all [`StockMarket`] records as a vector, or the [`DbErr`] that
    /// prevented loading them.
    ///
    /// The SeaORM implementation in this file orders them by ticker ascending.
    async fn get_all_tickers(&self) -> Result<Vec<StockMarket>, DbErr>;

    /// Returns the [`StockMarket`] records selected by `ticker`, or the
    /// [`DbErr`] that prevented loading them.
    ///
    /// The SeaORM implementation in this file returns every record whose
    /// ticker is greater than or equal to `ticker`, ordered by ticker
    /// ascending.
    async fn get_all_tickers_by_cursor(&self, ticker: String) -> Result<Vec<StockMarket>, DbErr>;
}

/// [`StockMarketRepository`] implementation backed by a SeaORM
/// [`DatabaseConnection`].
pub struct SeaOrmStockMarketRepository {
    /// Reference-counted handle to the database connection used by the queries.
    db_conn: Arc<DatabaseConnection>,
}

impl SeaOrmStockMarketRepository {
    pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
        Self { db_conn }
    }
}

#[async_trait]
impl StockMarketRepository for SeaOrmStockMarketRepository {
    async fn get_all_tickers(&self) -> Result<Vec<StockMarket>, DbErr> {
        let db = self.db_conn.clone();
        let models = StockMarketEntity::find()
            .order_by_asc(stock_market_schema::Column::Ticker)
            .all(&*db)
            .await?;
        Ok(models
            .into_iter()
            .map(|m| StockMarket {
                ticker: m.ticker,
                market: m.market_name,
                stock_type: m.stock_type,
            })
            .collect())
    }

    async fn get_all_tickers_by_cursor(&self, ticker: String) -> Result<Vec<StockMarket>, DbErr> {
        let db = self.db_conn.clone();
        let models = StockMarketEntity::find()
            .filter(stock_market_schema::Column::Ticker.gte(ticker))
            .order_by_asc(stock_market_schema::Column::Ticker)
            .all(&*db)
            .await?;
        Ok(models
            .into_iter()
            .map(|m| StockMarket {
                ticker: m.ticker,
                market: m.market_name,
                stock_type: m.stock_type,
            })
            .collect())
    }

    fn get_tickers_by_cursor_stream(
        &self,
        cursor: Option<String>,
        limit: u64,
    ) -> Pin<Box<dyn Stream<Item = Result<StockMarket, DbErr>> + Send>> {
        let db = self.db_conn.clone();

        Box::pin(stream! {
            let mut query = StockMarketEntity::find()
                .order_by_asc(stock_market_schema::Column::Ticker);

            if let Some(c) = cursor {
                query = query.filter(stock_market_schema::Column::Ticker.gte(c));
            }

            let mut paginator = query.paginate(&*db, limit);

            loop {
                match paginator.fetch_and_next().await {
                    Ok(Some(tickers)) => {
                        for model in tickers {
                            yield Ok(StockMarket {
                                ticker: model.ticker,
                                market: model.market_name,
                                stock_type: model.stock_type,
                            });
                        }
                    }
                    Ok(None) => {
                        // End of stream, break the loop
                        break;
                    }
                    Err(e) => {
                        // Yield the database error and break the loop
                        yield Err(e);
                        break;
                    }
                }
            }
        })
    }
}

impl From<StockMarketModel> for StockMarket {
    fn from(model: StockMarketModel) -> Self {
        Self {
            ticker: model.ticker,
            market: model.market_name,
            stock_type: model.stock_type,
        }
    }
}