fts_sqlite/impl/
portfolio.rs

1use crate::{
2    Db,
3    types::{
4        BidderId, DemandId, PortfolioDemandHistoryRow, PortfolioId, PortfolioProductHistoryRow,
5        PortfolioRow, ProductId,
6    },
7};
8use fts_core::{
9    models::{
10        DateTimeRangeQuery, DateTimeRangeResponse, DemandGroup, PortfolioRecord, ProductGroup,
11        ValueRecord,
12    },
13    ports::PortfolioRepository,
14};
15
16impl<PortfolioData: Send + Unpin + serde::Serialize + serde::de::DeserializeOwned>
17    PortfolioRepository<PortfolioData> for Db
18{
19    async fn get_portfolio_bidder_id(
20        &self,
21        portfolio_id: Self::PortfolioId,
22    ) -> Result<Option<Self::BidderId>, Self::Error> {
23        sqlx::query_scalar!(
24            r#"
25            select
26                bidder_id as "id!: BidderId"
27            from
28                portfolio
29            where
30                id = $1
31            "#,
32            portfolio_id
33        )
34        .fetch_optional(&self.reader)
35        .await
36    }
37
38    async fn query_portfolio(
39        &self,
40        bidder_ids: &[Self::BidderId],
41        as_of: Self::DateTime,
42    ) -> Result<Vec<Self::PortfolioId>, Self::Error> {
43        if bidder_ids.len() == 0 {
44            Ok(Vec::new())
45        } else {
46            let bidder_ids = sqlx::types::Json(bidder_ids);
47            sqlx::query_scalar!(
48                r#"
49                select distinct
50                    portfolio.id as "id!: PortfolioId"
51                from
52                    portfolio
53                join
54                    demand_group
55                on
56                    portfolio.id = demand_group.portfolio_id
57                join
58                    json_each($1) as bidder_ids
59                on
60                    portfolio.bidder_id = bidder_ids.atom
61                where
62                    valid_from <= $2
63                and
64                    ($2 < valid_until or valid_until is null) 
65                "#,
66                bidder_ids,
67                as_of,
68            )
69            .fetch_all(&self.reader)
70            .await
71        }
72    }
73
74    async fn create_portfolio(
75        &self,
76        portfolio_id: Self::PortfolioId,
77        bidder_id: Self::BidderId,
78        app_data: PortfolioData,
79        demand_group: DemandGroup<Self::DemandId>,
80        product_group: ProductGroup<Self::ProductId>,
81        as_of: Self::DateTime,
82    ) -> Result<(), Self::Error> {
83        let app_data = sqlx::types::Json(app_data);
84        let demand_group = sqlx::types::Json(demand_group);
85        let product_group = sqlx::types::Json(product_group);
86        sqlx::query!(
87            r#"
88            insert into
89                portfolio (id, as_of, bidder_id, app_data, demand_group, product_group)
90            values
91                ($1, $2, $3, jsonb($4), jsonb($5), jsonb($6))
92            "#,
93            portfolio_id,
94            as_of,
95            bidder_id,
96            app_data,
97            demand_group,
98            product_group
99        )
100        .execute(&self.writer)
101        .await?;
102        Ok(())
103    }
104
105    async fn update_portfolio(
106        &self,
107        portfolio_id: Self::PortfolioId,
108        demand_group: Option<DemandGroup<Self::DemandId>>,
109        product_group: Option<ProductGroup<Self::ProductId>>,
110        as_of: Self::DateTime,
111    ) -> Result<bool, Self::Error> {
112        let updated = match (demand_group, product_group) {
113            (Some(demand_group), Some(product_group)) => {
114                let demand_group = sqlx::types::Json(demand_group);
115                let product_group = sqlx::types::Json(product_group);
116                let query = sqlx::query!(
117                    r#"
118                    update
119                        portfolio
120                    set
121                        as_of = $2,
122                        demand_group = jsonb($3),
123                        product_group = jsonb($4)
124                    where
125                        id = $1
126                    "#,
127                    portfolio_id,
128                    as_of,
129                    demand_group,
130                    product_group,
131                )
132                .execute(&self.writer)
133                .await?;
134                query.rows_affected() > 0
135            }
136            (Some(demand_group), None) => {
137                let demand_group = sqlx::types::Json(demand_group);
138                let query = sqlx::query!(
139                    r#"
140                    update
141                        portfolio
142                    set
143                        as_of = $2,
144                        demand_group = jsonb($3)
145                    where
146                        id = $1
147                    "#,
148                    portfolio_id,
149                    as_of,
150                    demand_group,
151                )
152                .execute(&self.writer)
153                .await?;
154                query.rows_affected() > 0
155            }
156            (None, Some(product_group)) => {
157                let product_group = sqlx::types::Json(product_group);
158                let query = sqlx::query!(
159                    r#"
160                    update
161                        portfolio
162                    set
163                        as_of = $2,
164                        product_group = jsonb($3)
165                    where
166                        id = $1
167                    "#,
168                    portfolio_id,
169                    as_of,
170                    product_group,
171                )
172                .execute(&self.writer)
173                .await?;
174                query.rows_affected() > 0
175            }
176            (None, None) => false,
177        };
178
179        Ok(updated)
180    }
181
182    async fn get_portfolio(
183        &self,
184        portfolio_id: Self::PortfolioId,
185        as_of: Self::DateTime,
186    ) -> Result<
187        Option<
188            PortfolioRecord<
189                Self::DateTime,
190                Self::BidderId,
191                Self::PortfolioId,
192                Self::DemandId,
193                Self::ProductId,
194                PortfolioData,
195            >,
196        >,
197        Self::Error,
198    > {
199        let query = sqlx::query_file_as!(
200            PortfolioRow,
201            "queries/get_portfolio_by_id.sql",
202            portfolio_id,
203            as_of
204        )
205        .fetch_optional(&self.reader)
206        .await?;
207
208        Ok(query.map(|row| PortfolioRecord {
209            id: portfolio_id,
210            as_of,
211            bidder_id: row.bidder_id,
212            app_data: row.app_data.0,
213            demand_group: row.demand_group.map(|data| data.0).unwrap_or_default(),
214            product_group: row.product_group.map(|data| data.0).unwrap_or_default(),
215        }))
216    }
217
218    /// Get the history of this portfolio's demands
219    ///
220    /// This returns a list of records, each containing the state of the portfolio's demand group
221    /// at a specific point in time. The records are ordered by `valid_from` in descending order
222    /// and are grouped by `valid_from`. This is important for a `more` pointer to work correctly,
223    /// so the demand_group is actually a map of `demand_id` to `weight` at that point in time.
224    async fn get_portfolio_demand_history(
225        &self,
226        portfolio_id: Self::PortfolioId,
227        query: DateTimeRangeQuery<Self::DateTime>,
228        limit: usize,
229    ) -> Result<
230        DateTimeRangeResponse<
231            ValueRecord<Self::DateTime, DemandGroup<Self::DemandId>>,
232            Self::DateTime,
233        >,
234        Self::Error,
235    > {
236        let limit_p1 = (limit + 1) as i64;
237        let mut rows = sqlx::query_as!(
238            PortfolioDemandHistoryRow,
239            r#"
240                select
241                    valid_from as "valid_from!: crate::types::DateTime",
242                    valid_until as "valid_until?: crate::types::DateTime",
243                    json_group_object(demand_id, weight) as "demand_group!: sqlx::types::Json<DemandGroup<DemandId>>"
244                from
245                    demand_group
246                where
247                    portfolio_id = $1
248                and
249                    ($2 is null or valid_from >= $2)
250                and
251                    ($3 is null or valid_until is null or valid_until < $3)
252                group by
253                    valid_from
254                order by
255                    valid_from desc
256                limit $4
257            "#,
258            portfolio_id,
259            query.after,
260            query.before,
261            limit_p1,
262        )
263        .fetch_all(&self.reader)
264        .await?;
265
266        let more = if rows.len() == limit + 1 {
267            let extra = rows.pop().unwrap();
268            Some(DateTimeRangeQuery {
269                before: Some(extra.valid_from),
270                after: query.after,
271            })
272        } else {
273            None
274        };
275
276        Ok(DateTimeRangeResponse {
277            results: rows.into_iter().map(Into::into).collect(),
278            more,
279        })
280    }
281
282    /// Get the history of this portfolio's products
283    ///
284    /// This returns a list of records, each containing the state of the portfolio's product group
285    /// at a specific point in time. The records are ordered by `valid_from` in descending order
286    /// and are grouped by `valid_from`. This is important for a `more` pointer to work correctly,
287    /// so the product_group is actually a map of `product_id` to `weight` at that point in time.
288    async fn get_portfolio_product_history(
289        &self,
290        portfolio_id: Self::PortfolioId,
291        query: DateTimeRangeQuery<Self::DateTime>,
292        limit: usize,
293    ) -> Result<
294        DateTimeRangeResponse<
295            ValueRecord<Self::DateTime, ProductGroup<Self::ProductId>>,
296            Self::DateTime,
297        >,
298        Self::Error,
299    > {
300        let limit_p1 = (limit + 1) as i64;
301        let mut rows = sqlx::query_as!(
302            PortfolioProductHistoryRow,
303            r#"
304                select
305                    valid_from as "valid_from!: crate::types::DateTime",
306                    valid_until as "valid_until?: crate::types::DateTime",
307                    json_group_object(product_id, weight) as "product_group!: sqlx::types::Json<ProductGroup<ProductId>>"
308                from
309                    product_group
310                where
311                    portfolio_id = $1
312                and
313                    ($2 is null or valid_from >= $2)
314                and
315                    ($3 is null or valid_until is null or valid_until < $3)
316                group by
317                    valid_from
318                order by
319                    valid_from desc
320                limit $4
321            "#,
322            portfolio_id,
323            query.after,
324            query.before,
325            limit_p1,
326        )
327        .fetch_all(&self.reader)
328        .await?;
329
330        let more = if rows.len() == limit + 1 {
331            let extra = rows.pop().unwrap();
332            Some(DateTimeRangeQuery {
333                before: Some(extra.valid_from),
334                after: query.after,
335            })
336        } else {
337            None
338        };
339
340        Ok(DateTimeRangeResponse {
341            results: rows.into_iter().map(Into::into).collect(),
342            more,
343        })
344    }
345}