use crate::{
Db,
types::{
BidderId, DemandId, PortfolioDemandHistoryRow, PortfolioId, PortfolioProductHistoryRow,
PortfolioRow, ProductId,
},
};
use fts_core::{
models::{
DateTimeRangeQuery, DateTimeRangeResponse, DemandGroup, PortfolioRecord, ProductGroup,
ValueRecord,
},
ports::PortfolioRepository,
};
impl<PortfolioData: Send + Unpin + serde::Serialize + serde::de::DeserializeOwned>
PortfolioRepository<PortfolioData> for Db
{
/// Looks up the `bidder_id` stored on the `portfolio` row identified by `portfolio_id`.
///
/// Returns `Ok(None)` when no such row exists. The statement is executed against
/// `self.reader`; any database error is returned unchanged.
async fn get_portfolio_bidder_id(
    &self,
    portfolio_id: Self::PortfolioId,
) -> Result<Option<Self::BidderId>, Self::Error> {
    let owner = sqlx::query_scalar!(
        r#"
select
bidder_id as "id!: BidderId"
from
portfolio
where
id = $1
"#,
        portfolio_id
    )
    .fetch_optional(&self.reader)
    .await?;

    Ok(owner)
}
/// Lists the distinct ids of portfolios that belong to any of `bidder_ids` and have a
/// `demand_group` row whose validity interval contains `as_of`
/// (`valid_from <= as_of` and `as_of < valid_until`, or `valid_until` is null).
///
/// An empty `bidder_ids` slice yields an empty vector without querying the database.
async fn query_portfolio(
    &self,
    bidder_ids: &[Self::BidderId],
    as_of: Self::DateTime,
) -> Result<Vec<Self::PortfolioId>, Self::Error> {
    // Nothing can match an empty bidder list, so skip the round trip entirely.
    if bidder_ids.is_empty() {
        return Ok(Vec::new());
    }

    // The ids are bound as a single JSON array and expanded in SQL via `json_each`.
    let bidder_filter = sqlx::types::Json(bidder_ids);
    sqlx::query_scalar!(
        r#"
select distinct
portfolio.id as "id!: PortfolioId"
from
portfolio
join
demand_group
on
portfolio.id = demand_group.portfolio_id
join
json_each($1) as bidder_ids
on
portfolio.bidder_id = bidder_ids.atom
where
valid_from <= $2
and
($2 < valid_until or valid_until is null)
"#,
        bidder_filter,
        as_of,
    )
    .fetch_all(&self.reader)
    .await
}
/// Inserts a new row into `portfolio`.
///
/// `app_data`, `demand_group` and `product_group` are serialized to JSON and stored through
/// SQLite's `jsonb()`. The statement is executed against `self.writer`; any database error
/// is returned unchanged.
async fn create_portfolio(
    &self,
    portfolio_id: Self::PortfolioId,
    bidder_id: Self::BidderId,
    app_data: PortfolioData,
    demand_group: DemandGroup<Self::DemandId>,
    product_group: ProductGroup<Self::ProductId>,
    as_of: Self::DateTime,
) -> Result<(), Self::Error> {
    // Wrap the structured values so sqlx binds them as JSON text.
    let app_json = sqlx::types::Json(app_data);
    let demand_json = sqlx::types::Json(demand_group);
    let product_json = sqlx::types::Json(product_group);

    sqlx::query!(
        r#"
insert into
portfolio (id, as_of, bidder_id, app_data, demand_group, product_group)
values
($1, $2, $3, jsonb($4), jsonb($5), jsonb($6))
"#,
        portfolio_id,
        as_of,
        bidder_id,
        app_json,
        demand_json,
        product_json
    )
    .execute(&self.writer)
    .await
    // The execution summary carries no information the caller needs.
    .map(|_| ())
}
/// Overwrites the demand group and/or product group of the portfolio `portfolio_id`.
///
/// Only the groups passed as `Some` are written; `as_of` is set in the same statement.
/// Returns `Ok(true)` when the statement affected at least one row and `Ok(false)`
/// otherwise. When both groups are `None` no statement is issued and `Ok(false)` is returned.
async fn update_portfolio(
    &self,
    portfolio_id: Self::PortfolioId,
    demand_group: Option<DemandGroup<Self::DemandId>>,
    product_group: Option<ProductGroup<Self::ProductId>>,
    as_of: Self::DateTime,
) -> Result<bool, Self::Error> {
    // Each arm issues its own `sqlx::query!` statement and hands back the execution
    // summary; the "did anything change" check is applied once after the match.
    let outcome = match (demand_group, product_group) {
        // Nothing to write.
        (None, None) => return Ok(false),
        (Some(demand_group), None) => {
            let demand_json = sqlx::types::Json(demand_group);
            sqlx::query!(
                r#"
update
portfolio
set
as_of = $2,
demand_group = jsonb($3)
where
id = $1
"#,
                portfolio_id,
                as_of,
                demand_json,
            )
            .execute(&self.writer)
            .await?
        }
        (None, Some(product_group)) => {
            let product_json = sqlx::types::Json(product_group);
            sqlx::query!(
                r#"
update
portfolio
set
as_of = $2,
product_group = jsonb($3)
where
id = $1
"#,
                portfolio_id,
                as_of,
                product_json,
            )
            .execute(&self.writer)
            .await?
        }
        (Some(demand_group), Some(product_group)) => {
            let demand_json = sqlx::types::Json(demand_group);
            let product_json = sqlx::types::Json(product_group);
            sqlx::query!(
                r#"
update
portfolio
set
as_of = $2,
demand_group = jsonb($3),
product_group = jsonb($4)
where
id = $1
"#,
                portfolio_id,
                as_of,
                demand_json,
                product_json,
            )
            .execute(&self.writer)
            .await?
        }
    };

    Ok(outcome.rows_affected() > 0)
}
/// Loads the portfolio `portfolio_id` as seen at `as_of`.
///
/// The row is fetched with `queries/get_portfolio_by_id.sql`, bound to `portfolio_id` and
/// `as_of`. Returns `Ok(None)` when the query yields no row. In the returned record, `id`
/// and `as_of` echo the arguments, and a missing demand or product group falls back to its
/// `Default` value.
async fn get_portfolio(
    &self,
    portfolio_id: Self::PortfolioId,
    as_of: Self::DateTime,
) -> Result<
    Option<
        PortfolioRecord<
            Self::DateTime,
            Self::BidderId,
            Self::PortfolioId,
            Self::DemandId,
            Self::ProductId,
            PortfolioData,
        >,
    >,
    Self::Error,
> {
    let maybe_row = sqlx::query_file_as!(
        PortfolioRow,
        "queries/get_portfolio_by_id.sql",
        portfolio_id,
        as_of
    )
    .fetch_optional(&self.reader)
    .await?;

    let Some(row) = maybe_row else {
        return Ok(None);
    };

    Ok(Some(PortfolioRecord {
        id: portfolio_id,
        as_of,
        bidder_id: row.bidder_id,
        // Unwrap the JSON wrappers to expose the plain domain values.
        app_data: row.app_data.0,
        demand_group: row.demand_group.map(|json| json.0).unwrap_or_default(),
        product_group: row.product_group.map(|json| json.0).unwrap_or_default(),
    }))
}
/// Returns one page of the demand-group history of `portfolio_id`, ordered by descending
/// `valid_from`.
///
/// Rows come from `demand_group`, grouped by `valid_from`, with the demand weights folded
/// into one JSON object per group. `query.after`, when set, keeps rows with
/// `valid_from >= after`; `query.before`, when set, keeps rows whose `valid_until` is null
/// or `< before`.
///
/// At most `limit` records are returned. One additional row is requested so that a further
/// page can be detected without a second query: if it is present it is removed from the
/// results and its `valid_from` becomes the `before` bound of the returned `more` cursor
/// (with `after` carried over unchanged). Otherwise `more` is `None`.
///
/// NOTE(review): the cursor is built from `valid_from`, while the `before` filter compares
/// against `valid_until` and always admits rows with a null `valid_until` — confirm that
/// following `more` neither skips nor repeats rows.
async fn get_portfolio_demand_history(
    &self,
    portfolio_id: Self::PortfolioId,
    query: DateTimeRangeQuery<Self::DateTime>,
    limit: usize,
) -> Result<
    DateTimeRangeResponse<
        ValueRecord<Self::DateTime, DemandGroup<Self::DemandId>>,
        Self::DateTime,
    >,
    Self::Error,
> {
    // Ask for one row beyond the page size. `saturating_add` avoids overflow when `limit`
    // is `usize::MAX`, and the checked conversion clamps to `i64::MAX` instead of letting an
    // `as` cast wrap to a negative value.
    let limit_p1 = i64::try_from(limit.saturating_add(1)).unwrap_or(i64::MAX);
    let mut rows = sqlx::query_as!(
        PortfolioDemandHistoryRow,
        r#"
select
valid_from as "valid_from!: crate::types::DateTime",
valid_until as "valid_until?: crate::types::DateTime",
json_group_object(demand_id, weight) as "demand_group!: sqlx::types::Json<DemandGroup<DemandId>>"
from
demand_group
where
portfolio_id = $1
and
($2 is null or valid_from >= $2)
and
($3 is null or valid_until is null or valid_until < $3)
group by
valid_from
order by
valid_from desc
limit $4
"#,
        portfolio_id,
        query.after,
        query.before,
        limit_p1,
    )
    .fetch_all(&self.reader)
    .await?;

    // More rows than the page size means the sentinel row came back: strip it and turn it
    // into the continuation cursor. Comparing with `>` needs no `limit + 1` arithmetic.
    let more = if rows.len() > limit {
        rows.pop().map(|extra| DateTimeRangeQuery {
            before: Some(extra.valid_from),
            after: query.after,
        })
    } else {
        None
    };

    Ok(DateTimeRangeResponse {
        results: rows.into_iter().map(Into::into).collect(),
        more,
    })
}
/// Returns one page of the product-group history of `portfolio_id`, ordered by descending
/// `valid_from`.
///
/// Rows come from `product_group`, grouped by `valid_from`, with the product weights folded
/// into one JSON object per group. `query.after`, when set, keeps rows with
/// `valid_from >= after`; `query.before`, when set, keeps rows whose `valid_until` is null
/// or `< before`.
///
/// At most `limit` records are returned. One additional row is requested so that a further
/// page can be detected without a second query: if it is present it is removed from the
/// results and its `valid_from` becomes the `before` bound of the returned `more` cursor
/// (with `after` carried over unchanged). Otherwise `more` is `None`.
///
/// NOTE(review): the cursor is built from `valid_from`, while the `before` filter compares
/// against `valid_until` and always admits rows with a null `valid_until` — confirm that
/// following `more` neither skips nor repeats rows.
async fn get_portfolio_product_history(
    &self,
    portfolio_id: Self::PortfolioId,
    query: DateTimeRangeQuery<Self::DateTime>,
    limit: usize,
) -> Result<
    DateTimeRangeResponse<
        ValueRecord<Self::DateTime, ProductGroup<Self::ProductId>>,
        Self::DateTime,
    >,
    Self::Error,
> {
    // Ask for one row beyond the page size. `saturating_add` avoids overflow when `limit`
    // is `usize::MAX`, and the checked conversion clamps to `i64::MAX` instead of letting an
    // `as` cast wrap to a negative value.
    let limit_p1 = i64::try_from(limit.saturating_add(1)).unwrap_or(i64::MAX);
    let mut rows = sqlx::query_as!(
        PortfolioProductHistoryRow,
        r#"
select
valid_from as "valid_from!: crate::types::DateTime",
valid_until as "valid_until?: crate::types::DateTime",
json_group_object(product_id, weight) as "product_group!: sqlx::types::Json<ProductGroup<ProductId>>"
from
product_group
where
portfolio_id = $1
and
($2 is null or valid_from >= $2)
and
($3 is null or valid_until is null or valid_until < $3)
group by
valid_from
order by
valid_from desc
limit $4
"#,
        portfolio_id,
        query.after,
        query.before,
        limit_p1,
    )
    .fetch_all(&self.reader)
    .await?;

    // More rows than the page size means the sentinel row came back: strip it and turn it
    // into the continuation cursor. Comparing with `>` needs no `limit + 1` arithmetic.
    let more = if rows.len() > limit {
        rows.pop().map(|extra| DateTimeRangeQuery {
            before: Some(extra.valid_from),
            after: query.after,
        })
    } else {
        None
    };

    Ok(DateTimeRangeResponse {
        results: rows.into_iter().map(Into::into).collect(),
        more,
    })
}
}