entertainarr_adapter_sqlite/
tvshow_episode.rs

1use std::collections::HashMap;
2
3use anyhow::Context;
4use entertainarr_domain::language::Language;
5use entertainarr_domain::prelude::SortOrder;
6use entertainarr_domain::tvshow::entity::{
7    ExternalTvShowEpisode, ListTvShowEpisodeParams, TvShowEpisode, TvShowEpisodeField,
8    TvShowEpisodeProgress, TvShowSource,
9};
10use sqlx::types::chrono::{DateTime, NaiveDate, Utc};
11
12use crate::prelude::HasAnyOf;
13use crate::{IndexIter, Wrapper};
14
15impl crate::Pool {
16    #[tracing::instrument(
17        skip_all,
18        fields(
19            otel.kind = "client",
20            db.system = "sqlite",
21            db.name = "tvshow",
22            db.operation = "insert",
23            db.sql.table = "tvshow_episodes",
24            db.query.text = tracing::field::Empty,
25            db.response.returned_rows = tracing::field::Empty,
26            error.type = tracing::field::Empty,
27            error.message = tracing::field::Empty,
28            error.stacktrace = tracing::field::Empty,
29        ),
30        err(Debug),
31    )]
32    pub(crate) async fn upsert_tvshow_episode_row<'c, E: sqlx::SqliteExecutor<'c>>(
33        &self,
34        executor: E,
35        tvshow_season_id: u64,
36        input: &[ExternalTvShowEpisode],
37    ) -> anyhow::Result<Vec<TvShowEpisodeRow>> {
38        let mut qb = sqlx::QueryBuilder::new(
39            "insert into tvshow_episodes (source, tvshow_season_id, episode_number, air_date, production_code, runtime, vote_average, vote_count) ",
40        );
41        qb.push_values(input.iter(), |mut q, row| {
42            q.push_bind(Wrapper(row.source))
43                .push_bind(tvshow_season_id as i64)
44                .push_bind(row.episode_number as i64)
45                .push_bind(row.air_date)
46                .push_bind(row.production_code.as_str())
47                .push_bind(row.runtime.map(|value| value as i64))
48                .push_bind(row.vote_average)
49                .push_bind(row.vote_count as i64);
50        });
51        qb.push(" on conflict (tvshow_season_id, episode_number) do update set ");
52        qb.push(" source = excluded.source,");
53        qb.push(" air_date = excluded.air_date,");
54        qb.push(" production_code = excluded.production_code,");
55        qb.push(" runtime = excluded.runtime,");
56        qb.push(" vote_average = excluded.vote_average,");
57        qb.push(" vote_count = excluded.vote_count");
58        qb.push(" returning id, source, tvshow_season_id, episode_number, air_date, production_code, runtime, vote_average, vote_count, created_at, updated_at");
59
60        tracing::Span::current().record("db.query.text", qb.sql());
61
62        qb.build_query_as()
63            .fetch_all(executor)
64            .await
65            .inspect(crate::record_all)
66            .inspect_err(crate::record_error)
67            .context("unable to upsert tvshow season")
68    }
69}
70
71pub(crate) struct TvShowEpisodeRow {
72    pub id: u64,
73    pub source: TvShowSource,
74    pub tvshow_season_id: u64,
75    pub episode_number: u64,
76    pub air_date: Option<NaiveDate>,
77    pub production_code: String,
78    pub runtime: Option<u32>,
79    pub vote_average: f64,
80    pub vote_count: u64,
81    pub created_at: DateTime<Utc>,
82    pub updated_at: DateTime<Utc>,
83}
84
85impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for TvShowEpisodeRow {
86    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
87        use sqlx::Row;
88
89        let mut idx = IndexIter::default();
90
91        Ok(TvShowEpisodeRow {
92            id: row.try_get(idx.next())?,
93            source: row.try_get(idx.next()).map(Wrapper::inner)?,
94            tvshow_season_id: row.try_get(idx.next())?,
95            episode_number: row.try_get(idx.next())?,
96            air_date: row.try_get(idx.next())?,
97            production_code: row.try_get(idx.next())?,
98            runtime: row.try_get(idx.next())?,
99            vote_average: row.try_get(idx.next())?,
100            vote_count: row.try_get(idx.next())?,
101            created_at: row.try_get(idx.next())?,
102            updated_at: row.try_get(idx.next())?,
103        })
104    }
105}
106
107impl crate::Pool {
108    #[tracing::instrument(
109        skip_all,
110        fields(
111            otel.kind = "client",
112            db.system = "sqlite",
113            db.name = "tvshow",
114            db.operation = "insert",
115            db.sql.table = "tvshow_episode_labels",
116            db.query.text = tracing::field::Empty,
117            db.response.returned_rows = tracing::field::Empty,
118            error.type = tracing::field::Empty,
119            error.message = tracing::field::Empty,
120            error.stacktrace = tracing::field::Empty,
121        ),
122        err(Debug),
123    )]
124    pub(crate) async fn upsert_tvshow_episode_label_row<'c, E: sqlx::SqliteExecutor<'c>>(
125        &self,
126        executor: E,
127        items: impl Iterator<Item = (u64, &ExternalTvShowEpisode)>,
128    ) -> anyhow::Result<Vec<TvShowEpisodeLabelRow>> {
129        let mut qb = sqlx::QueryBuilder::new(
130            "insert into tvshow_episode_labels (tvshow_episode_id, language, name, overview, still_url) ",
131        );
132        qb.push_values(items, |mut q, (id, item)| {
133            q.push_bind(id as i64)
134                .push_bind(Wrapper(item.language))
135                .push_bind(&item.name)
136                .push_bind(&item.overview)
137                .push_bind(&item.still_url);
138        });
139
140        qb.push(" on conflict (tvshow_episode_id, language) do update set ");
141        qb.push(" name = excluded.name,");
142        qb.push(" overview = excluded.overview,");
143        qb.push(" still_url = excluded.still_url");
144        qb.push(" returning tvshow_episode_id, language, name, overview, still_url, created_at, updated_at");
145
146        tracing::Span::current().record("db.query.text", qb.sql());
147
148        qb.build_query_as()
149            .fetch_all(executor)
150            .await
151            .inspect(crate::record_all)
152            .inspect_err(crate::record_error)
153            .context("unable to upsert tvshow episode")
154    }
155}
156
157impl crate::Pool {
158    pub(crate) async fn upsert_tvshow_episodes(
159        &self,
160        tvshow_season_id: u64,
161        input: &[ExternalTvShowEpisode],
162    ) -> anyhow::Result<Vec<TvShowEpisode>> {
163        let mut tx = self
164            .as_ref()
165            .begin()
166            .await
167            .context("unable to begin transaction")?;
168
169        let episodes = self
170            .upsert_tvshow_episode_row(&mut *tx, tvshow_season_id, input)
171            .await?;
172        let episode_ids: HashMap<u64, u64> = HashMap::from_iter(
173            episodes
174                .iter()
175                .map(|episode| (episode.episode_number, episode.id)),
176        );
177        let mut episodes: HashMap<u64, TvShowEpisodeRow> =
178            HashMap::from_iter(episodes.into_iter().map(|episode| (episode.id, episode)));
179        let labels = self
180            .upsert_tvshow_episode_label_row(
181                &mut *tx,
182                input.iter().filter_map(|item| {
183                    episode_ids
184                        .get(&item.episode_number)
185                        .map(|found| (*found, item))
186                }),
187            )
188            .await?;
189
190        tx.commit().await.context("unable to submit transaction")?;
191
192        Ok(labels
193            .into_iter()
194            .filter_map(|label| {
195                episodes
196                    .remove(&label.tvshow_episode_id)
197                    .map(|found| Wrapper::<TvShowEpisode>::from((found, label)).inner())
198            })
199            .collect())
200    }
201}
202
203#[allow(unused)]
204pub(crate) struct TvShowEpisodeLabelRow {
205    pub tvshow_episode_id: u64,
206    pub language: Language,
207    pub name: String,
208    pub overview: Option<String>,
209    pub still_url: Option<String>,
210    pub created_at: DateTime<Utc>,
211    pub updated_at: DateTime<Utc>,
212}
213
214impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for TvShowEpisodeLabelRow {
215    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
216        use sqlx::Row;
217
218        let mut idx = IndexIter::default();
219
220        Ok(Self {
221            tvshow_episode_id: row.try_get(idx.next())?,
222            language: row.try_get(idx.next()).map(|Wrapper(v)| v)?,
223            name: row.try_get(idx.next())?,
224            overview: row.try_get(idx.next())?,
225            still_url: row.try_get(idx.next())?,
226            created_at: row.try_get(idx.next())?,
227            updated_at: row.try_get(idx.next())?,
228        })
229    }
230}
231
232impl From<(TvShowEpisodeRow, TvShowEpisodeLabelRow)> for Wrapper<TvShowEpisode> {
233    fn from((episode, label): (TvShowEpisodeRow, TvShowEpisodeLabelRow)) -> Self {
234        Wrapper(TvShowEpisode {
235            id: episode.id,
236            source: episode.source,
237            tvshow_season_id: episode.tvshow_season_id,
238            episode_number: episode.episode_number,
239            production_code: episode.production_code,
240            language: label.language,
241            name: label.name,
242            overview: label.overview,
243            still_url: label.still_url,
244            air_date: episode.air_date,
245            runtime: episode.runtime,
246            vote_average: episode.vote_average,
247            vote_count: episode.vote_count,
248            created_at: episode.created_at,
249            updated_at: episode.updated_at,
250        })
251    }
252}
253
254impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for Wrapper<TvShowEpisode> {
255    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
256        use sqlx::Row;
257
258        let mut idx = IndexIter::default();
259
260        Ok(Wrapper(TvShowEpisode {
261            id: row.try_get(idx.next())?,
262            source: row.try_get(idx.next()).map(Wrapper::inner)?,
263            tvshow_season_id: row.try_get(idx.next())?,
264            episode_number: row.try_get(idx.next())?,
265            production_code: row.try_get(idx.next())?,
266            language: row.try_get(idx.next()).map(Wrapper::inner)?,
267            name: row.try_get(idx.next())?,
268            overview: row.try_get(idx.next())?,
269            still_url: row.try_get(idx.next())?,
270            air_date: row.try_get(idx.next())?,
271            runtime: row.try_get(idx.next())?,
272            vote_average: row.try_get(idx.next())?,
273            vote_count: row.try_get(idx.next())?,
274            created_at: row.try_get(idx.next())?,
275            updated_at: row.try_get(idx.next())?,
276        }))
277    }
278}
279
280impl crate::Pool {
281    const FIND_TVSHOW_EPISODE_QUERY: &str = r#"select te.id, te.source, te.tvshow_season_id, te.episode_number, te.production_code, tel.language, tel.name, tel.overview, tel.still_url, te.air_date, te.runtime, te.vote_average, te.vote_count, te.created_at, te.updated_at
282from tvshow_episodes te
283join tvshow_episode_labels tel on te.id = tel.tvshow_episode_id
284where te.id = ? and tel.language = ?
285limit 1"#;
286
287    #[tracing::instrument(
288        skip_all,
289        fields(
290            otel.kind = "client",
291            db.system = "sqlite",
292            db.name = "tvshow",
293            db.operation = "select",
294            db.sql.table = "tvshow_episodes",
295            db.query.text = Self::FIND_TVSHOW_EPISODE_QUERY,
296            db.response.returned_rows = tracing::field::Empty,
297            error.type = tracing::field::Empty,
298            error.message = tracing::field::Empty,
299            error.stacktrace = tracing::field::Empty,
300        ),
301        err(Debug),
302    )]
303    async fn find_tvshow_episode_by_id(
304        &self,
305        episode_id: u64,
306        language: Language,
307    ) -> anyhow::Result<Option<TvShowEpisode>> {
308        sqlx::query_as(Self::FIND_TVSHOW_EPISODE_QUERY)
309            .bind(episode_id as i64)
310            .bind(Wrapper(language))
311            .fetch_optional(self.as_ref())
312            .await
313            .map(Wrapper::maybe_inner)
314            .context("unable to fetch tvshow episode")
315    }
316
317    const LIST_TVSHOW_EPISODE_QUERY: &str = r#"select te.id, te.source, te.tvshow_season_id, te.episode_number, te.production_code, tel.language, tel.name, tel.overview, tel.still_url, te.air_date, te.runtime, te.vote_average, te.vote_count, te.created_at, te.updated_at
318from tvshow_episodes te
319join tvshow_episode_labels tel on te.id = tel.tvshow_episode_id
320join tvshow_seasons ts on ts.id = te.tvshow_season_id"#;
321
322    #[tracing::instrument(
323        skip_all,
324        fields(
325            otel.kind = "client",
326            db.system = "sqlite",
327            db.name = "tvshow",
328            db.operation = "select",
329            db.sql.table = "tvshow_episodes",
330            db.query.text = Self::LIST_TVSHOW_EPISODE_QUERY,
331            db.response.returned_rows = tracing::field::Empty,
332            error.type = tracing::field::Empty,
333            error.message = tracing::field::Empty,
334            error.stacktrace = tracing::field::Empty,
335        ),
336        err(Debug),
337    )]
338    async fn list_tvshow_episode<'a>(
339        &self,
340        params: ListTvShowEpisodeParams<'a>,
341    ) -> anyhow::Result<Vec<TvShowEpisode>> {
342        let mut qb = sqlx::QueryBuilder::new(Self::LIST_TVSHOW_EPISODE_QUERY);
343        if params.filter.subscribed.is_some() {
344            qb.push(" left outer join user_tvshows ut on ut.user_id = ")
345                .push_bind(params.user_id as i64)
346                .push(" and ut.tvshow_id = ts.tvshow_id");
347        }
348
349        qb.push(" where tel.language = ")
350            .push_bind(Wrapper(params.language));
351
352        match params.filter.subscribed {
353            Some(true) => {
354                qb.push(" and ut.created_at is not null");
355            }
356            Some(false) => {
357                qb.push(" and ut.created_at is null");
358            }
359            None => {}
360        }
361
362        if !params.filter.tvshow_ids.is_empty() {
363            qb.push(" and ");
364            qb.push_any("ts.tvshow_id", params.filter.tvshow_ids);
365        }
366        if !params.filter.season_ids.is_empty() {
367            qb.push(" and ");
368            qb.push_any("ts.id", params.filter.season_ids);
369        }
370        qb.push(" order by")
371            .push(match params.sort.field {
372                TvShowEpisodeField::AirDate => " te.air_date",
373                TvShowEpisodeField::EpisodeNumber => " te.episode_number",
374            })
375            .push(match params.sort.order {
376                SortOrder::Asc => " asc",
377                SortOrder::Desc => " desc",
378            });
379
380        qb.push(" limit ")
381            .push_bind(params.page.limit)
382            .push(" offset ")
383            .push_bind(params.page.offset);
384
385        qb.build_query_as()
386            .fetch_all(self.as_ref())
387            .await
388            .map(Wrapper::list)
389            .context("unable to list tvshow episodes")
390    }
391}
392
393impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for Wrapper<TvShowEpisodeProgress> {
394    fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
395        use sqlx::Row;
396
397        let mut idx = IndexIter::default();
398
399        Ok(Wrapper(TvShowEpisodeProgress {
400            user_id: row.try_get(idx.next())?,
401            tvshow_episode_id: row.try_get(idx.next())?,
402            progress: row.try_get(idx.next())?,
403            completed: row.try_get(idx.next())?,
404            created_at: row.try_get(idx.next())?,
405            updated_at: row.try_get(idx.next())?,
406        }))
407    }
408}
409
410impl crate::Pool {
411    #[tracing::instrument(
412        skip_all,
413        fields(
414            otel.kind = "client",
415            db.system = "sqlite",
416            db.name = "tvshow",
417            db.operation = "DELETE",
418            db.sql.table = "user_tvshow_episodes",
419            db.query.text = tracing::field::Empty,
420            db.response.returned_rows = tracing::field::Empty,
421            error.type = tracing::field::Empty,
422            error.message = tracing::field::Empty,
423            error.stacktrace = tracing::field::Empty,
424        ),
425        err(Debug),
426    )]
427    async fn delete_tvshow_episode_progressions(
428        &self,
429        user_id: u64,
430        episode_ids: &[u64],
431    ) -> anyhow::Result<()> {
432        let mut qb = sqlx::QueryBuilder::new("delete from user_tvshow_episodes");
433        qb.push(" where user_id = ").push_bind(user_id as i64);
434        qb.push(" and ").push_any("tvshow_episode_id", episode_ids);
435        qb.build()
436            .execute(self.as_ref())
437            .await
438            .inspect_err(super::record_error)
439            .map(|_| ())
440            .context("unable to delete progressions")
441    }
442
443    #[tracing::instrument(
444        skip_all,
445        fields(
446            otel.kind = "client",
447            db.system = "sqlite",
448            db.name = "tvshow",
449            db.operation = "SELECT",
450            db.sql.table = "user_tvshow_episodes",
451            db.query.text = tracing::field::Empty,
452            db.response.returned_rows = tracing::field::Empty,
453            error.type = tracing::field::Empty,
454            error.message = tracing::field::Empty,
455            error.stacktrace = tracing::field::Empty,
456        ),
457        err(Debug),
458    )]
459    async fn list_tvshow_episode_progressions(
460        &self,
461        user_id: u64,
462        episode_ids: &[u64],
463    ) -> anyhow::Result<Vec<entertainarr_domain::tvshow::entity::TvShowEpisodeProgress>> {
464        let mut qb = sqlx::QueryBuilder::new(
465            "select user_id, tvshow_episode_id, progress, completed, created_at, updated_at from user_tvshow_episodes",
466        );
467        qb.push(" where user_id = ").push_bind(user_id as i64);
468        qb.push(" and ").push_any("tvshow_episode_id", episode_ids);
469        qb.build_query_as()
470            .fetch_all(self.as_ref())
471            .await
472            .inspect(super::record_all)
473            .inspect_err(super::record_error)
474            .map(Wrapper::list)
475            .context("unable to delete progressions")
476    }
477
478    #[tracing::instrument(
479        skip_all,
480        fields(
481            otel.kind = "client",
482            db.system = "sqlite",
483            db.name = "tvshow",
484            db.operation = "UPSERT",
485            db.sql.table = "user_tvshow_episodes",
486            db.query.text = tracing::field::Empty,
487            db.response.returned_rows = tracing::field::Empty,
488            error.type = tracing::field::Empty,
489            error.message = tracing::field::Empty,
490            error.stacktrace = tracing::field::Empty,
491        ),
492        err(Debug),
493    )]
494    async fn upsert_tvshow_episode_progressions(
495        &self,
496        inputs: &[entertainarr_domain::tvshow::entity::TvShowEpisodeProgressInput],
497    ) -> anyhow::Result<Vec<TvShowEpisodeProgress>> {
498        if inputs.is_empty() {
499            return Ok(Default::default());
500        }
501
502        let mut qb = sqlx::QueryBuilder::new(
503            "insert into user_tvshow_episodes (user_id, tvshow_episode_id, progress, completed) ",
504        );
505        qb.push_values(inputs, |mut q, item| {
506            q.push_bind(item.user_id as i64)
507                .push_bind(item.tvshow_episode_id as i64)
508                .push_bind(item.progress as i64)
509                .push_bind(item.completed);
510        });
511        qb.push(" on conflict (user_id, tvshow_episode_id) do update set progress = excluded.progress, completed = excluded.completed, updated_at = CURRENT_TIMESTAMP");
512        qb.push(
513            " returning user_id, tvshow_episode_id, progress, completed, created_at, updated_at",
514        );
515
516        let span = tracing::Span::current();
517        span.record("db.query.text", qb.sql());
518
519        qb.build_query_as()
520            .fetch_all(self.as_ref())
521            .await
522            .inspect(super::record_all)
523            .inspect_err(super::record_error)
524            .map(super::Wrapper::list)
525            .context("unable to upsert tvshow episode progresses")
526    }
527}
528
529impl entertainarr_domain::tvshow::prelude::TvShowEpisodeRepository for crate::Pool {
530    async fn find_by_id(
531        &self,
532        episode_id: u64,
533        language: Language,
534    ) -> anyhow::Result<Option<TvShowEpisode>> {
535        self.find_tvshow_episode_by_id(episode_id, language).await
536    }
537
538    async fn list<'a>(
539        &self,
540        params: ListTvShowEpisodeParams<'a>,
541    ) -> anyhow::Result<Vec<TvShowEpisode>> {
542        self.list_tvshow_episode(params).await
543    }
544
545    async fn upsert(
546        &self,
547        tvshow_id: u64,
548        input: &[ExternalTvShowEpisode],
549    ) -> anyhow::Result<Vec<TvShowEpisode>> {
550        self.upsert_tvshow_episodes(tvshow_id, input).await
551    }
552
553    async fn delete_progressions(&self, user_id: u64, episode_ids: &[u64]) -> anyhow::Result<()> {
554        self.delete_tvshow_episode_progressions(user_id, episode_ids)
555            .await
556    }
557
558    async fn list_progressions(
559        &self,
560        user_id: u64,
561        episode_ids: &[u64],
562    ) -> anyhow::Result<Vec<entertainarr_domain::tvshow::entity::TvShowEpisodeProgress>> {
563        self.list_tvshow_episode_progressions(user_id, episode_ids)
564            .await
565    }
566
567    async fn upsert_progressions(
568        &self,
569        inputs: &[entertainarr_domain::tvshow::entity::TvShowEpisodeProgressInput],
570    ) -> anyhow::Result<Vec<TvShowEpisodeProgress>> {
571        self.upsert_tvshow_episode_progressions(inputs).await
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use entertainarr_domain::language::Language;
578    use entertainarr_domain::tvshow::entity::{ExternalTvShowEpisode, TvShowSource};
579
580    use crate::tvshow::tests::breaking_bad;
581    use crate::tvshow_season::tests::season;
582
583    fn episode(
584        source: TvShowSource,
585        season_number: u64,
586        episode_number: u64,
587    ) -> ExternalTvShowEpisode {
588        ExternalTvShowEpisode {
589            source,
590            language: Language::En,
591            air_date: None,
592            name: format!("episode {episode_number}"),
593            overview: None,
594            production_code: format!("episode_{episode_number}"),
595            episode_number,
596            season_number,
597            runtime: None,
598            still_url: None,
599            vote_average: 2.5,
600            vote_count: 42,
601        }
602    }
603
604    #[tokio::test]
605    async fn should_upsert_missing_tvshow_episodes() {
606        let tmpdir = tempfile::tempdir().unwrap();
607        let pool = crate::Pool::test(&tmpdir.path().join("db")).await;
608
609        let tvshow = pool.upsert_tvshow(&breaking_bad()).await.unwrap();
610        let s1 = pool
611            .upsert_tvshow_season(tvshow.id, &season(tvshow.source, 1))
612            .await
613            .unwrap();
614        let _eps1 = pool
615            .upsert_tvshow_episodes(
616                s1.id,
617                &[
618                    episode(TvShowSource::Tmdb { tmdb_id: 1 }, 1, 1),
619                    episode(TvShowSource::Tmdb { tmdb_id: 2 }, 1, 2),
620                    episode(TvShowSource::Tmdb { tmdb_id: 3 }, 1, 3),
621                ],
622            )
623            .await
624            .unwrap();
625        let s2 = pool
626            .upsert_tvshow_season(tvshow.id, &season(tvshow.source, 2))
627            .await
628            .unwrap();
629        let _eps2 = pool
630            .upsert_tvshow_episodes(
631                s2.id,
632                &[
633                    episode(TvShowSource::Tmdb { tmdb_id: 4 }, 2, 4),
634                    episode(TvShowSource::Tmdb { tmdb_id: 5 }, 2, 5),
635                    episode(TvShowSource::Tmdb { tmdb_id: 6 }, 2, 6),
636                ],
637            )
638            .await
639            .unwrap();
640    }
641
642    #[tokio::test]
643    async fn should_upsert_existing_tvshow_episodes() {
644        let tmpdir = tempfile::tempdir().unwrap();
645        let pool = crate::Pool::test(&tmpdir.path().join("db")).await;
646
647        let tvshow = pool.upsert_tvshow(&breaking_bad()).await.unwrap();
648        let s1 = pool
649            .upsert_tvshow_season(tvshow.id, &season(tvshow.source, 1))
650            .await
651            .unwrap();
652        let _eps1 = pool
653            .upsert_tvshow_episodes(
654                s1.id,
655                &[
656                    episode(TvShowSource::Tmdb { tmdb_id: 1 }, 1, 1),
657                    episode(TvShowSource::Tmdb { tmdb_id: 2 }, 1, 2),
658                    episode(TvShowSource::Tmdb { tmdb_id: 3 }, 1, 3),
659                ],
660            )
661            .await
662            .unwrap();
663        let _eps2 = pool
664            .upsert_tvshow_episodes(
665                s1.id,
666                &[
667                    episode(TvShowSource::Tmdb { tmdb_id: 1 }, 1, 1),
668                    episode(TvShowSource::Tmdb { tmdb_id: 2 }, 1, 2),
669                    episode(TvShowSource::Tmdb { tmdb_id: 3 }, 1, 3),
670                ],
671            )
672            .await
673            .unwrap();
674    }
675}