1use std::collections::HashMap;
2
3use anyhow::Context;
4use entertainarr_domain::language::Language;
5use entertainarr_domain::prelude::SortOrder;
6use entertainarr_domain::tvshow::entity::{
7 ExternalTvShowEpisode, ListTvShowEpisodeParams, TvShowEpisode, TvShowEpisodeField,
8 TvShowEpisodeProgress, TvShowSource,
9};
10use sqlx::types::chrono::{DateTime, NaiveDate, Utc};
11
12use crate::prelude::HasAnyOf;
13use crate::{IndexIter, Wrapper};
14
15impl crate::Pool {
16 #[tracing::instrument(
17 skip_all,
18 fields(
19 otel.kind = "client",
20 db.system = "sqlite",
21 db.name = "tvshow",
22 db.operation = "insert",
23 db.sql.table = "tvshow_episodes",
24 db.query.text = tracing::field::Empty,
25 db.response.returned_rows = tracing::field::Empty,
26 error.type = tracing::field::Empty,
27 error.message = tracing::field::Empty,
28 error.stacktrace = tracing::field::Empty,
29 ),
30 err(Debug),
31 )]
32 pub(crate) async fn upsert_tvshow_episode_row<'c, E: sqlx::SqliteExecutor<'c>>(
33 &self,
34 executor: E,
35 tvshow_season_id: u64,
36 input: &[ExternalTvShowEpisode],
37 ) -> anyhow::Result<Vec<TvShowEpisodeRow>> {
38 let mut qb = sqlx::QueryBuilder::new(
39 "insert into tvshow_episodes (source, tvshow_season_id, episode_number, air_date, production_code, runtime, vote_average, vote_count) ",
40 );
41 qb.push_values(input.iter(), |mut q, row| {
42 q.push_bind(Wrapper(row.source))
43 .push_bind(tvshow_season_id as i64)
44 .push_bind(row.episode_number as i64)
45 .push_bind(row.air_date)
46 .push_bind(row.production_code.as_str())
47 .push_bind(row.runtime.map(|value| value as i64))
48 .push_bind(row.vote_average)
49 .push_bind(row.vote_count as i64);
50 });
51 qb.push(" on conflict (tvshow_season_id, episode_number) do update set ");
52 qb.push(" source = excluded.source,");
53 qb.push(" air_date = excluded.air_date,");
54 qb.push(" production_code = excluded.production_code,");
55 qb.push(" runtime = excluded.runtime,");
56 qb.push(" vote_average = excluded.vote_average,");
57 qb.push(" vote_count = excluded.vote_count");
58 qb.push(" returning id, source, tvshow_season_id, episode_number, air_date, production_code, runtime, vote_average, vote_count, created_at, updated_at");
59
60 tracing::Span::current().record("db.query.text", qb.sql());
61
62 qb.build_query_as()
63 .fetch_all(executor)
64 .await
65 .inspect(crate::record_all)
66 .inspect_err(crate::record_error)
67 .context("unable to upsert tvshow season")
68 }
69}
70
71pub(crate) struct TvShowEpisodeRow {
72 pub id: u64,
73 pub source: TvShowSource,
74 pub tvshow_season_id: u64,
75 pub episode_number: u64,
76 pub air_date: Option<NaiveDate>,
77 pub production_code: String,
78 pub runtime: Option<u32>,
79 pub vote_average: f64,
80 pub vote_count: u64,
81 pub created_at: DateTime<Utc>,
82 pub updated_at: DateTime<Utc>,
83}
84
85impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for TvShowEpisodeRow {
86 fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
87 use sqlx::Row;
88
89 let mut idx = IndexIter::default();
90
91 Ok(TvShowEpisodeRow {
92 id: row.try_get(idx.next())?,
93 source: row.try_get(idx.next()).map(Wrapper::inner)?,
94 tvshow_season_id: row.try_get(idx.next())?,
95 episode_number: row.try_get(idx.next())?,
96 air_date: row.try_get(idx.next())?,
97 production_code: row.try_get(idx.next())?,
98 runtime: row.try_get(idx.next())?,
99 vote_average: row.try_get(idx.next())?,
100 vote_count: row.try_get(idx.next())?,
101 created_at: row.try_get(idx.next())?,
102 updated_at: row.try_get(idx.next())?,
103 })
104 }
105}
106
107impl crate::Pool {
108 #[tracing::instrument(
109 skip_all,
110 fields(
111 otel.kind = "client",
112 db.system = "sqlite",
113 db.name = "tvshow",
114 db.operation = "insert",
115 db.sql.table = "tvshow_episode_labels",
116 db.query.text = tracing::field::Empty,
117 db.response.returned_rows = tracing::field::Empty,
118 error.type = tracing::field::Empty,
119 error.message = tracing::field::Empty,
120 error.stacktrace = tracing::field::Empty,
121 ),
122 err(Debug),
123 )]
124 pub(crate) async fn upsert_tvshow_episode_label_row<'c, E: sqlx::SqliteExecutor<'c>>(
125 &self,
126 executor: E,
127 items: impl Iterator<Item = (u64, &ExternalTvShowEpisode)>,
128 ) -> anyhow::Result<Vec<TvShowEpisodeLabelRow>> {
129 let mut qb = sqlx::QueryBuilder::new(
130 "insert into tvshow_episode_labels (tvshow_episode_id, language, name, overview, still_url) ",
131 );
132 qb.push_values(items, |mut q, (id, item)| {
133 q.push_bind(id as i64)
134 .push_bind(Wrapper(item.language))
135 .push_bind(&item.name)
136 .push_bind(&item.overview)
137 .push_bind(&item.still_url);
138 });
139
140 qb.push(" on conflict (tvshow_episode_id, language) do update set ");
141 qb.push(" name = excluded.name,");
142 qb.push(" overview = excluded.overview,");
143 qb.push(" still_url = excluded.still_url");
144 qb.push(" returning tvshow_episode_id, language, name, overview, still_url, created_at, updated_at");
145
146 tracing::Span::current().record("db.query.text", qb.sql());
147
148 qb.build_query_as()
149 .fetch_all(executor)
150 .await
151 .inspect(crate::record_all)
152 .inspect_err(crate::record_error)
153 .context("unable to upsert tvshow episode")
154 }
155}
156
157impl crate::Pool {
158 pub(crate) async fn upsert_tvshow_episodes(
159 &self,
160 tvshow_season_id: u64,
161 input: &[ExternalTvShowEpisode],
162 ) -> anyhow::Result<Vec<TvShowEpisode>> {
163 let mut tx = self
164 .as_ref()
165 .begin()
166 .await
167 .context("unable to begin transaction")?;
168
169 let episodes = self
170 .upsert_tvshow_episode_row(&mut *tx, tvshow_season_id, input)
171 .await?;
172 let episode_ids: HashMap<u64, u64> = HashMap::from_iter(
173 episodes
174 .iter()
175 .map(|episode| (episode.episode_number, episode.id)),
176 );
177 let mut episodes: HashMap<u64, TvShowEpisodeRow> =
178 HashMap::from_iter(episodes.into_iter().map(|episode| (episode.id, episode)));
179 let labels = self
180 .upsert_tvshow_episode_label_row(
181 &mut *tx,
182 input.iter().filter_map(|item| {
183 episode_ids
184 .get(&item.episode_number)
185 .map(|found| (*found, item))
186 }),
187 )
188 .await?;
189
190 tx.commit().await.context("unable to submit transaction")?;
191
192 Ok(labels
193 .into_iter()
194 .filter_map(|label| {
195 episodes
196 .remove(&label.tvshow_episode_id)
197 .map(|found| Wrapper::<TvShowEpisode>::from((found, label)).inner())
198 })
199 .collect())
200 }
201}
202
203#[allow(unused)]
204pub(crate) struct TvShowEpisodeLabelRow {
205 pub tvshow_episode_id: u64,
206 pub language: Language,
207 pub name: String,
208 pub overview: Option<String>,
209 pub still_url: Option<String>,
210 pub created_at: DateTime<Utc>,
211 pub updated_at: DateTime<Utc>,
212}
213
214impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for TvShowEpisodeLabelRow {
215 fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
216 use sqlx::Row;
217
218 let mut idx = IndexIter::default();
219
220 Ok(Self {
221 tvshow_episode_id: row.try_get(idx.next())?,
222 language: row.try_get(idx.next()).map(|Wrapper(v)| v)?,
223 name: row.try_get(idx.next())?,
224 overview: row.try_get(idx.next())?,
225 still_url: row.try_get(idx.next())?,
226 created_at: row.try_get(idx.next())?,
227 updated_at: row.try_get(idx.next())?,
228 })
229 }
230}
231
232impl From<(TvShowEpisodeRow, TvShowEpisodeLabelRow)> for Wrapper<TvShowEpisode> {
233 fn from((episode, label): (TvShowEpisodeRow, TvShowEpisodeLabelRow)) -> Self {
234 Wrapper(TvShowEpisode {
235 id: episode.id,
236 source: episode.source,
237 tvshow_season_id: episode.tvshow_season_id,
238 episode_number: episode.episode_number,
239 production_code: episode.production_code,
240 language: label.language,
241 name: label.name,
242 overview: label.overview,
243 still_url: label.still_url,
244 air_date: episode.air_date,
245 runtime: episode.runtime,
246 vote_average: episode.vote_average,
247 vote_count: episode.vote_count,
248 created_at: episode.created_at,
249 updated_at: episode.updated_at,
250 })
251 }
252}
253
254impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for Wrapper<TvShowEpisode> {
255 fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
256 use sqlx::Row;
257
258 let mut idx = IndexIter::default();
259
260 Ok(Wrapper(TvShowEpisode {
261 id: row.try_get(idx.next())?,
262 source: row.try_get(idx.next()).map(Wrapper::inner)?,
263 tvshow_season_id: row.try_get(idx.next())?,
264 episode_number: row.try_get(idx.next())?,
265 production_code: row.try_get(idx.next())?,
266 language: row.try_get(idx.next()).map(Wrapper::inner)?,
267 name: row.try_get(idx.next())?,
268 overview: row.try_get(idx.next())?,
269 still_url: row.try_get(idx.next())?,
270 air_date: row.try_get(idx.next())?,
271 runtime: row.try_get(idx.next())?,
272 vote_average: row.try_get(idx.next())?,
273 vote_count: row.try_get(idx.next())?,
274 created_at: row.try_get(idx.next())?,
275 updated_at: row.try_get(idx.next())?,
276 }))
277 }
278}
279
280impl crate::Pool {
281 const FIND_TVSHOW_EPISODE_QUERY: &str = r#"select te.id, te.source, te.tvshow_season_id, te.episode_number, te.production_code, tel.language, tel.name, tel.overview, tel.still_url, te.air_date, te.runtime, te.vote_average, te.vote_count, te.created_at, te.updated_at
282from tvshow_episodes te
283join tvshow_episode_labels tel on te.id = tel.tvshow_episode_id
284where te.id = ? and tel.language = ?
285limit 1"#;
286
287 #[tracing::instrument(
288 skip_all,
289 fields(
290 otel.kind = "client",
291 db.system = "sqlite",
292 db.name = "tvshow",
293 db.operation = "select",
294 db.sql.table = "tvshow_episodes",
295 db.query.text = Self::FIND_TVSHOW_EPISODE_QUERY,
296 db.response.returned_rows = tracing::field::Empty,
297 error.type = tracing::field::Empty,
298 error.message = tracing::field::Empty,
299 error.stacktrace = tracing::field::Empty,
300 ),
301 err(Debug),
302 )]
303 async fn find_tvshow_episode_by_id(
304 &self,
305 episode_id: u64,
306 language: Language,
307 ) -> anyhow::Result<Option<TvShowEpisode>> {
308 sqlx::query_as(Self::FIND_TVSHOW_EPISODE_QUERY)
309 .bind(episode_id as i64)
310 .bind(Wrapper(language))
311 .fetch_optional(self.as_ref())
312 .await
313 .map(Wrapper::maybe_inner)
314 .context("unable to fetch tvshow episode")
315 }
316
317 const LIST_TVSHOW_EPISODE_QUERY: &str = r#"select te.id, te.source, te.tvshow_season_id, te.episode_number, te.production_code, tel.language, tel.name, tel.overview, tel.still_url, te.air_date, te.runtime, te.vote_average, te.vote_count, te.created_at, te.updated_at
318from tvshow_episodes te
319join tvshow_episode_labels tel on te.id = tel.tvshow_episode_id
320join tvshow_seasons ts on ts.id = te.tvshow_season_id"#;
321
322 #[tracing::instrument(
323 skip_all,
324 fields(
325 otel.kind = "client",
326 db.system = "sqlite",
327 db.name = "tvshow",
328 db.operation = "select",
329 db.sql.table = "tvshow_episodes",
330 db.query.text = Self::LIST_TVSHOW_EPISODE_QUERY,
331 db.response.returned_rows = tracing::field::Empty,
332 error.type = tracing::field::Empty,
333 error.message = tracing::field::Empty,
334 error.stacktrace = tracing::field::Empty,
335 ),
336 err(Debug),
337 )]
338 async fn list_tvshow_episode<'a>(
339 &self,
340 params: ListTvShowEpisodeParams<'a>,
341 ) -> anyhow::Result<Vec<TvShowEpisode>> {
342 let mut qb = sqlx::QueryBuilder::new(Self::LIST_TVSHOW_EPISODE_QUERY);
343 if params.filter.subscribed.is_some() {
344 qb.push(" left outer join user_tvshows ut on ut.user_id = ")
345 .push_bind(params.user_id as i64)
346 .push(" and ut.tvshow_id = ts.tvshow_id");
347 }
348
349 qb.push(" where tel.language = ")
350 .push_bind(Wrapper(params.language));
351
352 match params.filter.subscribed {
353 Some(true) => {
354 qb.push(" and ut.created_at is not null");
355 }
356 Some(false) => {
357 qb.push(" and ut.created_at is null");
358 }
359 None => {}
360 }
361
362 if !params.filter.tvshow_ids.is_empty() {
363 qb.push(" and ");
364 qb.push_any("ts.tvshow_id", params.filter.tvshow_ids);
365 }
366 if !params.filter.season_ids.is_empty() {
367 qb.push(" and ");
368 qb.push_any("ts.id", params.filter.season_ids);
369 }
370 qb.push(" order by")
371 .push(match params.sort.field {
372 TvShowEpisodeField::AirDate => " te.air_date",
373 TvShowEpisodeField::EpisodeNumber => " te.episode_number",
374 })
375 .push(match params.sort.order {
376 SortOrder::Asc => " asc",
377 SortOrder::Desc => " desc",
378 });
379
380 qb.push(" limit ")
381 .push_bind(params.page.limit)
382 .push(" offset ")
383 .push_bind(params.page.offset);
384
385 qb.build_query_as()
386 .fetch_all(self.as_ref())
387 .await
388 .map(Wrapper::list)
389 .context("unable to list tvshow episodes")
390 }
391}
392
393impl<'c> sqlx::FromRow<'c, sqlx::sqlite::SqliteRow> for Wrapper<TvShowEpisodeProgress> {
394 fn from_row(row: &'c sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
395 use sqlx::Row;
396
397 let mut idx = IndexIter::default();
398
399 Ok(Wrapper(TvShowEpisodeProgress {
400 user_id: row.try_get(idx.next())?,
401 tvshow_episode_id: row.try_get(idx.next())?,
402 progress: row.try_get(idx.next())?,
403 completed: row.try_get(idx.next())?,
404 created_at: row.try_get(idx.next())?,
405 updated_at: row.try_get(idx.next())?,
406 }))
407 }
408}
409
410impl crate::Pool {
411 #[tracing::instrument(
412 skip_all,
413 fields(
414 otel.kind = "client",
415 db.system = "sqlite",
416 db.name = "tvshow",
417 db.operation = "DELETE",
418 db.sql.table = "user_tvshow_episodes",
419 db.query.text = tracing::field::Empty,
420 db.response.returned_rows = tracing::field::Empty,
421 error.type = tracing::field::Empty,
422 error.message = tracing::field::Empty,
423 error.stacktrace = tracing::field::Empty,
424 ),
425 err(Debug),
426 )]
427 async fn delete_tvshow_episode_progressions(
428 &self,
429 user_id: u64,
430 episode_ids: &[u64],
431 ) -> anyhow::Result<()> {
432 let mut qb = sqlx::QueryBuilder::new("delete from user_tvshow_episodes");
433 qb.push(" where user_id = ").push_bind(user_id as i64);
434 qb.push(" and ").push_any("tvshow_episode_id", episode_ids);
435 qb.build()
436 .execute(self.as_ref())
437 .await
438 .inspect_err(super::record_error)
439 .map(|_| ())
440 .context("unable to delete progressions")
441 }
442
443 #[tracing::instrument(
444 skip_all,
445 fields(
446 otel.kind = "client",
447 db.system = "sqlite",
448 db.name = "tvshow",
449 db.operation = "SELECT",
450 db.sql.table = "user_tvshow_episodes",
451 db.query.text = tracing::field::Empty,
452 db.response.returned_rows = tracing::field::Empty,
453 error.type = tracing::field::Empty,
454 error.message = tracing::field::Empty,
455 error.stacktrace = tracing::field::Empty,
456 ),
457 err(Debug),
458 )]
459 async fn list_tvshow_episode_progressions(
460 &self,
461 user_id: u64,
462 episode_ids: &[u64],
463 ) -> anyhow::Result<Vec<entertainarr_domain::tvshow::entity::TvShowEpisodeProgress>> {
464 let mut qb = sqlx::QueryBuilder::new(
465 "select user_id, tvshow_episode_id, progress, completed, created_at, updated_at from user_tvshow_episodes",
466 );
467 qb.push(" where user_id = ").push_bind(user_id as i64);
468 qb.push(" and ").push_any("tvshow_episode_id", episode_ids);
469 qb.build_query_as()
470 .fetch_all(self.as_ref())
471 .await
472 .inspect(super::record_all)
473 .inspect_err(super::record_error)
474 .map(Wrapper::list)
475 .context("unable to delete progressions")
476 }
477
478 #[tracing::instrument(
479 skip_all,
480 fields(
481 otel.kind = "client",
482 db.system = "sqlite",
483 db.name = "tvshow",
484 db.operation = "UPSERT",
485 db.sql.table = "user_tvshow_episodes",
486 db.query.text = tracing::field::Empty,
487 db.response.returned_rows = tracing::field::Empty,
488 error.type = tracing::field::Empty,
489 error.message = tracing::field::Empty,
490 error.stacktrace = tracing::field::Empty,
491 ),
492 err(Debug),
493 )]
494 async fn upsert_tvshow_episode_progressions(
495 &self,
496 inputs: &[entertainarr_domain::tvshow::entity::TvShowEpisodeProgressInput],
497 ) -> anyhow::Result<Vec<TvShowEpisodeProgress>> {
498 if inputs.is_empty() {
499 return Ok(Default::default());
500 }
501
502 let mut qb = sqlx::QueryBuilder::new(
503 "insert into user_tvshow_episodes (user_id, tvshow_episode_id, progress, completed) ",
504 );
505 qb.push_values(inputs, |mut q, item| {
506 q.push_bind(item.user_id as i64)
507 .push_bind(item.tvshow_episode_id as i64)
508 .push_bind(item.progress as i64)
509 .push_bind(item.completed);
510 });
511 qb.push(" on conflict (user_id, tvshow_episode_id) do update set progress = excluded.progress, completed = excluded.completed, updated_at = CURRENT_TIMESTAMP");
512 qb.push(
513 " returning user_id, tvshow_episode_id, progress, completed, created_at, updated_at",
514 );
515
516 let span = tracing::Span::current();
517 span.record("db.query.text", qb.sql());
518
519 qb.build_query_as()
520 .fetch_all(self.as_ref())
521 .await
522 .inspect(super::record_all)
523 .inspect_err(super::record_error)
524 .map(super::Wrapper::list)
525 .context("unable to upsert tvshow episode progresses")
526 }
527}
528
529impl entertainarr_domain::tvshow::prelude::TvShowEpisodeRepository for crate::Pool {
530 async fn find_by_id(
531 &self,
532 episode_id: u64,
533 language: Language,
534 ) -> anyhow::Result<Option<TvShowEpisode>> {
535 self.find_tvshow_episode_by_id(episode_id, language).await
536 }
537
538 async fn list<'a>(
539 &self,
540 params: ListTvShowEpisodeParams<'a>,
541 ) -> anyhow::Result<Vec<TvShowEpisode>> {
542 self.list_tvshow_episode(params).await
543 }
544
545 async fn upsert(
546 &self,
547 tvshow_id: u64,
548 input: &[ExternalTvShowEpisode],
549 ) -> anyhow::Result<Vec<TvShowEpisode>> {
550 self.upsert_tvshow_episodes(tvshow_id, input).await
551 }
552
553 async fn delete_progressions(&self, user_id: u64, episode_ids: &[u64]) -> anyhow::Result<()> {
554 self.delete_tvshow_episode_progressions(user_id, episode_ids)
555 .await
556 }
557
558 async fn list_progressions(
559 &self,
560 user_id: u64,
561 episode_ids: &[u64],
562 ) -> anyhow::Result<Vec<entertainarr_domain::tvshow::entity::TvShowEpisodeProgress>> {
563 self.list_tvshow_episode_progressions(user_id, episode_ids)
564 .await
565 }
566
567 async fn upsert_progressions(
568 &self,
569 inputs: &[entertainarr_domain::tvshow::entity::TvShowEpisodeProgressInput],
570 ) -> anyhow::Result<Vec<TvShowEpisodeProgress>> {
571 self.upsert_tvshow_episode_progressions(inputs).await
572 }
573}
574
575#[cfg(test)]
576mod tests {
577 use entertainarr_domain::language::Language;
578 use entertainarr_domain::tvshow::entity::{ExternalTvShowEpisode, TvShowSource};
579
580 use crate::tvshow::tests::breaking_bad;
581 use crate::tvshow_season::tests::season;
582
583 fn episode(
584 source: TvShowSource,
585 season_number: u64,
586 episode_number: u64,
587 ) -> ExternalTvShowEpisode {
588 ExternalTvShowEpisode {
589 source,
590 language: Language::En,
591 air_date: None,
592 name: format!("episode {episode_number}"),
593 overview: None,
594 production_code: format!("episode_{episode_number}"),
595 episode_number,
596 season_number,
597 runtime: None,
598 still_url: None,
599 vote_average: 2.5,
600 vote_count: 42,
601 }
602 }
603
604 #[tokio::test]
605 async fn should_upsert_missing_tvshow_episodes() {
606 let tmpdir = tempfile::tempdir().unwrap();
607 let pool = crate::Pool::test(&tmpdir.path().join("db")).await;
608
609 let tvshow = pool.upsert_tvshow(&breaking_bad()).await.unwrap();
610 let s1 = pool
611 .upsert_tvshow_season(tvshow.id, &season(tvshow.source, 1))
612 .await
613 .unwrap();
614 let _eps1 = pool
615 .upsert_tvshow_episodes(
616 s1.id,
617 &[
618 episode(TvShowSource::Tmdb { tmdb_id: 1 }, 1, 1),
619 episode(TvShowSource::Tmdb { tmdb_id: 2 }, 1, 2),
620 episode(TvShowSource::Tmdb { tmdb_id: 3 }, 1, 3),
621 ],
622 )
623 .await
624 .unwrap();
625 let s2 = pool
626 .upsert_tvshow_season(tvshow.id, &season(tvshow.source, 2))
627 .await
628 .unwrap();
629 let _eps2 = pool
630 .upsert_tvshow_episodes(
631 s2.id,
632 &[
633 episode(TvShowSource::Tmdb { tmdb_id: 4 }, 2, 4),
634 episode(TvShowSource::Tmdb { tmdb_id: 5 }, 2, 5),
635 episode(TvShowSource::Tmdb { tmdb_id: 6 }, 2, 6),
636 ],
637 )
638 .await
639 .unwrap();
640 }
641
642 #[tokio::test]
643 async fn should_upsert_existing_tvshow_episodes() {
644 let tmpdir = tempfile::tempdir().unwrap();
645 let pool = crate::Pool::test(&tmpdir.path().join("db")).await;
646
647 let tvshow = pool.upsert_tvshow(&breaking_bad()).await.unwrap();
648 let s1 = pool
649 .upsert_tvshow_season(tvshow.id, &season(tvshow.source, 1))
650 .await
651 .unwrap();
652 let _eps1 = pool
653 .upsert_tvshow_episodes(
654 s1.id,
655 &[
656 episode(TvShowSource::Tmdb { tmdb_id: 1 }, 1, 1),
657 episode(TvShowSource::Tmdb { tmdb_id: 2 }, 1, 2),
658 episode(TvShowSource::Tmdb { tmdb_id: 3 }, 1, 3),
659 ],
660 )
661 .await
662 .unwrap();
663 let _eps2 = pool
664 .upsert_tvshow_episodes(
665 s1.id,
666 &[
667 episode(TvShowSource::Tmdb { tmdb_id: 1 }, 1, 1),
668 episode(TvShowSource::Tmdb { tmdb_id: 2 }, 1, 2),
669 episode(TvShowSource::Tmdb { tmdb_id: 3 }, 1, 3),
670 ],
671 )
672 .await
673 .unwrap();
674 }
675}