convert-invert 0.1.0

Orchestrate spotify playlist downloads using soulseek-rs
Documentation
use anyhow::Context;
use diesel::{
    PgConnection, dsl::insert_into, prelude::*, sql_types::Integer, sql_types::Nullable,
    sql_types::Text,
};

use crate::internals::context::context_manager::{RejectedTrack, RetryRequest, Track};
use crate::internals::database::{model, schema};
use crate::internals::search::search_manager::{
    DownloadableFile as RuntimeDownloadableFile, JudgeSubmission as RuntimeJudgeSubmission,
    SearchItem as RuntimeSearchItem,
};

#[derive(QueryableByName)]
struct IdRow {
    #[diesel(sql_type = Integer)]
    id: i32,
}

fn reject_reason_name(reason: model::RejectReasonRow) -> &'static str {
    match reason {
        model::RejectReasonRow::AlreadyDownloaded => "already_downloaded",
        model::RejectReasonRow::LowScore => "low_score",
        model::RejectReasonRow::NotMusic => "not_music",
        model::RejectReasonRow::Banned => "banned",
        model::RejectReasonRow::AbandonedAttemptingSearch => "abandoned_attempting_search",
    }
}

pub struct DatabaseManager<'a> {
    pub connection: &'a mut PgConnection,
}

impl<'a> DatabaseManager<'a> {
    pub fn new(connection: &'a mut PgConnection) -> Self {
        Self { connection }
    }

    fn insert_search_item(
        connection: &mut PgConnection,
        search_item: &RuntimeSearchItem,
    ) -> anyhow::Result<i32> {
        let value = model::NewSearchItemRow::from(search_item);
        insert_into(schema::search_items::table)
            .values(&value)
            .on_conflict(schema::search_items::track_id)
            .do_nothing()
            .execute(connection)
            .context("Insert search item")?;

        let inserted_id = Self::get_search_item_id(connection, search_item)
            .context("Fetch inserted or existing search item")?;
        Ok(inserted_id)
    }

    fn insert_downloadable_file(
        connection: &mut PgConnection,
        downloadable_file: &RuntimeDownloadableFile,
    ) -> anyhow::Result<i32> {
        let value = model::NewDownloadableFileRow::from(downloadable_file);
        insert_into(schema::downloadable_files::table)
            .values(&value)
            .on_conflict((
                schema::downloadable_files::filename,
                schema::downloadable_files::username,
                schema::downloadable_files::size,
            ))
            .do_nothing()
            .execute(connection)
            .context("Insert downloadable file")?;

        let inserted_id = Self::get_downloadable_file_id(connection, downloadable_file)
            .context("Fetch inserted or existing downloadable file")?;
        Ok(inserted_id)
    }

    fn get_downloadable_file_id(
        connection: &mut PgConnection,
        downloadable_file: &RuntimeDownloadableFile,
    ) -> anyhow::Result<i32> {
        use schema::downloadable_files::dsl as df;
        let downloadable_id = schema::downloadable_files::table
            .filter(df::filename.eq(&downloadable_file.filename))
            .filter(df::username.eq(&downloadable_file.username))
            .filter(df::size.eq(downloadable_file.size))
            .select(df::id)
            .get_result(connection)
            .context("Fetch downloadable file id")?;
        Ok(downloadable_id)
    }

    fn upsert_downloaded_file(
        connection: &mut PgConnection,
        downloaded_file: &crate::internals::context::context_manager::DownloadedFile,
    ) -> anyhow::Result<()> {
        use schema::downloaded_file::dsl as dl;
        let track_id = Some(
            Self::get_search_item_id(connection, &downloaded_file.track)
                .context("Fetch downloaded file track id")?,
        );
        let value = model::NewDownloadedFileRow::from_runtime(downloaded_file, track_id);
        let Some(track_id) = value.track else {
            insert_into(schema::downloaded_file::table)
                .values(&value)
                .on_conflict(dl::filename)
                .do_update()
                .set(dl::track.eq(value.track))
                .execute(connection)
                .context("Insert downloaded file without track")?;
            return Ok(());
        };
        diesel::sql_query(
            "INSERT INTO downloaded_file (filename, track)
             VALUES ($1, $2)
             ON CONFLICT (track) WHERE track IS NOT NULL
             DO UPDATE SET filename = EXCLUDED.filename",
        )
        .bind::<Text, _>(&value.filename)
        .bind::<Integer, _>(track_id)
        .execute(connection)
        .context("Insert downloaded file")?;
        Ok(())
    }

