mod episodes;
mod format;
mod iter;
mod pending;
mod remotes;
mod seasons;
mod series;
mod sync;
mod watched;
use std::collections::HashSet;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use tracing_futures::Instrument;
pub(crate) use self::episodes::EpisodeRef;
pub(crate) use self::seasons::SeasonRef;
use crate::model::{Config, Episode, Pending, RemoteId, Season, Series, SeriesId, Watched};
use crate::queue::Queue;
use crate::service::paths;
#[derive(Default)]
pub(crate) struct Database {
pub(crate) config: Config,
pub(crate) remotes: remotes::Database,
pub(crate) series: series::Database,
pub(crate) episodes: episodes::Database,
pub(crate) seasons: seasons::Database,
pub(crate) watched: watched::Database,
pub(crate) pending: pending::Database,
pub(crate) sync: sync::Database,
pub(crate) changes: Changes,
pub(crate) tasks: Queue,
}
impl Database {
pub(crate) fn load(paths: &paths::Paths) -> Result<Self> {
let mut db = Self::default();
if let Some((format, config)) =
format::load(&paths.config).with_context(|| anyhow!("{}", paths.config.display()))?
{
db.config = config;
if matches!(format, format::Format::Json) {
db.changes.change(Change::Config);
}
}
if let Some((format, remotes)) = format::load_array::<RemoteId>(&paths.remotes)? {
for remote_id in remotes {
match remote_id {
RemoteId::Series { uuid, remotes } => {
for remote_id in remotes {
db.remotes.insert_series(remote_id, uuid);
}
}
RemoteId::Episode { uuid, remotes } => {
for remote_id in remotes {
db.remotes.insert_episode(remote_id, uuid);
}
}
}
}
if matches!(format, format::Format::Json) {
db.changes.change(Change::Remotes);
}
}
if let Some((_format, syncs)) = format::load_array::<sync::Export>(&paths.sync)? {
for sync in syncs {
db.sync.import_push(sync);
}
}
if let Some((format, series)) = format::load_array::<Series>(&paths.series)? {
for mut s in series {
migrate_series(&mut db, &mut s);
db.series.insert(s);
}
if matches!(format, format::Format::Json) {
db.changes.change(Change::Series);
}
}
if let Some((format, watched)) = format::load_array::<Watched>(&paths.watched)? {
for w in watched {
db.watched.insert(w);
}
if matches!(format, format::Format::Json) {
db.changes.change(Change::Watched);
}
}
if let Some((format, pending)) = format::load_array::<Pending>(&paths.pending)? {
db.pending.extend(pending);
if matches!(format, format::Format::Json) {
db.changes.change(Change::Pending);
}
}
if let Some(episodes) = format::load_directory::<_, SeriesId, Episode>(&paths.episodes)? {
for (id, _format, mut episodes) in episodes {
for e in &mut episodes {
if let Some(filename) = e.compat_filename.take() {
e.graphics.filename = Some(filename.into_v2());
db.changes.add_series(&id);
}
}
db.episodes.insert(id, episodes);
}
}
if let Some(seasons) = format::load_directory::<_, SeriesId, Season>(&paths.seasons)? {
for (id, _format, mut seasons) in seasons {
for s in &mut seasons {
if let Some(poster) = s.compat_poster.take() {
s.graphics.poster = Some(poster.into_v2());
db.changes.add_series(&id);
}
}
db.seasons.insert(id, seasons);
}
}
Ok(db)
}
pub(crate) fn save_changes(
&mut self,
paths: &Arc<paths::Paths>,
do_not_save: bool,
) -> impl Future<Output = Result<()>> {
let changes = std::mem::take(&mut self.changes);
let config = changes
.set
.contains(Change::Config)
.then(|| self.config.clone());
let sync = changes
.set
.contains(Change::Sync)
.then(|| self.sync.export());
let watched = changes
.set
.contains(Change::Watched)
.then(|| self.watched.export(&self.episodes));
let pending = changes
.set
.contains(Change::Pending)
.then(|| self.pending.export());
let series = changes
.set
.contains(Change::Series)
.then(|| self.series.export());
let remove_series = changes.remove;
let mut add_series = Vec::with_capacity(changes.add.len());
for id in changes.add {
let episodes = self.episodes.by_series(&id);
let seasons = self.seasons.by_series(&id);
add_series.push((id, episodes.export(), seasons.export()));
}
let remotes = if changes.set.contains(Change::Remotes) {
Some(self.remotes.export())
} else {
None
};
let paths = paths.clone();
let future = async move {
if do_not_save {
return Ok(());
}
tracing::info!("saving database");
let guard = paths.lock.lock().await;
if let Some(config) = config {
format::save_pretty("config", &paths.config, config).await?;
}
if let Some(sync) = sync {
format::save_array("sync", &paths.sync, sync).await?;
}
if let Some(series) = series {
format::save_array("series", &paths.series, series)
.await
.context("series")?;
}
if let Some(watched) = watched {
format::save_array("watched", &paths.watched, watched)
.await
.context("watched")?;
}
if let Some(pending) = pending {
format::save_array("pending", &paths.pending, pending)
.await
.context("pending")?;
}
if let Some(remotes) = remotes {
format::save_array("remotes", &paths.remotes, remotes).await?;
}
for series_id in remove_series {
let episodes_path = paths.episodes.join(format!("{series_id}"));
let seasons_path = paths.seasons.join(format!("{series_id}"));
let a = remove_all("episodes", episodes_path.all());
let b = remove_all("seasons", seasons_path.all());
let _ = tokio::try_join!(a, b)?;
}
for (series_id, episodes, seasons) in add_series {
let episodes_path = paths.episodes.join(format!("{series_id}"));
let seasons_path = paths.seasons.join(format!("{series_id}"));
let a = format::save_array("episodes", &episodes_path, episodes);
let b = format::save_array("seasons", &seasons_path, seasons);
let _ = tokio::try_join!(a, b)?;
}
drop(guard);
Ok(())
};
future.in_current_span()
}
}
#[derive(Clone, Copy, fixed_map::Key)]
pub(crate) enum Change {
Config,
Sync,
Watched,
Pending,
Series,
Remotes,
Queue,
Schedule,
}
#[derive(Default)]
pub(crate) struct Changes {
set: fixed_map::Set<Change>,
remove: HashSet<SeriesId>,
add: HashSet<SeriesId>,
}
impl Changes {
pub(crate) fn change(&mut self, change: Change) {
self.set.insert(change);
}
pub(crate) fn contains(&self, change: Change) -> bool {
self.set.contains(change)
}
#[inline]
pub(crate) fn has_changes(&self) -> bool {
!self.set.is_empty() || !self.remove.is_empty() || !self.add.is_empty()
}
pub(crate) fn add_series(&mut self, id: &SeriesId) {
self.set.insert(Change::Series);
self.remove.remove(id);
self.add.insert(*id);
}
pub(crate) fn remove_series(&mut self, id: &SeriesId) {
self.set.insert(Change::Series);
self.add.remove(id);
self.remove.insert(*id);
}
}
async fn remove_all<const N: usize>(what: &'static str, paths: [&Path; N]) -> Result<()> {
for path in paths {
tracing::trace!("{what}: removing: {}", path.display());
match tokio::fs::remove_file(path).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e.into()),
}
}
Ok(())
}
#[allow(deprecated)]
fn migrate_series(db: &mut Database, s: &mut Series) {
if let Some(image) = s.compat_poster.take() {
s.graphics.poster = Some(image.into_v2());
db.changes.change(Change::Series);
}
if let Some(image) = s.compat_banner.take() {
s.graphics.banner = Some(image.into_v2());
db.changes.change(Change::Series);
}
if let Some(image) = s.compat_fanart.take() {
s.graphics.fanart = Some(image.into_v2());
db.changes.change(Change::Series);
}
if let Some(remote_id) = &s.remote_id {
if let Some(etag) = s.compat_last_etag.take() {
if db.sync.update_last_etag(&s.id, remote_id, etag) {
db.changes.change(Change::Sync);
}
}
let last_modified = s.compat_last_modified.take();
if let Some(last_modified) = &last_modified {
if db
.sync
.update_last_modified(&s.id, remote_id, Some(last_modified))
{
db.changes.change(Change::Sync);
}
}
}
for (remote_id, last_sync) in std::mem::take(&mut s.compat_last_sync) {
if db.sync.import_last_sync(&s.id, &remote_id, &last_sync) {
db.changes.change(Change::Sync);
}
}
}