use async_stream::stream;
use async_trait::async_trait;
use futures::Stream;
use sea_orm::{
ColumnTrait, DatabaseConnection, DbErr, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
};
use std::pin::Pin;
use std::sync::Arc;
use crate::models::StockMarket;
use crate::schemas::stock_market_schema::{
self as stock_market_schema, Entity as StockMarketEntity, Model as StockMarketModel,
};
/// Read-only access to stock-market tickers.
///
/// Implementors must be `Send + Sync` so a repository can be shared across
/// threads/tasks.
#[async_trait]
pub trait StockMarketRepository: Send + Sync {
    /// Fetches every ticker in a single call.
    ///
    /// # Errors
    /// Returns a [`DbErr`] when the underlying query fails.
    async fn get_all_tickers(&self) -> Result<Vec<StockMarket>, DbErr>;

    /// Returns a boxed, `Send` stream of tickers.
    ///
    /// * `cursor` - optional position to start from; `None` means no starting
    ///   position is applied.
    /// * `limit` - sizing hint for the query; how it is applied (page size vs.
    ///   total cap) is defined by the implementation.
    ///
    /// Each stream item is either a [`StockMarket`] or the [`DbErr`] that
    /// interrupted the read.
    fn get_tickers_by_cursor_stream(
        &self,
        cursor: Option<String>,
        limit: u64,
    ) -> Pin<Box<dyn Stream<Item = Result<StockMarket, DbErr>> + Send>>;
}
/// Repository that holds a shared SeaORM [`DatabaseConnection`].
pub struct SeaOrmStockMarketRepository {
    /// Reference-counted connection handle; cloning the `Arc` shares the same
    /// `DatabaseConnection` value rather than creating a new one.
    db_conn: Arc<DatabaseConnection>,
}
impl SeaOrmStockMarketRepository {
pub fn new(db_conn: Arc<DatabaseConnection>) -> Self {
Self { db_conn }
}
}
#[async_trait]
impl StockMarketRepository for SeaOrmStockMarketRepository {
async fn get_all_tickers(&self) -> Result<Vec<StockMarket>, DbErr> {
let db = self.db_conn.clone();
let models = StockMarketEntity::find()
.order_by_asc(stock_market_schema::Column::Ticker)
.all(&*db)
.await?;
Ok(models.into_iter().map(|m| StockMarket { ticker: m.ticker }).collect())
}
fn get_tickers_by_cursor_stream(
&self,
cursor: Option<String>,
limit: u64,
) -> Pin<Box<dyn Stream<Item = Result<StockMarket, DbErr>> + Send>> {
let db = self.db_conn.clone();
Box::pin(stream! {
let mut query = StockMarketEntity::find()
.order_by_asc(stock_market_schema::Column::Ticker);
if let Some(c) = cursor {
query = query.filter(stock_market_schema::Column::Ticker.gte(c));
}
let mut paginator = query.paginate(&*db, limit);
loop {
match paginator.fetch_and_next().await {
Ok(Some(tickers)) => {
for model in tickers {
yield Ok(StockMarket {
ticker: model.ticker,
});
}
}
Ok(None) => {
break;
}
Err(e) => {
yield Err(e);
break;
}
}
}
})
}
}
impl From<StockMarketModel> for StockMarket {
fn from(model: StockMarketModel) -> Self {
Self {
ticker: model.ticker,
}
}
}