    pub fn is_search_item_downloaded(
        &mut self,
        search_item: &RuntimeSearchItem,
    ) -> anyhow::Result<bool> {
        use schema::downloaded_file::dsl as dl;
        let search_id = match Self::get_search_item_id(self.connection, search_item) {
            Ok(search_id) => search_id,
            Err(_) => return Ok(false),
        };
        let count: i64 = schema::downloaded_file::table
            .filter(dl::track.eq(search_id))
            .count()
            .get_result(self.connection)
            .context("Check downloaded track")?;
        Ok(count > 0)
    }

    fn existing_rejected_track_id(
        connection: &mut PgConnection,
        track_id: i32,
        value: &model::NewRejectedTrackRow,
    ) -> anyhow::Result<Option<i32>> {
        let existing = diesel::sql_query(
            "SELECT id
             FROM rejected_track
             WHERE track = $1
               AND reason = $2::reject_reason
               AND value IS NOT DISTINCT FROM $3
             LIMIT 1",
        )
        .bind::<Integer, _>(track_id)
        .bind::<Text, _>(reject_reason_name(value.reason))
        .bind::<Nullable<Text>, _>(value.value.as_deref())
        .get_result::<IdRow>(connection)
        .optional()
        .map(|row| row.map(|row| row.id))
        .context("Fetch existing rejected track")?;
        Ok(existing)
    }

    fn existing_retry_request_id(
        connection: &mut PgConnection,
        value: &model::NewRetryRequestRow,
    ) -> anyhow::Result<Option<i32>> {
        use schema::retry_request::dsl as rr;
        let existing = schema::retry_request::table
            .filter(rr::request.eq(value.request))
            .filter(rr::retry_attempts.eq(value.retry_attempts))
            .filter(rr::failed_download_result.eq(value.failed_download_result))
            .select(rr::id)
            .first::<i32>(connection)
            .optional()
            .context("Fetch existing retry request")?;
        Ok(existing)
    }

    fn update_jugde_submission_score(
        connection: &mut PgConnection,
        judge_submission: &RuntimeJudgeSubmission,
    ) -> anyhow::Result<()> {
        let judge_submission_id = Self::get_judge_submission_id(connection, judge_submission)?;
        diesel::update(
            schema::judge_submissions::table
                .filter(schema::judge_submissions::id.eq(&judge_submission_id)),
        )
        .set(schema::judge_submissions::score.eq(judge_submission.score))
        .execute(connection)
        .context("update and set score")?;
        Ok(())
    }
    fn insert_judge_submission(
        connection: &mut PgConnection,
        judge_submission: &RuntimeJudgeSubmission,
    ) -> anyhow::Result<i32> {
        use schema::judge_submissions::dsl as js;
        let track_id = Self::get_search_item_id(connection, &judge_submission.track)
            .with_context(|| format!("fetching track_id={:?}", judge_submission.track))?;
        let query_id = Self::insert_downloadable_file(connection, &judge_submission.query)
            .with_context(|| format!("fetching track_id={:?}", judge_submission.query))?;

        let value = model::NewJudgeSubmissionRow {
            track: track_id,
            query: query_id,
            score: None,
        };
        insert_into(schema::judge_submissions::table)
            .values(&value)
            .on_conflict((js::track, js::query))
            .do_nothing()
            .execute(connection)
            .context("Insert judge submission")?;
        let inserted_id = schema::judge_submissions::table
            .filter(js::track.eq(track_id))
            .filter(js::query.eq(query_id))
            .select(js::id)
            .get_result(connection)
            .context("Fetch inserted or existing judge submission")?;
        Ok(inserted_id)
    }
    pub fn get_judge_submission_id(
        connection: &mut PgConnection,
        judge_submission: &RuntimeJudgeSubmission,
    ) -> anyhow::Result<i32> {
        use schema::downloadable_files::dsl as df;
        use schema::judge_submissions::dsl as js;
        let query_id: i32 = schema::downloadable_files::table
            .filter(df::filename.eq(&judge_submission.query.filename))
            .filter(df::username.eq(&judge_submission.query.username))
            .filter(df::size.eq(judge_submission.query.size))
            .select(df::id)
            .get_result(connection)
            .context("Getting download id in js")?;
        let search_id = Self::get_search_item_id(connection, &judge_submission.track)?;

        let judge_id = schema::judge_submissions::table
            .filter(js::track.eq(search_id))
            .filter(js::query.eq(query_id))
            .select(js::id)
            .get_result(connection)
            .with_context(|| {
                format!(
                    "Fetch judge submission id for track_id={} query_id={}",
                    search_id, query_id
                )
            })?;
        Ok(judge_id)
    }

