1use crate::{
2 Db,
3 types::{
4 BidderId, DemandId, PortfolioDemandHistoryRow, PortfolioId, PortfolioProductHistoryRow,
5 PortfolioRow, ProductId,
6 },
7};
8use fts_core::{
9 models::{
10 DateTimeRangeQuery, DateTimeRangeResponse, DemandGroup, PortfolioRecord, ProductGroup,
11 ValueRecord,
12 },
13 ports::PortfolioRepository,
14};
15
16impl<PortfolioData: Send + Unpin + serde::Serialize + serde::de::DeserializeOwned>
17 PortfolioRepository<PortfolioData> for Db
18{
19 async fn get_portfolio_bidder_id(
20 &self,
21 portfolio_id: Self::PortfolioId,
22 ) -> Result<Option<Self::BidderId>, Self::Error> {
23 sqlx::query_scalar!(
24 r#"
25 select
26 bidder_id as "id!: BidderId"
27 from
28 portfolio
29 where
30 id = $1
31 "#,
32 portfolio_id
33 )
34 .fetch_optional(&self.reader)
35 .await
36 }
37
38 async fn query_portfolio(
39 &self,
40 bidder_ids: &[Self::BidderId],
41 as_of: Self::DateTime,
42 ) -> Result<Vec<Self::PortfolioId>, Self::Error> {
43 if bidder_ids.len() == 0 {
44 Ok(Vec::new())
45 } else {
46 let bidder_ids = sqlx::types::Json(bidder_ids);
47 sqlx::query_scalar!(
48 r#"
49 select distinct
50 portfolio.id as "id!: PortfolioId"
51 from
52 portfolio
53 join
54 demand_group
55 on
56 portfolio.id = demand_group.portfolio_id
57 join
58 json_each($1) as bidder_ids
59 on
60 portfolio.bidder_id = bidder_ids.atom
61 where
62 valid_from <= $2
63 and
64 ($2 < valid_until or valid_until is null)
65 "#,
66 bidder_ids,
67 as_of,
68 )
69 .fetch_all(&self.reader)
70 .await
71 }
72 }
73
74 async fn create_portfolio(
75 &self,
76 portfolio_id: Self::PortfolioId,
77 bidder_id: Self::BidderId,
78 app_data: PortfolioData,
79 demand_group: DemandGroup<Self::DemandId>,
80 product_group: ProductGroup<Self::ProductId>,
81 as_of: Self::DateTime,
82 ) -> Result<(), Self::Error> {
83 let app_data = sqlx::types::Json(app_data);
84 let demand_group = sqlx::types::Json(demand_group);
85 let product_group = sqlx::types::Json(product_group);
86 sqlx::query!(
87 r#"
88 insert into
89 portfolio (id, as_of, bidder_id, app_data, demand_group, product_group)
90 values
91 ($1, $2, $3, jsonb($4), jsonb($5), jsonb($6))
92 "#,
93 portfolio_id,
94 as_of,
95 bidder_id,
96 app_data,
97 demand_group,
98 product_group
99 )
100 .execute(&self.writer)
101 .await?;
102 Ok(())
103 }
104
105 async fn update_portfolio(
106 &self,
107 portfolio_id: Self::PortfolioId,
108 demand_group: Option<DemandGroup<Self::DemandId>>,
109 product_group: Option<ProductGroup<Self::ProductId>>,
110 as_of: Self::DateTime,
111 ) -> Result<bool, Self::Error> {
112 let updated = match (demand_group, product_group) {
113 (Some(demand_group), Some(product_group)) => {
114 let demand_group = sqlx::types::Json(demand_group);
115 let product_group = sqlx::types::Json(product_group);
116 let query = sqlx::query!(
117 r#"
118 update
119 portfolio
120 set
121 as_of = $2,
122 demand_group = jsonb($3),
123 product_group = jsonb($4)
124 where
125 id = $1
126 "#,
127 portfolio_id,
128 as_of,
129 demand_group,
130 product_group,
131 )
132 .execute(&self.writer)
133 .await?;
134 query.rows_affected() > 0
135 }
136 (Some(demand_group), None) => {
137 let demand_group = sqlx::types::Json(demand_group);
138 let query = sqlx::query!(
139 r#"
140 update
141 portfolio
142 set
143 as_of = $2,
144 demand_group = jsonb($3)
145 where
146 id = $1
147 "#,
148 portfolio_id,
149 as_of,
150 demand_group,
151 )
152 .execute(&self.writer)
153 .await?;
154 query.rows_affected() > 0
155 }
156 (None, Some(product_group)) => {
157 let product_group = sqlx::types::Json(product_group);
158 let query = sqlx::query!(
159 r#"
160 update
161 portfolio
162 set
163 as_of = $2,
164 product_group = jsonb($3)
165 where
166 id = $1
167 "#,
168 portfolio_id,
169 as_of,
170 product_group,
171 )
172 .execute(&self.writer)
173 .await?;
174 query.rows_affected() > 0
175 }
176 (None, None) => false,
177 };
178
179 Ok(updated)
180 }
181
182 async fn get_portfolio(
183 &self,
184 portfolio_id: Self::PortfolioId,
185 as_of: Self::DateTime,
186 ) -> Result<
187 Option<
188 PortfolioRecord<
189 Self::DateTime,
190 Self::BidderId,
191 Self::PortfolioId,
192 Self::DemandId,
193 Self::ProductId,
194 PortfolioData,
195 >,
196 >,
197 Self::Error,
198 > {
199 let query = sqlx::query_file_as!(
200 PortfolioRow,
201 "queries/get_portfolio_by_id.sql",
202 portfolio_id,
203 as_of
204 )
205 .fetch_optional(&self.reader)
206 .await?;
207
208 Ok(query.map(|row| PortfolioRecord {
209 id: portfolio_id,
210 as_of,
211 bidder_id: row.bidder_id,
212 app_data: row.app_data.0,
213 demand_group: row.demand_group.map(|data| data.0).unwrap_or_default(),
214 product_group: row.product_group.map(|data| data.0).unwrap_or_default(),
215 }))
216 }
217
218 async fn get_portfolio_demand_history(
225 &self,
226 portfolio_id: Self::PortfolioId,
227 query: DateTimeRangeQuery<Self::DateTime>,
228 limit: usize,
229 ) -> Result<
230 DateTimeRangeResponse<
231 ValueRecord<Self::DateTime, DemandGroup<Self::DemandId>>,
232 Self::DateTime,
233 >,
234 Self::Error,
235 > {
236 let limit_p1 = (limit + 1) as i64;
237 let mut rows = sqlx::query_as!(
238 PortfolioDemandHistoryRow,
239 r#"
240 select
241 valid_from as "valid_from!: crate::types::DateTime",
242 valid_until as "valid_until?: crate::types::DateTime",
243 json_group_object(demand_id, weight) as "demand_group!: sqlx::types::Json<DemandGroup<DemandId>>"
244 from
245 demand_group
246 where
247 portfolio_id = $1
248 and
249 ($2 is null or valid_from >= $2)
250 and
251 ($3 is null or valid_until is null or valid_until < $3)
252 group by
253 valid_from
254 order by
255 valid_from desc
256 limit $4
257 "#,
258 portfolio_id,
259 query.after,
260 query.before,
261 limit_p1,
262 )
263 .fetch_all(&self.reader)
264 .await?;
265
266 let more = if rows.len() == limit + 1 {
267 let extra = rows.pop().unwrap();
268 Some(DateTimeRangeQuery {
269 before: Some(extra.valid_from),
270 after: query.after,
271 })
272 } else {
273 None
274 };
275
276 Ok(DateTimeRangeResponse {
277 results: rows.into_iter().map(Into::into).collect(),
278 more,
279 })
280 }
281
282 async fn get_portfolio_product_history(
289 &self,
290 portfolio_id: Self::PortfolioId,
291 query: DateTimeRangeQuery<Self::DateTime>,
292 limit: usize,
293 ) -> Result<
294 DateTimeRangeResponse<
295 ValueRecord<Self::DateTime, ProductGroup<Self::ProductId>>,
296 Self::DateTime,
297 >,
298 Self::Error,
299 > {
300 let limit_p1 = (limit + 1) as i64;
301 let mut rows = sqlx::query_as!(
302 PortfolioProductHistoryRow,
303 r#"
304 select
305 valid_from as "valid_from!: crate::types::DateTime",
306 valid_until as "valid_until?: crate::types::DateTime",
307 json_group_object(product_id, weight) as "product_group!: sqlx::types::Json<ProductGroup<ProductId>>"
308 from
309 product_group
310 where
311 portfolio_id = $1
312 and
313 ($2 is null or valid_from >= $2)
314 and
315 ($3 is null or valid_until is null or valid_until < $3)
316 group by
317 valid_from
318 order by
319 valid_from desc
320 limit $4
321 "#,
322 portfolio_id,
323 query.after,
324 query.before,
325 limit_p1,
326 )
327 .fetch_all(&self.reader)
328 .await?;
329
330 let more = if rows.len() == limit + 1 {
331 let extra = rows.pop().unwrap();
332 Some(DateTimeRangeQuery {
333 before: Some(extra.valid_from),
334 after: query.after,
335 })
336 } else {
337 None
338 };
339
340 Ok(DateTimeRangeResponse {
341 results: rows.into_iter().map(Into::into).collect(),
342 more,
343 })
344 }
345}