entertainarr_adapter_sqlite/
tvshow_season.rs

1use anyhow::Context;
2use entertainarr_domain::language::Language;
3use entertainarr_domain::prelude::SortOrder;
4use entertainarr_domain::tvshow::entity::{ExternalTvShowSeason, TvShowSeason, TvShowSource};
5use entertainarr_domain::tvshow::entity::{ListTvShowSeasonParams, TvShowSeasonField};
6use sqlx::types::chrono::{DateTime, NaiveDate, Utc};
7
8use crate::prelude::HasAnyOf;
9use crate::{IndexIter, Wrapper};
10
11impl crate::Pool {
12    const UPSERT_TVSHOW_SEASONS_QUERY: &str = r#"insert into tvshow_seasons (source, tvshow_id, season_number, air_date)
13values (?, ?, ?, ?)
14on conflict (tvshow_id, season_number) do update set
15    source = excluded.source,
16    air_date = excluded.air_date,
17    updated_at = current_timestamp
18returning id, source, tvshow_id, season_number, air_date, created_at, updated_at"#;
19
20    #[tracing::instrument(
21        skip_all,
22        fields(
23            otel.kind = "client",
24            db.system = "sqlite",
25            db.name = "tvshow",
26            db.operation = "insert",
27            db.sql.table = "tvshow_seasons",
28            db.query.text = Self::UPSERT_TVSHOW_SEASONS_QUERY,
29            db.response.returned_rows = tracing::field::Empty,
30            error.type = tracing::field::Empty,
31            error.message = tracing::field::Empty,
32            error.stacktrace = tracing::field::Empty,
33        ),
34        err(Debug),
35    )]
36    pub(crate) async fn upsert_tvshow_season_row<'c, E: sqlx::SqliteExecutor<'c>>(
37        &self,
38        executor: E,
39        tvshow_id: u64,
40        input: &ExternalTvShowSeason,
41    ) -> anyhow::Result<TvShowSeasonRow> {
42        sqlx::query_as(Self::UPSERT_TVSHOW_SEASONS_QUERY)
43            .bind(Wrapper(input.source))
44            .bind(tvshow_id as i64)
45            .bind(input.season_number as i64)
46            .bind(input.air_date)
47            .fetch_one(executor)
48            .await
49            .inspect(crate::record_one)
50            .inspect_err(crate::record_error)
51            .context("unable to upsert tvshow season")
52    }
53}
54
55pub(crate) struct TvShowSeasonRow {
56    pub id: u64,
57    pub source: TvShowSource,
58    pub tvshow_id: u64,
59    pub season_number: u64,
60    pub air_date: Option<NaiveDate>,
61    pub created_at: DateTime<Utc>,
62    pub updated_at: DateTime<Utc>,
63}
64
65impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for TvShowSeasonRow {
66    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
67        use sqlx::Row;
68
69        let mut idx = IndexIter::default();
70
71        Ok(TvShowSeasonRow {
72            id: row.try_get(idx.next())?,
73            source: row.try_get(idx.next()).map(Wrapper::inner)?,
74            tvshow_id: row.try_get(idx.next())?,
75            season_number: row.try_get(idx.next())?,
76            air_date: row.try_get(idx.next())?,
77            created_at: row.try_get(idx.next())?,
78            updated_at: row.try_get(idx.next())?,
79        })
80    }
81}
82
83impl crate::Pool {
84    const UPSERT_TVSHOW_SEASON_LABELS_QUERY: &str = r#"insert into tvshow_season_labels (tvshow_season_id, language, name, overview, poster_url)
85values (?, ?, ?, ?, ?)
86on conflict (tvshow_season_id, language) do update set
87    name = excluded.name,
88    overview = excluded.overview,
89    poster_url = excluded.poster_url,
90    updated_at = current_timestamp
91returning tvshow_season_id, language, name, overview, poster_url, created_at, updated_at"#;
92
93    #[tracing::instrument(
94        skip_all,
95        fields(
96            otel.kind = "client",
97            db.system = "sqlite",
98            db.name = "tvshow",
99            db.operation = "insert",
100            db.sql.table = "tvshow_season_labels",
101            db.query.text = Self::UPSERT_TVSHOW_SEASON_LABELS_QUERY,
102            db.response.returned_rows = tracing::field::Empty,
103            error.type = tracing::field::Empty,
104            error.message = tracing::field::Empty,
105            error.stacktrace = tracing::field::Empty,
106        ),
107        err(Debug),
108    )]
109    pub(crate) async fn upsert_tvshow_season_label_row<'c, E: sqlx::SqliteExecutor<'c>>(
110        &self,
111        executor: E,
112        season_id: u64,
113        input: &ExternalTvShowSeason,
114    ) -> anyhow::Result<TvShowSeasonLabelRow> {
115        sqlx::query_as(Self::UPSERT_TVSHOW_SEASON_LABELS_QUERY)
116            .bind(season_id as i64)
117            .bind(super::Wrapper(input.language))
118            .bind(&input.name)
119            .bind(&input.overview)
120            .bind(&input.poster_url)
121            .fetch_one(executor)
122            .await
123            .inspect(crate::record_one)
124            .inspect_err(crate::record_error)
125            .context("unable to upsert tvshow season")
126    }
127}
128
129impl crate::Pool {
130    pub(crate) async fn upsert_tvshow_season(
131        &self,
132        tvshow_id: u64,
133        input: &ExternalTvShowSeason,
134    ) -> anyhow::Result<TvShowSeason> {
135        let mut tx = self
136            .as_ref()
137            .begin()
138            .await
139            .context("unable to begin transaction")?;
140
141        let season_row = self
142            .upsert_tvshow_season_row(&mut *tx, tvshow_id, input)
143            .await?;
144        let label_row = self
145            .upsert_tvshow_season_label_row(&mut *tx, season_row.id, input)
146            .await?;
147
148        tx.commit().await.context("unable to submit transaction")?;
149
150        Ok(Wrapper::<TvShowSeason>::from((season_row, label_row)).inner())
151    }
152}
153
154impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for Wrapper<TvShowSeason> {
155    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
156        use sqlx::Row;
157
158        let mut idx = IndexIter::default();
159
160        Ok(Wrapper(TvShowSeason {
161            id: row.try_get(idx.next())?,
162            source: row.try_get(idx.next()).map(Wrapper::inner)?,
163            tvshow_id: row.try_get(idx.next())?,
164            season_number: row.try_get(idx.next())?,
165            air_date: row.try_get(idx.next())?,
166            language: row.try_get(idx.next()).map(Wrapper::inner)?,
167            name: row.try_get(idx.next())?,
168            overview: row.try_get(idx.next())?,
169            poster_url: row.try_get(idx.next())?,
170            created_at: row.try_get(idx.next())?,
171            updated_at: row.try_get(idx.next())?,
172        }))
173    }
174}
175
176impl crate::Pool {
177    const FIND_TVSHOW_SEASON_BY_ID_QUERY: &str = r#"select id, source, tvshow_id, season_number, air_date, language, name, overview, poster_url, tvshow_seasons.created_at, tvshow_seasons.updated_at
178from tvshow_seasons
179join tvshow_season_labels on tvshow_seasons.id = tvshow_season_labels.tvshow_season_id
180where tvshow_season_labels.language = ?
181    and tvshow_seasons.id = ?
182"#;
183
184    #[tracing::instrument(
185        skip_all,
186        fields(
187            otel.kind = "client",
188            db.system = "sqlite",
189            db.name = "tvshow",
190            db.operation = "select",
191            db.sql.table = "tvshow_seasons",
192            db.query.text = Self::FIND_TVSHOW_SEASON_BY_ID_QUERY,
193            db.response.returned_rows = tracing::field::Empty,
194            error.type = tracing::field::Empty,
195            error.message = tracing::field::Empty,
196            error.stacktrace = tracing::field::Empty,
197        ),
198        err(Debug),
199    )]
200    pub(crate) async fn find_tvshow_season_by_id(
201        &self,
202        season_id: u64,
203        language: Language,
204    ) -> anyhow::Result<Option<TvShowSeason>> {
205        sqlx::query_as(Self::FIND_TVSHOW_SEASON_BY_ID_QUERY)
206            .bind(Wrapper(language))
207            .bind(season_id as i64)
208            .fetch_optional(self.as_ref())
209            .await
210            .map(Wrapper::maybe_inner)
211            .context("unable to fetch tvshow season by id")
212    }
213
214    #[tracing::instrument(
215        skip_all,
216        fields(
217            otel.kind = "client",
218            db.system = "sqlite",
219            db.name = "tvshow",
220            db.operation = "select",
221            db.sql.table = "tvshow_seasons",
222            db.query.text = tracing::field::Empty,
223            db.response.returned_rows = tracing::field::Empty,
224            error.type = tracing::field::Empty,
225            error.message = tracing::field::Empty,
226            error.stacktrace = tracing::field::Empty,
227        ),
228        err(Debug),
229    )]
230    pub(crate) async fn list_tvshow_season<'a>(
231        &self,
232        params: ListTvShowSeasonParams<'a>,
233    ) -> anyhow::Result<Vec<TvShowSeason>> {
234        let mut qb = sqlx::QueryBuilder::new(
235            "select id, source, tvshow_id, season_number, air_date, language, name, overview, poster_url, tvshow_seasons.created_at, tvshow_seasons.updated_at",
236        );
237        qb.push(" from tvshow_seasons");
238        qb.push(" join tvshow_season_labels on tvshow_seasons.id = tvshow_season_labels.tvshow_season_id");
239        qb.push(" where tvshow_season_labels.language = ")
240            .push_bind(Wrapper(params.language));
241        if let Some(tvshow_id) = params.filter.tvshow_id {
242            qb.push(" and tvshow_id = ").push_bind(tvshow_id as i64);
243        }
244        if !params.filter.season_ids.is_empty() {
245            qb.push(" and ");
246            qb.push_any("tvshow_seasons.id", params.filter.season_ids);
247        }
248        qb.push(" order by")
249            .push(match params.sort.field {
250                TvShowSeasonField::SeasonNumber => " season_number",
251            })
252            .push(match params.sort.order {
253                SortOrder::Asc => " asc",
254                SortOrder::Desc => " desc",
255            });
256        qb.push(" limit ").push_bind(params.page.limit);
257        qb.push(" offset ").push_bind(params.page.offset);
258
259        tracing::Span::current().record("db.query.text", qb.sql());
260
261        qb.build_query_as()
262            .fetch_all(self.as_ref())
263            .await
264            .map(Wrapper::list)
265            .context("unable to list tvshow seasons")
266    }
267}
268
269#[allow(unused)]
270pub(crate) struct TvShowSeasonLabelRow {
271    pub tvshow_season_id: u64,
272    pub language: Language,
273    pub name: String,
274    pub overview: Option<String>,
275    pub poster_url: Option<String>,
276    pub created_at: DateTime<Utc>,
277    pub updated_at: DateTime<Utc>,
278}
279
280impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for TvShowSeasonLabelRow {
281    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
282        use sqlx::Row;
283
284        let mut idx = IndexIter::default();
285
286        Ok(Self {
287            tvshow_season_id: row.try_get(idx.next())?,
288            language: row.try_get(idx.next()).map(|Wrapper(v)| v)?,
289            name: row.try_get(idx.next())?,
290            overview: row.try_get(idx.next())?,
291            poster_url: row.try_get(idx.next())?,
292            created_at: row.try_get(idx.next())?,
293            updated_at: row.try_get(idx.next())?,
294        })
295    }
296}
297
298impl From<(TvShowSeasonRow, TvShowSeasonLabelRow)> for Wrapper<TvShowSeason> {
299    fn from((season, label): (TvShowSeasonRow, TvShowSeasonLabelRow)) -> Self {
300        Wrapper(TvShowSeason {
301            id: season.id,
302            source: season.source,
303            tvshow_id: season.tvshow_id,
304            season_number: season.season_number,
305            language: label.language,
306            name: label.name,
307            overview: label.overview,
308            poster_url: label.poster_url,
309            air_date: season.air_date,
310            created_at: season.created_at,
311            updated_at: season.updated_at,
312        })
313    }
314}
315
316impl entertainarr_domain::tvshow::prelude::TvShowSeasonRepository for crate::Pool {
317    async fn find_by_id(
318        &self,
319        season_id: u64,
320        language: Language,
321    ) -> anyhow::Result<Option<TvShowSeason>> {
322        self.find_tvshow_season_by_id(season_id, language).await
323    }
324
325    async fn list<'a>(
326        &self,
327        params: ListTvShowSeasonParams<'a>,
328    ) -> anyhow::Result<Vec<TvShowSeason>> {
329        self.list_tvshow_season(params).await
330    }
331
332    async fn upsert(
333        &self,
334        tvshow_id: u64,
335        input: &ExternalTvShowSeason,
336    ) -> anyhow::Result<TvShowSeason> {
337        self.upsert_tvshow_season(tvshow_id, input).await
338    }
339}
340
#[cfg(test)]
pub(crate) mod tests {
    use entertainarr_domain::language::Language;
    use entertainarr_domain::tvshow::entity::{ExternalTvShowSeason, TvShowSource};