    fn insert_retry_request(
        connection: &mut PgConnection,
        retry_request: &RetryRequest,
    ) -> anyhow::Result<()> {
        let request_id = Self::get_judge_submission_id(connection, &retry_request.request)?;
        let failed_download_result =
            Self::get_downloadable_file_id(connection, &retry_request.failed_download_result)?;
        let value = model::NewRetryRequestRow {
            request: request_id,
            retry_attempts: i32::from(retry_request.retry_attempts),
            failed_download_result,
        };
        if Self::existing_retry_request_id(connection, &value)?.is_none() {
            insert_into(schema::retry_request::table)
                .values(&value)
                .execute(connection)
                .context("Insert retry request")?;
        }
        Ok(())
    }

    fn insert_rejected_track(
        connection: &mut PgConnection,
        rejected_track: &RejectedTrack,
    ) -> anyhow::Result<()> {
        let (judge_submission, _) = rejected_track.parts();
        let track_id = Self::get_judge_submission_id(connection, judge_submission)?;
        let value = model::NewRejectedTrackRow::from_runtime(track_id, rejected_track);
        if Self::existing_rejected_track_id(connection, track_id, &value)?.is_none() {
            insert_into(schema::rejected_track::table)
                .values(&value)
                .execute(connection)
                .context("Insert rejected track")?;
        }
        Ok(())
    }
    pub fn get_search_item_id(
        connection: &mut PgConnection,
        search_item: &RuntimeSearchItem,
    ) -> anyhow::Result<i32> {
        use schema::search_items::dsl as sl;
        let search_id = schema::search_items::table
            .filter(sl::track_id.eq(&search_item.track_id))
            .select(schema::search_items::id)
            .get_result(connection)
            .context("database fetch search_id in get seatch id func")?;
        Ok(search_id)
    }

    pub fn load_item_to_database(&mut self, item: &Track) -> anyhow::Result<()> {
        self.connection
            .transaction::<_, anyhow::Error, _>(|connection| {
                match item {
                    Track::Query(search_item) | Track::SearchRetry(search_item) => {
                        Self::insert_search_item(connection, search_item)?;
                    }
                    Track::Result(judge_submission) => {
                        Self::insert_judge_submission(connection, judge_submission)?;
                    }
                    Track::Downloadable(judge_submission) => {
                        Self::update_jugde_submission_score(connection, judge_submission)?;
                    }
                    Track::File(downloaded_file) => {
                        Self::upsert_downloaded_file(connection, downloaded_file)?;
                    }
                    Track::Retry(retry_request) => {
                        Self::insert_retry_request(connection, retry_request)?;
                    }
                    Track::Reject(rejected_track) => {
                        Self::insert_rejected_track(connection, rejected_track)?;
                    }
                }
                Ok(())
            })
            .context("Persist track into database")?;
        Ok(())
    }
}