    use crate::tvshow::tests::breaking_bad;

    /// Builds a minimal English season fixture named `season <number>`, with
    /// no air date, overview, poster or episodes.
    pub fn season(source: TvShowSource, season_number: u64) -> ExternalTvShowSeason {
        let name = format!("season {season_number}");

        ExternalTvShowSeason {
            source,
            season_number,
            language: Language::En,
            name,
            air_date: None,
            overview: None,
            poster_url: None,
            episodes: Vec::new(),
        }
    }

    /// Two distinct season numbers of the same show can both be upserted.
    #[tokio::test]
    async fn should_upsert_missing_tvshow_season() {
        // The temporary directory must outlive the pool using it.
        let workdir = tempfile::tempdir().unwrap();
        let pool = crate::Pool::test(&workdir.path().join("db")).await;

        let tvshow = pool.upsert_tvshow(&breaking_bad()).await.unwrap();

        for season_number in [1, 2] {
            pool.upsert_tvshow_season(tvshow.id, &season(tvshow.source, season_number))
                .await
                .unwrap();
        }
    }

    /// Upserting the same season twice keeps its identity.
    #[tokio::test]
    async fn should_upsert_existing_tvshow_season() {
        // The temporary directory must outlive the pool using it.
        let workdir = tempfile::tempdir().unwrap();
        let pool = crate::Pool::test(&workdir.path().join("db")).await;

        let tvshow = pool.upsert_tvshow(&breaking_bad()).await.unwrap();
        let input = season(tvshow.source, 1);

        let first = pool
            .upsert_tvshow_season(tvshow.id, &input)
            .await
            .unwrap();
        let second = pool
            .upsert_tvshow_season(tvshow.id, &input)
            .await
            .unwrap();

        assert_eq!(first.id, second.id);
        assert_eq!(first.tvshow_id, second.tvshow_id);
        assert_eq!(first.season_number, second.season_number);
    }
}
393        assert_eq!(first.season_number, second.season_number);
394    }
395}